{-# OPTIONS_GHC -Wno-missing-fields #-}
{-# OPTIONS_GHC -Wno-incomplete-patterns #-}

{-|
Module      : Z.IO.BIO.Concurrent
Description : Concurrent BIO nodes
Copyright   : (c) Dong Han, 2017-2020
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provides some concurrent 'BIO' nodes, to ease the implementation of the producer-consumer model.
All sources and sinks returned by this module are safe to be used in multiple threads.

  * Use 'newTQueuePair' for common cases.
  * Use 'newTBQueuePair' if you have a fast producer and you don't want input to pile up in memory.
  * Use 'newBroadcastTChanPair' if you want messages to be broadcast, i.e. every message written by
    producers will be received by every consumer.

It's important to correctly set the number of producers: internally a counter tracks how many producers
have reached their ends, and EOF is sent to all consumers when the last producer ends. So it's a good idea
to catch exceptions and pull the sink (which indicates EOF) on the producer side.

@
(sink, src) <- newTQueuePair 2  -- it's important to correctly set the number of producers

--------------------------------------------------------------------------------
-- producers

forkIO $ do
    ...
    push x sink             -- producer using push
    ...
    pull sink               -- when EOF is reached, manually pull; consider putting it in a bracket.

forkIO $ do
    ...
    (runBIO $ ... . sink) -- producer using BIO
        `onException` (pull sink)

--------------------------------------------------------------------------------
-- consumers

forkIO $ do
    ...
    r <- pull src           -- consumer using pull
    case r of Just r' -> ...
              _ -> ...      -- EOF indicate all producers reached EOF

forkIO $ do
    ...
    runBIO $ src . ...    -- consumer using BIO
@

-}

module Z.IO.BIO.Concurrent where

import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.Sequence as Seq
import Data.Sequence (Seq((:<|),(:|>)))
import GHC.Natural
import Z.IO.BIO.Base
import Z.Data.PrimRef
import Z.IO.Exception

-- | Zip two 'BIO' nodes by running them concurrently.
--
-- For every input, each node is run in its own 'forkIO' thread, and its outputs are
-- appended to a per-node buffer kept in a 'TVar'. The calling thread pairs the buffered
-- outputs in order and passes each pair downstream. This has some implications:
--
--   * The two nodes should output the same number of results.
--   * If the numbers differ, the zipped output ends with 'EOF' when the shorter node
--     runs out; surplus outputs of the other node are discarded.
--   * The call only returns after a node has passed 'EOF' to its continuation;
--     until then it blocks waiting for more output.
--   * The forked threads' 'ThreadId's are discarded: the threads are neither waited
--     for nor killed, and exceptions raised inside them are not rethrown here.
--
zip :: BIO a b -> BIO a c -> BIO a (b,c)
{-# INLINABLE zip #-}
zip b1 b2 = \ k mx -> do
    -- Fresh synchronisation state is created for every input.
    bEOF <- newTVarIO False
    cEOF <- newTVarIO False
    bBuf <- newTVarIO Seq.empty
    cBuf <- newTVarIO Seq.empty
    -- Run both nodes on the same input, each collecting into its own buffer.
    _ <- forkIO (b1 (f bBuf bEOF) mx)
    _ <- forkIO (b2 (f cBuf cEOF) mx)
    loop k bBuf cBuf bEOF cEOF
  where
    -- Continuation handed to a node: append an output to the node's buffer,
    -- or record that the node has reached EOF.
    f :: TVar (Seq x) -> TVar Bool -> Maybe x -> IO ()
    f xBuf xEOF = \ mx ->
        case mx of
            Just x -> atomically $ modifyTVar' xBuf (:|> x)
            _ -> atomically $ writeTVar xEOF True

    -- Each transaction decides the next step and returns it as an IO action,
    -- which 'join' then runs outside the transaction, in the calling thread.
    -- NOTE(review): if neither node ever passes EOF to its continuation for a
    -- given input, this keeps retrying once the buffers are drained — confirm
    -- that is the intended contract for non-EOF inputs.
    loop :: (Maybe (x, y) -> IO r)
         -> TVar (Seq x) -> TVar (Seq y) -> TVar Bool -> TVar Bool -> IO r
    loop k bBuf cBuf bEOF cEOF = join . atomically $ do
        bs <- readTVar bBuf
        cs <- readTVar cBuf
        beof <- readTVar bEOF
        ceof <- readTVar cEOF
        case bs of
            b :<| bs' -> case cs of
                c :<| cs' -> do
                    -- Both sides have an output: pop the pair and continue.
                    writeTVar bBuf bs'
                    writeTVar cBuf cs'
                    return (k (Just (b, c)) >> loop k bBuf cBuf bEOF cEOF)
                -- Second buffer is empty: finish if that node is done, else wait.
                _ -> if ceof then return (k EOF) else retry
            -- First buffer is empty: finish if that node is done, else wait.
            _ -> if beof then return (k EOF) else retry

-- | Make an unbounded queue and a pair of sink and source connected to it.
--
-- The sink enqueues every @Just@ input without calling downstream. On 'EOF' it
-- bumps a shared counter, and only when the value returned by @atomicAddCounter'@
-- equals the number of producers does it enqueue 'EOF' and pass 'EOF' downstream.
--
-- The source ignores its input. When run, it keeps dequeuing items and passing them
-- downstream until it dequeues 'EOF'; it then puts 'EOF' back at the front of the
-- queue (so it remains there for any other reader) and passes 'EOF' downstream.
newTQueuePair :: Int -- ^ number of producers
              -> IO (Sink a, Source a)
{-# INLINABLE newTQueuePair #-}
newTQueuePair n = do
    q <- newTQueueIO
    -- Counts how many times the sink has been given EOF.
    ec <- newCounter 0
    return
        ( \ k mx -> case mx of
                Just _ -> atomically (writeTQueue q mx)
                _ -> do
                    i <- atomicAddCounter' ec 1
                    -- NOTE(review): assumes atomicAddCounter' returns the value
                    -- after the addition, so that i == n on the n-th EOF — confirm.
                    when (i == n) $ do
                        atomically (writeTQueue q EOF)
                        k EOF

        , \ k _ ->
            -- Only the blocking dequeue runs with the caller's masking state
            -- restored; the downstream call and the put-back of EOF run under
            -- 'uninterruptibleMask'.
            let loop = uninterruptibleMask $ \ restore -> do
                    x <- restore $ atomically (readTQueue q)
                    case x of Just _ -> k x >> loop
                              _ -> do atomically (unGetTQueue q EOF)
                                      k EOF
            in loop)

-- | Make a bounded queue and a pair of sink and source connected to it.
--
-- The sink enqueues every @Just@ input without calling downstream. On 'EOF' it
-- bumps a shared counter, and only when the value returned by @atomicAddCounter'@
-- equals the number of producers does it enqueue 'EOF' and pass 'EOF' downstream.
--
-- The source ignores its input. When run, it keeps dequeuing items and passing them
-- downstream until it dequeues 'EOF'; it then puts 'EOF' back at the front of the
-- queue (so it remains there for any other reader) and passes 'EOF' downstream.
newTBQueuePair :: Int       -- ^ number of producers
               -> Natural   -- ^ queue buffer bound
               -> IO (Sink a, Source a)
{-# INLINABLE newTBQueuePair #-}
newTBQueuePair n bound = do
    q <- newTBQueueIO bound
    -- Counts how many times the sink has been given EOF.
    ec <- newCounter 0
    return
        ( \ k mx -> case mx of
                Just _ -> atomically (writeTBQueue q mx)
                _ -> do
                    i <- atomicAddCounter' ec 1
                    -- NOTE(review): assumes atomicAddCounter' returns the value
                    -- after the addition, so that i == n on the n-th EOF — confirm.
                    when (i == n) $ do
                        atomically (writeTBQueue q EOF)
                        k EOF

        , \ k _ ->
            -- Only the blocking dequeue runs with the caller's masking state
            -- restored; the downstream call and the put-back of EOF run under
            -- 'uninterruptibleMask'.
            let loop = uninterruptibleMask $ \ restore -> do
                    x <- restore $ atomically (readTBQueue q)
                    case x of Just _ -> k x >> loop
                              _ -> do atomically (unGetTBQueue q EOF)
                                      k EOF
            in loop)

-- | Make a broadcast chan and a sink connected to it, and a function return sources to receive broadcast message.
--
-- The sink writes every @Just@ input to the broadcast channel without calling
-- downstream. On 'EOF' it bumps a shared counter, writes 'EOF' to the channel when
-- the value returned by @atomicAddCounter'@ equals the number of producers, and
-- then passes 'EOF' downstream on every 'EOF' input, whatever the counter value.
--
-- Each run of the returned 'IO' action duplicates the broadcast channel with
-- 'dupTChan' and returns a source reading from that duplicate. The source ignores
-- its input and passes items downstream until it reads 'EOF', which it also passes
-- downstream before stopping.
newBroadcastTChanPair :: Int                        -- ^ number of producers
                      -> IO (Sink a, IO (Source a)) -- ^ (Sink, IO Source)
{-# INLINABLE newBroadcastTChanPair #-}
newBroadcastTChanPair n = do
    b <- newBroadcastTChanIO
    -- Counts how many times the sink has been given EOF.
    ec <- newCounter 0
    let dupSrc = do
            -- Every source gets its own duplicate of the broadcast channel.
            c <- atomically (dupTChan b)
            return $ \ k _ ->
                let loop = do
                        x <- atomically (readTChan c)
                        case x of Just _ -> k x >> loop
                                  _ -> k EOF
                in loop

    return
        (\ k mx -> case mx of
            Just _ -> atomically (writeTChan b mx)
            _ -> do i <- atomicAddCounter' ec 1
                    -- NOTE(review): assumes atomicAddCounter' returns the value
                    -- after the addition, so that i == n on the n-th EOF — confirm.
                    when (i == n) (atomically (writeTChan b EOF))
                    k EOF
       , dupSrc)