{-# OPTIONS_GHC -Wno-missing-fields #-}
{-# OPTIONS_GHC -Wno-incomplete-patterns #-}

{-|
Module      : Z.IO.BIO.Concurrent
Description : Concurrent BIO nodes
Copyright   : (c) Dong Han, 2017-2020
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provides some concurrent 'BIO' nodes to ease the implementation of the producer-consumer model.
All sources and sinks returned by this module are safe to be used in multiple threads.

  * Use 'newTQueuePair' for common cases.
  * Use 'newTBQueuePair' if you have a fast producer and you don't want input to pile up in memory.
  * Use 'newBroadcastTChanPair' if you want messages to be broadcast, i.e. every message written by
    producers will be received by every consumer.

It's important to correctly set the number of producers: internally a counter tracks how many producers
have reached their ends, and EOF is sent to all consumers when the last producer ends. So it's a good idea
to catch exceptions and pull the sink (which indicates EOF) on the producer side.

@
(sink, src) <- newTQueuePair 2  -- it's important to correctly set the number of producers

--------------------------------------------------------------------------------
-- producers

forkIO $ do
    ...
    push x sink             -- producer using push
    ...
    pull sink               -- when EOF is reached, manually pull, you may consider put it in a bracket.

forkIO $ do
    ...
    (runBIO $ ... . sink) -- producer using BIO
        `onException` (pull sink)

--------------------------------------------------------------------------------
-- consumers

forkIO $ do
    ...
    r <- pull src           -- consumer using pull
    case r of Just r' -> ...
              _ -> ...      -- EOF indicate all producers reached EOF

forkIO $ do
    ...
    runBIO $ src . ...    -- consumer using BIO
@

-}

module Z.IO.BIO.Concurrent where

import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.Sequence as Seq
import Data.Sequence (Seq((:<|),(:|>)))
import GHC.Natural
import Z.IO.BIO.Base
import Z.Data.PrimRef
import Z.IO.Exception

-- | Zip two BIO nodes by running them concurrently.
--
-- Each node is run in its own thread (via 'forkIO') with the same input, and
-- appends its outputs to a 'TVar'-guarded 'Seq'. The calling thread pairs the
-- buffered outputs up in FIFO order and passes each pair downstream.
-- This has some implications:
--
--   * The two nodes should output the same number of results.
--   * If the numbers differ, downstream receives EOF as soon as one buffer is
--     empty and the corresponding node has signalled EOF; any surplus results
--     of the other node are never delivered.
--   * The calling thread blocks (STM 'retry') until that EOF condition holds.
--
zip :: BIO a b -> BIO a c -> BIO a (b,c)
{-# INLINABLE zip #-}
zip b1 b2 = \ k mx -> do
    bDone <- newTVarIO False
    cDone <- newTVarIO False
    bQueue <- newTVarIO Seq.empty
    cQueue <- newTVarIO Seq.empty
    -- Both nodes receive the very same input and run independently.
    _ <- forkIO (b1 (collect bQueue bDone) mx)
    _ <- forkIO (b2 (collect cQueue cDone) mx)
    pairUp k bQueue cQueue bDone cDone
  where
    -- Continuation handed to a zipped node: buffer a result at the back of
    -- the queue, or raise the node's EOF flag.
    collect :: TVar (Seq x) -> TVar Bool -> Maybe x -> IO ()
    collect buf done my =
        case my of
            Just y -> atomically $ modifyTVar' buf (:|> y)
            _      -> atomically $ writeTVar done True

    -- Each STM transaction decides the next IO action, which 'join' then runs
    -- outside the transaction.
    pairUp k bQueue cQueue bDone cDone = go
      where
        go = join . atomically $ do
            bs <- readTVar bQueue
            cs <- readTVar cQueue
            bFinished <- readTVar bDone
            cFinished <- readTVar cDone
            case (bs, cs) of
                -- Both sides have a result: pop both, emit the pair, continue.
                (b :<| bs', c :<| cs') -> do
                    writeTVar bQueue bs'
                    writeTVar cQueue cs'
                    return (k (Just (b, c)) >> go)
                -- Only the first side has results: wait for the second one,
                -- unless it has already ended.
                (_ :<| _, _)
                    | cFinished -> return (k EOF)
                    | otherwise -> retry
                -- First side is empty: wait for it, unless it has ended.
                _   | bFinished -> return (k EOF)
                    | otherwise -> retry

-- | Make an unbounded queue and a pair of sink and source connected to it.
--
-- The sink enqueues every value pushed into it. Each EOF fed to the sink
-- bumps a shared counter; when the value returned by 'atomicAddCounter''
-- equals the number of producers, EOF is enqueued and also passed to the
-- sink's downstream.
--
-- The source forwards dequeued values downstream until it dequeues EOF. The
-- EOF is put back at the front of the queue before being passed downstream,
-- so it stays available to any other reader of the same queue.
newTQueuePair :: Int -- ^ number of producers
              -> IO (Sink a, Source a)
{-# INLINABLE newTQueuePair #-}
newTQueuePair n = do
    q <- newTQueueIO
    ec <- newCounter 0
    return
        ( \ k mx -> case mx of
                Just _ -> atomically (writeTQueue q mx)
                _ -> do
                    i <- atomicAddCounter' ec 1
                    -- Only the last producer reaching EOF closes the queue.
                    when (i == n) $ do
                        atomically (writeTQueue q EOF)
                        k EOF

        , \ k _ ->
            -- The mask is entered afresh on every iteration: the masked
            -- action returns the continuation and 'join' runs it after the
            -- mask has been left. Recursing inside 'uninterruptibleMask'
            -- would turn 'restore' into a no-op from the second iteration on,
            -- making the blocking read below uninterruptible.
            let loop = join $ uninterruptibleMask $ \ restore -> do
                    -- Waiting for an element stays interruptible ...
                    x <- restore $ atomically (readTQueue q)
                    -- ... but once an element is taken, handing it over (or
                    -- putting EOF back) is not.
                    case x of
                        Just _ -> do
                            k x
                            return loop
                        _ -> do
                            atomically (unGetTQueue q EOF)
                            k EOF
                            return (return ())
            in loop)

-- | Make a bounded queue and a pair of sink and source connected to it.
--
-- Behaves like the unbounded variant, except that values go through a
-- 'TBQueue' created with the given bound, so writes to the queue block while
-- it is full.
--
-- The sink enqueues every value pushed into it. Each EOF fed to the sink
-- bumps a shared counter; when the value returned by 'atomicAddCounter''
-- equals the number of producers, EOF is enqueued and also passed to the
-- sink's downstream.
--
-- The source forwards dequeued values downstream until it dequeues EOF. The
-- EOF is put back at the front of the queue before being passed downstream,
-- so it stays available to any other reader of the same queue.
newTBQueuePair :: Int       -- ^ number of producers
               -> Natural   -- ^ queue buffer bound
               -> IO (Sink a, Source a)
{-# INLINABLE newTBQueuePair #-}
newTBQueuePair n bound = do
    q <- newTBQueueIO bound
    ec <- newCounter 0
    return
        ( \ k mx -> case mx of
                Just _ -> atomically (writeTBQueue q mx)
                _ -> do
                    i <- atomicAddCounter' ec 1
                    -- Only the last producer reaching EOF closes the queue.
                    when (i == n) $ do
                        atomically (writeTBQueue q EOF)
                        k EOF

        , \ k _ ->
            -- The mask is entered afresh on every iteration: the masked
            -- action returns the continuation and 'join' runs it after the
            -- mask has been left. Recursing inside 'uninterruptibleMask'
            -- would turn 'restore' into a no-op from the second iteration on,
            -- making the blocking read below uninterruptible.
            let loop = join $ uninterruptibleMask $ \ restore -> do
                    -- Waiting for an element stays interruptible ...
                    x <- restore $ atomically (readTBQueue q)
                    -- ... but once an element is taken, handing it over (or
                    -- putting EOF back) is not.
                    case x of
                        Just _ -> do
                            k x
                            return loop
                        _ -> do
                            -- NOTE(review): depending on the stm version,
                            -- 'unGetTBQueue' may block on a full queue; this
                            -- runs uninterruptibly masked -- confirm that no
                            -- producer writes after the final EOF.
                            atomically (unGetTBQueue q EOF)
                            k EOF
                            return (return ())
            in loop)

-- | Make a broadcast chan and a sink connected to it, and a function return sources to receive broadcast message.
--
-- The sink writes every pushed value to the broadcast channel. Each EOF fed
-- to the sink bumps a shared counter and is passed to the sink's downstream;
-- EOF is written to the channel only when the value returned by
-- 'atomicAddCounter'' equals the number of producers.
--
-- Every run of the returned action yields a new source backed by its own
-- 'dupTChan' duplicate of the channel. Per the stm documentation a duplicate
-- only receives what is written after it was created, so create the sources
-- before the producers start writing.
newBroadcastTChanPair :: Int                        -- ^ number of producers
                      -> IO (Sink a, IO (Source a)) -- ^ (Sink, IO Source)
{-# INLINABLE newBroadcastTChanPair #-}
newBroadcastTChanPair n = do
    b <- newBroadcastTChanIO
    ec <- newCounter 0
    let -- Create a fresh reader on the broadcast channel and wrap it as a
        -- source that forwards messages downstream until it reads EOF.
        dupSrc = do
            c <- atomically (dupTChan b)
            return $ \ k _ ->
                let loop = do
                        x <- atomically (readTChan c)
                        case x of
                            Just _ -> k x >> loop
                            _      -> k EOF
                in loop

        sink k mx = case mx of
            Just _ -> atomically (writeTChan b mx)
            _ -> do
                i <- atomicAddCounter' ec 1
                -- Only the last producer reaching EOF closes the channel ...
                when (i == n) (atomically (writeTChan b EOF))
                -- ... but every producer's EOF is passed downstream.
                k EOF

    return (sink, dupSrc)