{-# LANGUAGE CPP, RankNTypes, Safe #-}
module Pipes.Concurrent (
Input(..),
Output(..),
fromInput,
toOutput,
Mailbox(),
fromMailbox,
toMailbox,
send',
recv',
spawn,
spawn',
withSpawn,
withBuffer,
Buffer(..),
unbounded,
bounded,
latest,
newest,
module Control.Concurrent,
module Control.Concurrent.STM,
module System.Mem
) where
import Control.Applicative (
Alternative(empty, (<|>)), Applicative(pure, (*>), (<*>)), (<*), (<$>) )
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, STM, mkWeakTVar, newTVarIO, readTVar)
import Control.Exception (bracket)
import Control.Monad (when,void, MonadPlus(..))
import Data.Functor.Contravariant (Contravariant(contramap))
import Data.Functor.Contravariant.Divisible (
Divisible(divide, conquer), Decidable(lose, choose))
import Data.Monoid (Monoid(mempty, mappend))
import Data.Semigroup
import Data.Void (absurd)
import Pipes (MonadIO(liftIO), yield, await, Producer', Consumer')
import Prelude hiding (read)
import System.Mem (performGC)
import qualified Control.Concurrent.Async
import qualified Control.Concurrent.STM as S
import qualified Control.Exception
newtype Input a = Input {
Input a -> STM (Maybe a)
recv :: S.STM (Maybe a) }
instance Functor Input where
fmap :: (a -> b) -> Input a -> Input b
fmap a -> b
f Input a
m = STM (Maybe b) -> Input b
forall a. STM (Maybe a) -> Input a
Input ((Maybe a -> Maybe b) -> STM (Maybe a) -> STM (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f) (Input a -> STM (Maybe a)
forall a. Input a -> STM (Maybe a)
recv Input a
m))
instance Applicative Input where
pure :: a -> Input a
pure a
r = STM (Maybe a) -> Input a
forall a. STM (Maybe a) -> Input a
Input (Maybe a -> STM (Maybe a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a -> Maybe a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
r))
Input (a -> b)
mf <*> :: Input (a -> b) -> Input a -> Input b
<*> Input a
mx = STM (Maybe b) -> Input b
forall a. STM (Maybe a) -> Input a
Input (Maybe (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
(<*>) (Maybe (a -> b) -> Maybe a -> Maybe b)
-> STM (Maybe (a -> b)) -> STM (Maybe a -> Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Input (a -> b) -> STM (Maybe (a -> b))
forall a. Input a -> STM (Maybe a)
recv Input (a -> b)
mf STM (Maybe a -> Maybe b) -> STM (Maybe a) -> STM (Maybe b)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Input a -> STM (Maybe a)
forall a. Input a -> STM (Maybe a)
recv Input a
mx)
instance Monad Input where
return :: a -> Input a
return a
r = STM (Maybe a) -> Input a
forall a. STM (Maybe a) -> Input a
Input (Maybe a -> STM (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall (m :: * -> *) a. Monad m => a -> m a
return a
r))
Input a
m >>= :: Input a -> (a -> Input b) -> Input b
>>= a -> Input b
f = STM (Maybe b) -> Input b
forall a. STM (Maybe a) -> Input a
Input (STM (Maybe b) -> Input b) -> STM (Maybe b) -> Input b
forall a b. (a -> b) -> a -> b
$ do
Maybe a
ma <- Input a -> STM (Maybe a)
forall a. Input a -> STM (Maybe a)
recv Input a
m
case Maybe a
ma of
Maybe a
Nothing -> Maybe b -> STM (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
Just a
a -> Input b -> STM (Maybe b)
forall a. Input a -> STM (Maybe a)
recv (a -> Input b
f a
a)
instance Alternative Input where
empty :: Input a
empty = STM (Maybe a) -> Input a
forall a. STM (Maybe a) -> Input a
Input (Maybe a -> STM (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing)
Input a
x <|> :: Input a -> Input a -> Input a
<|> Input a
y = STM (Maybe a) -> Input a
forall a. STM (Maybe a) -> Input a
Input (STM (Maybe a) -> Input a) -> STM (Maybe a) -> Input a
forall a b. (a -> b) -> a -> b
$ do
(Input a
i, Maybe a
ma) <- (Maybe a -> (Input a, Maybe a))
-> STM (Maybe a) -> STM (Input a, Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((,) Input a
y) (Input a -> STM (Maybe a)
forall a. Input a -> STM (Maybe a)
recv Input a
x) STM (Input a, Maybe a)
-> STM (Input a, Maybe a) -> STM (Input a, Maybe a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (Maybe a -> (Input a, Maybe a))
-> STM (Maybe a) -> STM (Input a, Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((,) Input a
x)(Input a -> STM (Maybe a)
forall a. Input a -> STM (Maybe a)
recv Input a
y)
case Maybe a
ma of
Maybe a
Nothing -> Input a -> STM (Maybe a)
forall a. Input a -> STM (Maybe a)
recv Input a
i
Just a
a -> Maybe a -> STM (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
instance MonadPlus Input where
mzero :: Input a
mzero = Input a
forall (f :: * -> *) a. Alternative f => f a
empty
mplus :: Input a -> Input a -> Input a
mplus = Input a -> Input a -> Input a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
(<|>)
instance Data.Semigroup.Semigroup (Input a) where
<> :: Input a -> Input a -> Input a
(<>) = Input a -> Input a -> Input a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
(<|>)
instance Monoid (Input a) where
mempty :: Input a
mempty = Input a
forall (f :: * -> *) a. Alternative f => f a
empty
#if !(MIN_VERSION_base(4,11,0))
mappend = (<>)
#endif
newtype Output a = Output {
Output a -> a -> STM Bool
send :: a -> S.STM Bool }
instance Data.Semigroup.Semigroup (Output a) where
Output a
i1 <> :: Output a -> Output a -> Output a
<> Output a
i2 = (a -> STM Bool) -> Output a
forall a. (a -> STM Bool) -> Output a
Output (\a
a -> Bool -> Bool -> Bool
(||) (Bool -> Bool -> Bool) -> STM Bool -> STM (Bool -> Bool)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Output a -> a -> STM Bool
forall a. Output a -> a -> STM Bool
send Output a
i1 a
a STM (Bool -> Bool) -> STM Bool -> STM Bool
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Output a -> a -> STM Bool
forall a. Output a -> a -> STM Bool
send Output a
i2 a
a)
instance Monoid (Output a) where
mempty :: Output a
mempty = (a -> STM Bool) -> Output a
forall a. (a -> STM Bool) -> Output a
Output (\a
_ -> Bool -> STM Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False)
#if !(MIN_VERSION_base(4,11,0))
mappend = (<>)
#endif
instance Contravariant Output where
contramap :: (a -> b) -> Output b -> Output a
contramap a -> b
f (Output b -> STM Bool
a) = (a -> STM Bool) -> Output a
forall a. (a -> STM Bool) -> Output a
Output (b -> STM Bool
a (b -> STM Bool) -> (a -> b) -> a -> STM Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
f)
instance Divisible Output where
conquer :: Output a
conquer = (a -> STM Bool) -> Output a
forall a. (a -> STM Bool) -> Output a
Output (\a
_ -> Bool -> STM Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False)
divide :: (a -> (b, c)) -> Output b -> Output c -> Output a
divide a -> (b, c)
f Output b
i1 Output c
i2 = (a -> STM Bool) -> Output a
forall a. (a -> STM Bool) -> Output a
Output ((a -> STM Bool) -> Output a) -> (a -> STM Bool) -> Output a
forall a b. (a -> b) -> a -> b
$ \a
a -> case a -> (b, c)
f a
a of
(b
b, c
c) -> Bool -> Bool -> Bool
(||) (Bool -> Bool -> Bool) -> STM Bool -> STM (Bool -> Bool)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Output b -> b -> STM Bool
forall a. Output a -> a -> STM Bool
send Output b
i1 b
b STM (Bool -> Bool) -> STM Bool -> STM Bool
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Output c -> c -> STM Bool
forall a. Output a -> a -> STM Bool
send Output c
i2 c
c
instance Decidable Output where
lose :: (a -> Void) -> Output a
lose a -> Void
f = (a -> STM Bool) -> Output a
forall a. (a -> STM Bool) -> Output a
Output (Void -> STM Bool
forall a. Void -> a
absurd (Void -> STM Bool) -> (a -> Void) -> a -> STM Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Void
f)
choose :: (a -> Either b c) -> Output b -> Output c -> Output a
choose a -> Either b c
f Output b
i1 Output c
i2 = (a -> STM Bool) -> Output a
forall a. (a -> STM Bool) -> Output a
Output ((a -> STM Bool) -> Output a) -> (a -> STM Bool) -> Output a
forall a b. (a -> b) -> a -> b
$ \a
a -> case a -> Either b c
f a
a of
Left b
b -> Output b -> b -> STM Bool
forall a. Output a -> a -> STM Bool
send Output b
i1 b
b
Right c
c -> Output c -> c -> STM Bool
forall a. Output a -> a -> STM Bool
send Output c
i2 c
c
type Mailbox a = (Output a, Input a)
fromMailbox :: (MonadIO m) => Mailbox a -> Producer' a m ()
fromMailbox :: Mailbox a -> Producer' a m ()
fromMailbox = Input a -> Proxy x' x () a m ()
forall (m :: * -> *) a. MonadIO m => Input a -> Producer' a m ()
fromInput (Input a -> Proxy x' x () a m ())
-> (Mailbox a -> Input a) -> Mailbox a -> Proxy x' x () a m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Mailbox a -> Input a
forall a b. (a, b) -> b
snd
toMailbox :: (MonadIO m) => Mailbox a -> Consumer' a m ()
toMailbox :: Mailbox a -> Consumer' a m ()
toMailbox = Output a -> Proxy () a y' y m ()
forall (m :: * -> *) a. MonadIO m => Output a -> Consumer' a m ()
toOutput (Output a -> Proxy () a y' y m ())
-> (Mailbox a -> Output a) -> Mailbox a -> Proxy () a y' y m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Mailbox a -> Output a
forall a b. (a, b) -> a
fst
send' :: Mailbox a -> a -> STM Bool
send' :: Mailbox a -> a -> STM Bool
send' = Output a -> a -> STM Bool
forall a. Output a -> a -> STM Bool
send (Output a -> a -> STM Bool)
-> (Mailbox a -> Output a) -> Mailbox a -> a -> STM Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Mailbox a -> Output a
forall a b. (a, b) -> a
fst
recv' :: Mailbox a -> STM (Maybe a)
recv' :: Mailbox a -> STM (Maybe a)
recv' = Input a -> STM (Maybe a)
forall a. Input a -> STM (Maybe a)
recv (Input a -> STM (Maybe a))
-> (Mailbox a -> Input a) -> Mailbox a -> STM (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Mailbox a -> Input a
forall a b. (a, b) -> b
snd
toOutput :: (MonadIO m) => Output a -> Consumer' a m ()
toOutput :: Output a -> Consumer' a m ()
toOutput Output a
output = Proxy () a y' y m ()
Consumer' a m ()
loop
where
loop :: Proxy () a y' y m ()
loop = do
a
a <- Proxy () a y' y m a
forall (m :: * -> *) a. Functor m => Consumer' a m a
await
Bool
alive <- IO Bool -> Proxy () a y' y m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> Proxy () a y' y m Bool)
-> IO Bool -> Proxy () a y' y m Bool
forall a b. (a -> b) -> a -> b
$ STM Bool -> IO Bool
forall a. STM a -> IO a
S.atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Output a -> a -> STM Bool
forall a. Output a -> a -> STM Bool
send Output a
output a
a
Bool -> Proxy () a y' y m () -> Proxy () a y' y m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
alive Proxy () a y' y m ()
loop
{-# INLINABLE toOutput #-}
fromInput :: (MonadIO m) => Input a -> Producer' a m ()
fromInput :: Input a -> Producer' a m ()
fromInput Input a
input = Proxy x' x () a m ()
Producer' a m ()
loop
where
loop :: Proxy x' x () a m ()
loop = do
Maybe a
ma <- IO (Maybe a) -> Proxy x' x () a m (Maybe a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Proxy x' x () a m (Maybe a))
-> IO (Maybe a) -> Proxy x' x () a m (Maybe a)
forall a b. (a -> b) -> a -> b
$ STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
S.atomically (STM (Maybe a) -> IO (Maybe a)) -> STM (Maybe a) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ Input a -> STM (Maybe a)
forall a. Input a -> STM (Maybe a)
recv Input a
input
case Maybe a
ma of
Maybe a
Nothing -> () -> Proxy x' x () a m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just a
a -> do
a -> Proxy x' x () a m ()
forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a
Proxy x' x () a m ()
loop
{-# INLINABLE fromInput #-}
spawn :: Buffer a -> IO (Output a, Input a)
spawn :: Buffer a -> IO (Output a, Input a)
spawn Buffer a
buffer = ((Output a, Input a, STM ()) -> (Output a, Input a))
-> IO (Output a, Input a, STM ()) -> IO (Output a, Input a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Output a, Input a, STM ()) -> (Output a, Input a)
forall a b c. (a, b, c) -> (a, b)
simplify (Buffer a -> IO (Output a, Input a, STM ())
forall a. Buffer a -> IO (Output a, Input a, STM ())
spawn' Buffer a
buffer)
where
simplify :: (a, b, c) -> (a, b)
simplify (a
output, b
input, c
_) = (a
output, b
input)
{-# INLINABLE spawn #-}
spawn' :: Buffer a -> IO (Output a, Input a, STM ())
spawn' :: Buffer a -> IO (Output a, Input a, STM ())
spawn' Buffer a
buffer = do
(a -> STM ()
write, STM a
read) <- case Buffer a
buffer of
Bounded Int
n -> do
TBQueue a
q <- Natural -> IO (TBQueue a)
forall a. Natural -> IO (TBQueue a)
S.newTBQueueIO (Int -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)
(a -> STM (), STM a) -> IO (a -> STM (), STM a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TBQueue a -> a -> STM ()
forall a. TBQueue a -> a -> STM ()
S.writeTBQueue TBQueue a
q, TBQueue a -> STM a
forall a. TBQueue a -> STM a
S.readTBQueue TBQueue a
q)
Buffer a
Unbounded -> do
TQueue a
q <- IO (TQueue a)
forall a. IO (TQueue a)
S.newTQueueIO
(a -> STM (), STM a) -> IO (a -> STM (), STM a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TQueue a -> a -> STM ()
forall a. TQueue a -> a -> STM ()
S.writeTQueue TQueue a
q, TQueue a -> STM a
forall a. TQueue a -> STM a
S.readTQueue TQueue a
q)
Buffer a
Single -> do
TMVar a
m <- IO (TMVar a)
forall a. IO (TMVar a)
S.newEmptyTMVarIO
(a -> STM (), STM a) -> IO (a -> STM (), STM a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TMVar a -> a -> STM ()
forall a. TMVar a -> a -> STM ()
S.putTMVar TMVar a
m, TMVar a -> STM a
forall a. TMVar a -> STM a
S.takeTMVar TMVar a
m)
Latest a
a -> do
TVar a
t <- a -> IO (TVar a)
forall a. a -> IO (TVar a)
S.newTVarIO a
a
(a -> STM (), STM a) -> IO (a -> STM (), STM a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TVar a -> a -> STM ()
forall a. TVar a -> a -> STM ()
S.writeTVar TVar a
t, TVar a -> STM a
forall a. TVar a -> STM a
S.readTVar TVar a
t)
Buffer a
New -> do
TMVar a
m <- IO (TMVar a)
forall a. IO (TMVar a)
S.newEmptyTMVarIO
(a -> STM (), STM a) -> IO (a -> STM (), STM a)
forall (m :: * -> *) a. Monad m => a -> m a
return (\a
x -> TMVar a -> STM (Maybe a)
forall a. TMVar a -> STM (Maybe a)
S.tryTakeTMVar TMVar a
m STM (Maybe a) -> STM () -> STM ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> TMVar a -> a -> STM ()
forall a. TMVar a -> a -> STM ()
S.putTMVar TMVar a
m a
x, TMVar a -> STM a
forall a. TMVar a -> STM a
S.takeTMVar TMVar a
m)
Newest Int
n -> do
TBQueue a
q <- Natural -> IO (TBQueue a)
forall a. Natural -> IO (TBQueue a)
S.newTBQueueIO (Int -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)
let write :: a -> STM ()
write a
x = TBQueue a -> a -> STM ()
forall a. TBQueue a -> a -> STM ()
S.writeTBQueue TBQueue a
q a
x STM () -> STM () -> STM ()
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (TBQueue a -> STM (Maybe a)
forall a. TBQueue a -> STM (Maybe a)
S.tryReadTBQueue TBQueue a
q STM (Maybe a) -> STM () -> STM ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> a -> STM ()
write a
x)
(a -> STM (), STM a) -> IO (a -> STM (), STM a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> STM ()
write, TBQueue a -> STM a
forall a. TBQueue a -> STM a
S.readTBQueue TBQueue a
q)
TVar Bool
sealed <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
S.newTVarIO Bool
False
let seal :: STM ()
seal = TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
S.writeTVar TVar Bool
sealed Bool
True
TVar ()
rSend <- () -> IO (TVar ())
forall a. a -> IO (TVar a)
newTVarIO ()
IO (Weak (TVar ())) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Weak (TVar ())) -> IO ()) -> IO (Weak (TVar ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar () -> IO () -> IO (Weak (TVar ()))
forall a. TVar a -> IO () -> IO (Weak (TVar a))
mkWeakTVar TVar ()
rSend (STM () -> IO ()
forall a. STM a -> IO a
S.atomically STM ()
seal)
TVar ()
rRecv <- () -> IO (TVar ())
forall a. a -> IO (TVar a)
newTVarIO ()
IO (Weak (TVar ())) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Weak (TVar ())) -> IO ()) -> IO (Weak (TVar ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar () -> IO () -> IO (Weak (TVar ()))
forall a. TVar a -> IO () -> IO (Weak (TVar a))
mkWeakTVar TVar ()
rRecv (STM () -> IO ()
forall a. STM a -> IO a
S.atomically STM ()
seal)
let sendOrEnd :: a -> STM Bool
sendOrEnd a
a = do
Bool
b <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
S.readTVar TVar Bool
sealed
if Bool
b
then Bool -> STM Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else do
a -> STM ()
write a
a
Bool -> STM Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
readOrEnd :: STM (Maybe a)
readOrEnd = (a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> STM a -> STM (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM a
read) STM (Maybe a) -> STM (Maybe a) -> STM (Maybe a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (do
Bool
b <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
S.readTVar TVar Bool
sealed
Bool -> STM ()
S.check Bool
b
Maybe a -> STM (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing )
_send :: a -> STM Bool
_send a
a = a -> STM Bool
sendOrEnd a
a STM Bool -> STM () -> STM Bool
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* TVar () -> STM ()
forall a. TVar a -> STM a
readTVar TVar ()
rSend
_recv :: STM (Maybe a)
_recv = STM (Maybe a)
readOrEnd STM (Maybe a) -> STM () -> STM (Maybe a)
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* TVar () -> STM ()
forall a. TVar a -> STM a
readTVar TVar ()
rRecv
(Output a, Input a, STM ()) -> IO (Output a, Input a, STM ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((a -> STM Bool) -> Output a
forall a. (a -> STM Bool) -> Output a
Output a -> STM Bool
_send, STM (Maybe a) -> Input a
forall a. STM (Maybe a) -> Input a
Input STM (Maybe a)
_recv, STM ()
seal)
{-# INLINABLE spawn' #-}
withSpawn :: Buffer a -> ((Output a, Input a) -> IO r) -> IO r
withSpawn :: Buffer a -> ((Output a, Input a) -> IO r) -> IO r
withSpawn Buffer a
buffer (Output a, Input a) -> IO r
action = IO (Output a, Input a, STM ())
-> ((Output a, Input a, STM ()) -> IO ())
-> ((Output a, Input a, STM ()) -> IO r)
-> IO r
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
(Buffer a -> IO (Output a, Input a, STM ())
forall a. Buffer a -> IO (Output a, Input a, STM ())
spawn' Buffer a
buffer)
(\(Output a
_, Input a
_, STM ()
seal) -> STM () -> IO ()
forall a. STM a -> IO a
atomically STM ()
seal)
(\(Output a
output, Input a
input, STM ()
_) -> (Output a, Input a) -> IO r
action (Output a
output, Input a
input))
withBuffer
:: Buffer a
-> (Output a -> IO l)
-> (Input a -> IO r)
-> IO (l, r)
withBuffer :: Buffer a -> (Output a -> IO l) -> (Input a -> IO r) -> IO (l, r)
withBuffer Buffer a
buffer Output a -> IO l
fOutput Input a -> IO r
fInput = IO (Output a, Input a, STM ())
-> ((Output a, Input a, STM ()) -> IO ())
-> ((Output a, Input a, STM ()) -> IO (l, r))
-> IO (l, r)
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
(Buffer a -> IO (Output a, Input a, STM ())
forall a. Buffer a -> IO (Output a, Input a, STM ())
spawn' Buffer a
buffer)
(\(Output a
_, Input a
_, STM ()
seal) -> STM () -> IO ()
forall a. STM a -> IO a
atomically STM ()
seal)
(\(Output a
output, Input a
input, STM ()
seal) ->
IO l -> IO r -> IO (l, r)
forall a b. IO a -> IO b -> IO (a, b)
Control.Concurrent.Async.concurrently
(Output a -> IO l
fOutput Output a
output IO l -> IO () -> IO l
forall a b. IO a -> IO b -> IO a
`Control.Exception.finally` STM () -> IO ()
forall a. STM a -> IO a
atomically STM ()
seal)
(Input a -> IO r
fInput Input a
input IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`Control.Exception.finally` STM () -> IO ()
forall a. STM a -> IO a
atomically STM ()
seal)
)
data Buffer a
= Unbounded
| Bounded Int
| Single
| Latest a
| Newest Int
| New
{-# DEPRECATED Unbounded "Use `unbounded` instead" #-}
{-# DEPRECATED Bounded "Use `bounded` instead" #-}
{-# DEPRECATED Single "Use @`bounded` 1@ instead" #-}
{-# DEPRECATED Latest "Use `latest` instead" #-}
{-# DEPRECATED Newest "Use `newest` instead" #-}
{-# DEPRECATED New "Use @`newest` 1@ instead" #-}
unbounded :: Buffer a
unbounded :: Buffer a
unbounded = Buffer a
forall a. Buffer a
Unbounded
bounded :: Int -> Buffer a
bounded :: Int -> Buffer a
bounded Int
1 = Buffer a
forall a. Buffer a
Single
bounded Int
n = Int -> Buffer a
forall a. Int -> Buffer a
Bounded Int
n
latest :: a -> Buffer a
latest :: a -> Buffer a
latest = a -> Buffer a
forall a. a -> Buffer a
Latest
newest :: Int -> Buffer a
newest :: Int -> Buffer a
newest Int
1 = Buffer a
forall a. Buffer a
New
newest Int
n = Int -> Buffer a
forall a. Int -> Buffer a
Newest Int
n