-- | Asynchronous communication between pipes

{-# LANGUAGE CPP, RankNTypes, Safe #-}

module Pipes.Concurrent (
    -- * Inputs and Outputs
    Input(..),
    Output(..),

    -- * Pipe utilities
    fromInput,
    toOutput,

    -- * Mailboxes
    Mailbox(),
    fromMailbox,
    toMailbox,
    send',
    recv',

    -- * Actors
    spawn,
    spawn',
    withSpawn,
    withBuffer,
    Buffer(..),
    unbounded,
    bounded,
    latest,
    newest,

    -- * Re-exports
    -- $reexport
    module Control.Concurrent,
    module Control.Concurrent.STM,
    module System.Mem
    ) where

import Control.Applicative (
    Alternative(empty, (<|>)), Applicative(pure, (*>), (<*>)), (<*), (<$>) )
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, STM, mkWeakTVar, newTVarIO, readTVar)
import Control.Exception (bracket)
import Control.Monad (when,void, MonadPlus(..))
import Data.Functor.Contravariant (Contravariant(contramap))
import Data.Functor.Contravariant.Divisible (
    Divisible(divide, conquer), Decidable(lose, choose))
import Data.Monoid (Monoid(mempty, mappend))
import Data.Semigroup
import Data.Void (absurd)
import Pipes (MonadIO(liftIO), yield, await, Producer', Consumer')
import Prelude hiding (read)
import System.Mem (performGC)

import qualified Control.Concurrent.Async
import qualified Control.Concurrent.STM   as S
import qualified Control.Exception

{-| An exhaustible source of values

    'recv' is an @STM@ action that produces the next value wrapped in 'Just',
    or 'Nothing' if the source is exhausted
-}
newtype Input a = Input {
    recv :: S.STM (Maybe a) }

-- | Applies a function to every value received from the 'Input'; an
-- exhausted 'Input' ('Nothing') stays exhausted.
instance Functor Input where
    -- Outer 'fmap' works inside @STM@, inner 'fmap' inside 'Maybe'
    fmap f m = Input (fmap (fmap f) (recv m))

-- | 'pure' is an 'Input' that always produces the same value.  '<*>' receives
-- from both 'Input's within one @STM@ action and produces 'Nothing' if either
-- of them produced 'Nothing'.
instance Applicative Input where
    pure r    = Input (pure (pure r))
    -- Combine the two 'Maybe' results with the 'Maybe' applicative
    mf <*> mx = Input ((<*>) <$> recv mf <*> recv mx)

-- | Sequencing receives from the first 'Input' and, if it produced a value,
-- receives from the 'Input' computed from that value in the same @STM@ action.
instance Monad Input where
    return r = Input (return (return r))
    m >>= f  = Input $ do
        ma <- recv m
        case ma of
            -- The first input is exhausted, so the composite is exhausted too
            Nothing -> return Nothing
            Just a  -> recv (f a)

-- | 'empty' is an 'Input' that is always exhausted.  @x '<|>' y@ receives
-- from @x@, falling back to @y@ through the @STM@ '<|>'; if the input that
-- answered is exhausted, the other input is tried instead.
instance Alternative Input where
    empty   = Input (return Nothing)
    x <|> y = Input $ do
        -- Pair each result with the *other* input so that we know where to
        -- fall back to when the input that answered turns out to be exhausted
        (i, ma) <- fmap ((,) y) (recv x) <|> fmap ((,) x) (recv y)
        case ma of
            Nothing -> recv i
            Just a  -> return (Just a)

-- | Same behaviour as the 'Alternative' instance
instance MonadPlus Input where
    mzero = empty
    mplus = (<|>)

-- | Combines two 'Input's using the 'Alternative' instance's '<|>'
instance Data.Semigroup.Semigroup (Input a) where
    (<>) = (<|>)

-- | 'mempty' is the always-exhausted 'Input' ('empty')
instance Monoid (Input a) where
    mempty = empty

-- 'mappend' is only written out explicitly when building against base < 4.11
#if !(MIN_VERSION_base(4,11,0))
    mappend = (<>)
#endif

{-| An exhaustible sink of values

    'send' is a function from a value to an @STM@ action that returns 'False'
    if the sink is exhausted
-}
newtype Output a = Output {
    send :: a -> S.STM Bool }

-- | Sends each value to both 'Output's (first the left one, then the right
-- one) and returns 'True' if at least one of them returned 'True'.
instance Data.Semigroup.Semigroup (Output a) where
    -- Applicative style: both sends are always performed, '(||)' only
    -- combines their results
    i1 <> i2 = Output (\a -> (||) <$> send i1 a <*> send i2 a)

-- | 'mempty' is an 'Output' that ignores every value and returns 'False'
instance Monoid (Output a) where
    mempty  = Output (\_ -> return False)

-- 'mappend' is only written out explicitly when building against base < 4.11
#if !(MIN_VERSION_base(4,11,0))
    mappend = (<>)
#endif

-- | This instance is useful for creating a new tagged address, similar to
-- Elm's @Signal.forwardTo@.  In fact, Elm's @forwardTo@ is just
-- @'flip' 'contramap'@.
instance Contravariant Output where
    -- Transform each value with @f@ before handing it to the wrapped sink
    contramap f (Output a) = Output (a . f)

-- | 'conquer' ignores every value and returns 'False'.  'divide' splits each
-- value in two, sends one part to each 'Output', and returns 'True' if at
-- least one of the sends returned 'True'.
instance Divisible Output where
    conquer = Output (\_ -> return False)
    divide f i1 i2 = Output $ \a -> case f a of
        -- Both sends are always performed; '(||)' only combines the results
        (b, c) -> (||) <$> send i1 b <*> send i2 c

-- | 'choose' routes each value to exactly one of the two 'Output's, depending
-- on whether the supplied function returns 'Left' or 'Right'.
instance Decidable Output where
    -- @f@ proves that no value of the input type exists, so 'absurd'
    -- supplies the (never needed) result
    lose f = Output (absurd . f)
    choose f i1 i2 = Output $ \a -> case f a of
        Left b -> send i1 b
        Right c -> send i2 c

{-| Combines a source of values and a sink of values

    * 'fromMailbox' uses a 'Mailbox' as a 'Pipes.Producer'

    * 'toMailbox' uses a 'Mailbox' as a 'Pipes.Consumer'

    * 'send\'' puts a value in the 'Mailbox'

    * 'recv\'' obtains a value from the 'Mailbox'
-}
type Mailbox a = (Output a, Input a)

{-| Convert a 'Mailbox' to a 'Pipes.Producer'

    Only the 'Input' half of the 'Mailbox' is used.  'fromMailbox' terminates
    when the 'Mailbox' source of values is exhausted.
-}
fromMailbox :: (MonadIO m) => Mailbox a -> Producer' a m ()
fromMailbox (_, input) = fromInput input

{-| Convert a 'Mailbox' to a 'Pipes.Consumer'

    Only the 'Output' half of the 'Mailbox' is used.  'toMailbox' terminates
    when the 'Mailbox' sink of values is exhausted.
-}
toMailbox :: (MonadIO m) => Mailbox a -> Consumer' a m ()
toMailbox (output, _) = toOutput output

{-| Put a value in a 'Mailbox'

    'send\'' returns 'False' if the 'Mailbox' sink is exhausted
-}
send' :: Mailbox a -> a -> STM Bool
send' = send . fst

{-| Obtain a value from a 'Mailbox'

    'recv\'' returns 'Nothing' if the 'Mailbox' source is exhausted
-}
recv' :: Mailbox a -> STM (Maybe a)
recv' = recv . snd

{-| Convert an 'Output' to a 'Pipes.Consumer'

    Every awaited value is sent to the 'Output' in its own @STM@ transaction.
    'toOutput' terminates when the 'Output' is exhausted, i.e. as soon as a
    'send' returns 'False'.
-}
toOutput :: (MonadIO m) => Output a -> Consumer' a m ()
toOutput output = loop
  where
    loop = do
        a     <- await
        alive <- liftIO $ S.atomically $ send output a
        -- Keep consuming only while the sink still accepts values
        when alive loop
{-# INLINABLE toOutput #-}

{-| Convert an 'Input' to a 'Pipes.Producer'

    Every value is received from the 'Input' in its own @STM@ transaction and
    then yielded.  'fromInput' terminates when the 'Input' is exhausted, i.e.
    as soon as 'recv' returns 'Nothing'.
-}
fromInput :: (MonadIO m) => Input a -> Producer' a m ()
fromInput input = loop
  where
    loop = do
        ma <- liftIO $ S.atomically $ recv input
        case ma of
            Nothing -> return ()
            Just a  -> do
                yield a
                loop
{-# INLINABLE fromInput #-}

{-| Spawn a mailbox using the specified 'Buffer' to store messages

    Using 'send' on the 'Output':

        * fails and returns 'False' if the mailbox is sealed, otherwise it:

        * retries if the mailbox is full, or:

        * adds a message to the mailbox and returns 'True'.

    Using 'recv' on the 'Input':

        * retrieves a message from the mailbox wrapped in 'Just' if the mailbox
          is not empty, otherwise it:

        * retries if the mailbox is not sealed, or:

        * fails and returns 'Nothing'.

    If either the 'Input' or 'Output' is garbage collected the mailbox will
    become sealed.

    This is 'spawn\'' with the manual @seal@ action dropped from the result.
-}
spawn :: Buffer a -> IO (Output a, Input a)
spawn buffer = fmap simplify (spawn' buffer)
  where
    -- Discard the @seal@ action returned by 'spawn\''
    simplify (output, input, _) = (output, input)
{-# INLINABLE spawn #-}

{-| Like 'spawn', but also returns an action to manually @seal@ the mailbox
    early:

> (output, input, seal) <- spawn' buffer
> ...

    Use the @seal@ action to allow early cleanup of readers and writers to the
    mailbox without waiting for the next garbage collection cycle.
-}
spawn' :: Buffer a -> IO (Output a, Input a, STM ())
spawn' buffer = do
    -- Pick the STM container backing the mailbox and expose it as a pair of
    -- (write, read) actions.
    --
    -- NOTE(review): the sizes carried by 'Bounded' and 'Newest' are converted
    -- with 'fromIntegral' without any validation, so non-positive sizes are
    -- not handled specially here -- confirm what callers may pass.
    (write, read) <- case buffer of
        Bounded n -> do
            q <- S.newTBQueueIO (fromIntegral n)
            return (S.writeTBQueue q, S.readTBQueue q)
        Unbounded -> do
            q <- S.newTQueueIO
            return (S.writeTQueue q, S.readTQueue q)
        Single    -> do
            m <- S.newEmptyTMVarIO
            return (S.putTMVar m, S.takeTMVar m)
        Latest a  -> do
            t <- S.newTVarIO a
            return (S.writeTVar t, S.readTVar t)
        New       -> do
            m <- S.newEmptyTMVarIO
            -- Throw away whatever is currently stored, then store the new value
            return (\x -> S.tryTakeTMVar m *> S.putTMVar m x, S.takeTMVar m)
        Newest n  -> do
            q <- S.newTBQueueIO (fromIntegral n)
            -- If the plain write does not go through, drop one element from
            -- the front of the queue and try again
            let write x = S.writeTBQueue q x <|> (S.tryReadTBQueue q *> write x)
            return (write, S.readTBQueue q)

    sealed <- S.newTVarIO False
    let seal = S.writeTVar sealed True

    {- Use weak TVars to keep track of whether the 'Input' or 'Output' has been
       garbage collected.  Seal the mailbox when either of them becomes garbage
       collected.
    -}
    rSend <- newTVarIO ()
    void $ mkWeakTVar rSend (S.atomically seal)
    rRecv <- newTVarIO ()
    void $ mkWeakTVar rRecv (S.atomically seal)

    let -- Refuse the message once the mailbox is sealed, otherwise store it
        sendOrEnd a = do
            b <- S.readTVar sealed
            if b
                then return False
                else do
                    write a
                    return True
        -- Prefer delivering a stored message; only when that is not possible
        -- report 'Nothing', and only if the mailbox is sealed ('S.check')
        readOrEnd = (Just <$> read) <|> (do
            b <- S.readTVar sealed
            S.check b
            return Nothing )
        -- Reading the tracking TVars ties them to the returned 'Output' and
        -- 'Input' (presumably so the finalizers above fire only once the
        -- corresponding endpoint itself is unreachable)
        _send a = sendOrEnd a <* readTVar rSend
        _recv   = readOrEnd   <* readTVar rRecv
    return (Output _send, Input _recv, seal)
{-# INLINABLE spawn' #-}

{-| 'withSpawn' passes its enclosed action an 'Output' and 'Input' like you'd get from 'spawn',
    but automatically @seal@s them after the action completes.  This can be used when you need the
    @seal@ing behavior available from 'spawn\'', but want to work at a bit higher level:

> withSpawn buffer $ \(output, input) -> ...

    'withSpawn' is exception-safe, since it uses 'bracket' internally.
-}
withSpawn :: Buffer a -> ((Output a, Input a) -> IO r) -> IO r
withSpawn buffer action = bracket
    (spawn' buffer)
    -- Release step: seal the mailbox whether or not the action threw
    (\(_, _, seal) -> atomically seal)
    (\(output, input, _) -> action (output, input))

-- | A more restrictive alternative to `withSpawn` that prevents deadlocks
--
-- The two callbacks are run concurrently, one with the 'Output' and one with
-- the 'Input' of a freshly spawned mailbox.  The mailbox is sealed as soon as
-- either callback finishes (normally or with an exception), and once more
-- when the whole computation exits.  Both results are returned as a pair.
withBuffer
    :: Buffer a
    -> (Output a -> IO l)
    -> (Input  a -> IO r)
    -> IO (l, r)
withBuffer buffer fOutput fInput = bracket
  (spawn' buffer)
  (\(_, _, seal) -> atomically seal)
  (\(output, input, seal) ->
    Control.Concurrent.Async.concurrently
      (fOutput output `Control.Exception.finally` atomically seal)
      (fInput  input  `Control.Exception.finally` atomically seal)
  )

-- | 'Buffer' specifies how to buffer messages stored within the mailbox
--
-- The constructors are deprecated; use the functions named in the
-- @DEPRECATED@ pragmas below ('unbounded', 'bounded', 'latest', 'newest')
-- to build a 'Buffer' instead.
data Buffer a
    = Unbounded
    -- ^ FIFO queue without a size limit
    | Bounded Int
    -- ^ FIFO queue created with the given size
    | Single
    -- ^ Holds at most one message
    | Latest a
    -- ^ Holds only the most recently sent message, starting with the given
    -- initial value
    | Newest Int
    -- ^ Queue created with the given size; when a message cannot be added,
    -- elements are dropped from the front of the queue to make room
    | New
    -- ^ Holds at most one message; a newly sent message replaces the stored
    -- one

{-# DEPRECATED Unbounded "Use `unbounded` instead" #-}
{-# DEPRECATED Bounded "Use `bounded` instead" #-}
{-# DEPRECATED Single "Use @`bounded` 1@ instead" #-}
{-# DEPRECATED Latest "Use `latest` instead" #-}
{-# DEPRECATED Newest "Use `newest` instead" #-}
{-# DEPRECATED New "Use @`newest` 1@ instead" #-}

-- | Store an unbounded number of messages in a FIFO queue
unbounded :: Buffer a
unbounded = Unbounded

-- | Store a bounded number of messages, specified by the 'Int' argument
--
-- A size of exactly @1@ selects the dedicated single-message buffer.
bounded :: Int -> Buffer a
bounded 1 = Single
bounded n = Bounded n

{-| Only store the 'Latest' message, beginning with an initial value

    'Latest' is never empty nor full.
-}
latest :: a -> Buffer a
latest = Latest

{-| Like @Bounded@, but 'send' never fails (the buffer is never full).
    Instead, old elements are discarded to make room for new elements

    A size of exactly @1@ selects the dedicated single-message buffer.
-}
newest :: Int -> Buffer a
newest 1 = New
newest n = Newest n

{- $reexport
    @Control.Concurrent@ re-exports 'forkIO', although I recommend using the
    @async@ library instead.

    @Control.Concurrent.STM@ re-exports 'atomically' and 'STM'.

    @System.Mem@ re-exports 'performGC'.
-}