module Pipes.Misc where
import Control.Arrow
import Control.Concurrent.STM
import Control.Lens
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Trans.Maybe
import qualified Pipes as P
import qualified Pipes.Concurrent as PC
import qualified Pipes.Shaft as PS
import qualified Pipes.Prelude as PP
import qualified Data.List.NonEmpty as NE
import Control.Monad.Except
import Control.Applicative
-- | Produce, inside 'STM', every value received from a 'PC.Input'.
--
-- Each iteration performs one 'PC.recv'. A 'Just' is yielded downstream and
-- the loop continues; a 'Nothing' ends the producer with @()@.
fromInputSTM :: PC.Input a -> P.Producer' a STM ()
fromInputSTM as = do
    received <- lift (PC.recv as)
    case received of
        Nothing -> pure ()
        Just a -> do
            P.yield a
            fromInputSTM as
-- | Consume values inside 'STM', handing each one to a 'PC.Output'.
--
-- The loop awaits a value, sends it with 'PC.send', and carries on only while
-- the send reports 'True'; the first 'False' ends the consumer with @()@.
toOutputSTM :: PC.Output a -> P.Consumer' a STM ()
toOutputSTM output = do
    a <- P.await
    accepted <- lift (PC.send output a)
    when accepted (toOutputSTM output)
-- | Group the values of a 'PC.Input' into non-empty batches.
--
-- A receive on the resulting input first performs one ordinary receive on the
-- wrapped input (so it retries exactly when the wrapped input does). If that
-- yields 'Nothing' the result is 'Nothing'. Otherwise it keeps receiving for
-- as long as a further value is available without retrying, and returns
-- everything collected.
--
-- Values are consed onto the front of the accumulator, so the batch is ordered
-- newest first: the last value received is the head of the 'NE.NonEmpty'.
--
-- A 'Nothing' seen while draining only ends the current batch; the values
-- already collected are still returned.
batch :: PC.Input a -> PC.Input (NE.NonEmpty a)
batch (PC.Input recv) = PC.Input $ do
    headItem <- recv
    case headItem of
        Nothing -> pure Nothing
        Just a -> Just <$> drain (a NE.:| [])
  where
    -- Keep receiving until the wrapped input would retry or reports 'Nothing'.
    drain acc = do
        -- The outer 'Maybe' records whether the receive completed at all:
        -- if @recv@ retries, '<|>' falls through to @pure Nothing@.
        polled <- (Just <$> recv) <|> pure Nothing
        case polled of
            Just (Just a) -> drain (a NE.<| acc)
            _ -> pure acc
-- | Yield a sliding window over the values flowing through the pipe.
--
-- Every awaited value is consed onto the previous window (initially @as@),
-- the result is truncated to at most @n@ elements, and that list is yielded.
-- Windows are therefore ordered newest first.
buffer :: Monad m => Int -> [a] -> P.Pipe a [a] m r
buffer n as = go as
  where
    go recent = do
        a <- P.await
        let window = take n (a : recent)
        -- Walk the whole spine before yielding; presumably this keeps nested
        -- @take@ thunks from holding on to older windows.
        length window `seq` P.yield window
        go window
-- | Pass values through unchanged while recording part of each in the state.
--
-- For every incoming value, the part selected by the getter @v@ is written to
-- the state location targeted by the setter @s@, then the original value is
-- yielded downstream.
store :: MonadState s m => Getter a b -> Setter' s b -> P.Pipe a a m r
store v s = P.for P.cat $ \a -> do
    assign s (a ^. v)
    P.yield a
-- | Pair every incoming value with a view of the current state.
--
-- For each value @a@, the state is read and @(b, a)@ is yielded, where @b@ is
-- the part of the state selected by the getter @v@.
retrieve :: MonadState s m => Getter s b -> P.Pipe a (b, a) m r
retrieve v = PP.mapM $ \a -> gets $ \s -> (s ^. v, a)
-- | Run a pipe on a projection of a larger stream and merge its output back.
--
-- Each upstream @s@ is duplicated into a pair. The first component is
-- projected with @viewf@ and fed to @p@ (wrapped as a 'PS.Shaft' and lifted
-- with 'first'), and every resulting @(b, s)@ pair is combined with @modifyf@.
--
-- NOTE(review): how outputs of @p@ are paired with the retained @s@ depends on
-- the 'Arrow' instance of 'PS.Shaft'; confirm against "Pipes.Shaft".
locally ::
    Monad m =>
    (s -> a)
    -> (b -> s -> t)
    -> P.Pipe a b m r
    -> P.Pipe s t m r
locally viewf modifyf p = duplicate P.>-> focused P.>-> rebuild
  where
    -- Keep an untouched copy of the input alongside the one to be projected.
    duplicate = PP.map (id &&& id)
    -- Apply the projected pipe to the first component of each pair.
    focused = PS.runShaft . first . PS.Shaft $ PP.map viewf P.>-> p
    -- Combine the output of @p@ with the retained copy.
    rebuild = PP.map (uncurry modifyf)