{-# LANGUAGE RankNTypes #-}
module Pipes.Misc.Concurrent where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Morph
import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe
import qualified Data.List.NonEmpty as NE
import qualified Pipes as P
import qualified Pipes.Concurrent as PC
import qualified Pipes.Prelude as PP
-- | Read from a "Pipes.Concurrent" 'PC.Input' directly inside 'STM'.
--
-- Every value obtained with 'PC.recv' is yielded downstream. The producer
-- finishes as soon as 'PC.recv' returns 'Nothing'.
fromInputSTM :: PC.Input a -> P.Producer' a STM ()
fromInputSTM input = loop
  where
    loop = do
        received <- lift (PC.recv input)
        case received of
            -- Nothing more will arrive: end the producer.
            Nothing -> pure ()
            Just a -> do
                P.yield a
                loop
-- | Write to a "Pipes.Concurrent" 'PC.Output' directly inside 'STM'.
--
-- Every awaited value is passed to 'PC.send'. The consumer finishes the
-- first time 'PC.send' returns 'False'.
toOutputSTM :: PC.Output a -> P.Consumer' a STM ()
toOutputSTM output = loop
  where
    loop = do
        a <- P.await
        accepted <- lift (PC.send output a)
        -- Keep consuming only while the output still accepts values.
        when accepted loop
-- | Send a single value to an 'PC.Output', reporting rejection through 'MaybeT'.
--
-- The result is @()@ when 'PC.send' returns 'True' and the empty
-- ('Nothing') computation when it returns 'False'.
toOutputMaybeT :: PC.Output a -> a -> MaybeT STM ()
toOutputMaybeT output a = MaybeT $ guard <$> PC.send output a
-- | Convert a 'P.Producer' running in 'IO' into one running in 'STM'.
--
-- A mailbox is spawned with the given buffer @b@ and a thread is forked
-- that feeds the values of @xs@ into it. The returned producer reads from
-- that mailbox (see 'fromInputSTM').
--
-- If @xs@ terminates on its own it is run again from the start, so the
-- mailbox keeps being fed for as long as it accepts values. Once the
-- mailbox stops accepting values the feeder thread exits. Previously it
-- restarted unconditionally, which made it spin forever, re-running the
-- effects of @xs@ up to its first yield on every iteration.
mkProducerSTM :: PC.Buffer a -> P.Producer a IO () -> IO (P.Producer a STM ())
mkProducerSTM b xs = do
    (output, input) <- PC.spawn b
    let feed = do
            -- Whichever side of the pipeline stops first decides the result:
            -- 'True' means @xs@ ran out, 'False' means the output was closed.
            outputAlive <- P.runEffect $ (True <$ xs) P.>-> (False <$ PC.toOutput output)
            when outputAlive feed
    void (forkIO feed)
    pure (fromInputSTM input)
-- | Convert a 'P.Producer' running in 'IO' into one running in 'STM', and
-- also return the seal action obtained from 'PC.spawn''.
--
-- A mailbox is spawned with the given buffer @b@ and a thread is forked
-- that feeds the values of @xs@ into it. The second component of the
-- result reads from that mailbox (see 'fromInputSTM'); the first is the
-- 'STM' action returned by 'PC.spawn'' for sealing it.
--
-- If @xs@ terminates on its own it is run again from the start. Once the
-- mailbox stops accepting values (for instance after the seal action has
-- been run) the feeder thread exits. Previously it restarted
-- unconditionally, which made it spin forever after sealing, re-running
-- the effects of @xs@ up to its first yield on every iteration.
mkProducerSTM' :: PC.Buffer a -> P.Producer a IO () -> IO (STM (), P.Producer a STM ())
mkProducerSTM' b xs = do
    (output, input, seal) <- PC.spawn' b
    let feed = do
            -- Whichever side of the pipeline stops first decides the result:
            -- 'True' means @xs@ ran out, 'False' means the output was closed.
            outputAlive <- P.runEffect $ (True <$ xs) P.>-> (False <$ PC.toOutput output)
            when outputAlive feed
    void (forkIO feed)
    pure (seal, fromInputSTM input)
-- | Turn an 'PC.Input' of single values into an 'PC.Input' of non-empty
-- batches.
--
-- Each receive first performs one receive on the wrapped input exactly as
-- is (so it blocks if that receive blocks, and yields 'Nothing' if that
-- receive does). It then keeps receiving for as long as further values are
-- available without blocking. Values are prepended as they arrive, so a
-- batch is ordered newest-first.
batch :: PC.Input a -> PC.Input (NE.NonEmpty a)
batch (PC.Input recvOne) = PC.Input $
    recvOne >>= maybe (pure Nothing) (fmap Just . drain . (NE.:| []))
  where
    -- Collect further values until the wrapped receive would block (the
    -- '<|>' alternative then supplies 'Nothing') or itself reports
    -- 'Nothing'; in both cases return what has been gathered so far.
    drain collected = do
        next <- recvOne <|> pure Nothing
        maybe (pure collected) (drain . (NE.<| collected)) next
-- | Read values from an 'PC.Input' and run a monadic action on each one,
-- yielding the results.
--
-- The input is read via 'fromInputSTM', lifted from 'STM' into @t STM@.
-- Each value is passed to @f@ and the result of that action is yielded
-- downstream.
execInput
    :: (MonadTrans t, Monad (t STM))
    => PC.Input a -> (a -> (t STM) b) -> P.Producer' b (t STM) ()
execInput input f =
    P.for (hoist lift (fromInputSTM input)) $ \a -> do
        b <- lift (f a)
        P.yield b