module Pipes.Misc.Concurrent where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Trans.Class
import Control.Monad.Trans.Maybe
import qualified Pipes as P
import qualified Pipes.Concurrent as PC
fromInputSTM :: PC.Input a -> P.Producer' a STM ()
fromInputSTM as = void $ runMaybeT $ forever $ do
a <- MaybeT $ lift $ PC.recv as
lift $ P.yield a
toOutputSTM :: PC.Output a -> P.Consumer' a STM ()
toOutputSTM output = void $ runMaybeT $ forever $ do
a <- lift P.await
p <- lift $ lift $ PC.send output a
guard p
toOutputMaybeT :: PC.Output a -> a -> MaybeT STM ()
toOutputMaybeT output = (MaybeT . fmap guard) <$> PC.send output
mkProducerSTM :: PC.Buffer a -> P.Producer a IO () -> IO (P.Producer a STM ())
mkProducerSTM b xs = do
(output, input) <- PC.spawn b
void . forkIO . void . forever . P.runEffect $ xs P.>-> PC.toOutput output
pure (fromInputSTM input)
mkProducerSTM' :: PC.Buffer a -> P.Producer a IO () -> IO (STM (), P.Producer a STM ())
mkProducerSTM' b xs = do
(output, input, seal) <- PC.spawn' b
void . forkIO . void . forever . P.runEffect $ xs P.>-> PC.toOutput output
pure (seal, fromInputSTM input)