module Pipes.Misc.Util where
import Control.Applicative
import Control.Arrow
import Control.Monad.Except
import qualified Data.List.NonEmpty as NE
import qualified Pipes as P
import qualified Pipes.Concurrent as PC
import qualified Pipes.Prelude as PP
import qualified Pipes.Shaft as PS
import Pipes.Internal (Proxy(..))
-- | Collect everything that can be read from an input right now into one
-- non-empty batch.
--
-- The first read is performed unconditionally, so the resulting action
-- behaves like the underlying input until one value is available. If that
-- read reports 'Nothing', 'Nothing' is passed straight through. Otherwise
-- further reads are attempted one after another; the batch is closed as soon
-- as a read either fails (the '<|>' fallback is taken) or returns 'Nothing'.
--
-- Every additional value is prepended to the batch, so the result is ordered
-- newest first: the head is the last value read and the final element is the
-- first value read.
batch :: PC.Input a -> PC.Input (NE.NonEmpty a)
batch (PC.Input source) =
  PC.Input $ source >>= maybe (pure Nothing) (fmap Just . drain)
  where
    -- Start a batch from the first value and keep reading until 'collect'
    -- bails out through the 'Left' channel with everything gathered so far.
    drain firstItem =
      either id id <$> runExceptT (collect (firstItem NE.:| []))

    -- One more read attempt; on failure fall back to finishing the batch.
    collect acc =
      ExceptT (fmap (extend acc) source <|> pure (Left acc)) >>= collect

    -- 'Nothing' from the source ends the batch; a value is consed on front.
    extend acc = maybe (Left acc) (Right . (NE.<| acc))
-- | Yield a sliding window over the values seen so far.
--
-- Each awaited value is consed onto the front of the current window, the
-- window is cut down to at most @n@ elements with 'take', and the result is
-- yielded downstream. The second argument is the initial window contents.
-- The newest value is therefore always at the head of the yielded list.
buffer :: Monad m => Int -> [a] -> P.Pipe a [a] m r
buffer n as =
  P.await >>= \newest ->
    let window = take n (newest : as)
     in -- Walk the whole spine before continuing so the lazily built
        -- 'take' does not keep a reference to the previous window.
        spine window `seq` (P.yield window >> buffer n window)
  where
    -- Traverses every cons cell of the list without touching the elements.
    spine xs = foldr (\_ rest -> rest) () xs
-- | Run a pipe on a projection of a larger value and merge its output back.
--
-- Every incoming @s@ is duplicated into a pair. The first component is
-- mapped through the view function and fed to the inner pipe (wrapped as a
-- 'PS.Shaft' and lifted with 'first'), while the second component is carried
-- alongside. Each resulting pair is then combined with the modify function
-- to produce the outgoing @t@.
--
-- NOTE(review): how outputs are paired with carried values when the inner
-- pipe does not yield exactly once per await is decided by the 'PS.Shaft'
-- 'Arrow' instance, which is not visible here -- confirm before relying on it.
locally ::
     Monad m
  => (s -> a)
  -> (b -> s -> t)
  -> P.Pipe a b m r
  -> P.Pipe s t m r
locally viewf modifyf p = duplicate P.>-> focused P.>-> merge
  where
    -- Keep an untouched copy of the whole value next to the one to be viewed.
    duplicate = PP.map (id &&& id)
    -- Apply the view and the inner pipe to the first component only.
    focused = PS.runShaft (first (PS.Shaft (PP.map viewf P.>-> p)))
    -- Recombine the inner pipe's output with the carried original.
    merge = PP.map (\(b, s) -> modifyf b s)
-- | Yield the result of combining each awaited value with the value awaited
-- just before it.
--
-- The combining function receives the newly awaited value as its first
-- argument and the previous value as its second. The very first awaited
-- value is combined with the supplied seed.
compare :: Monad m => (a -> a -> b) -> a -> P.Pipe a b m r
compare f = step
  where
    -- @previous@ is the seed on the first iteration and the last awaited
    -- value on every later one.
    step previous = do
      current <- P.await
      P.yield (f current previous)
      step current
-- | Yield the result of combining each awaited value with the first value
-- that was ever awaited.
--
-- The first value is combined with itself; every later value is passed as
-- the first argument of the combining function with the first value as the
-- second argument. The reference value is never updated.
compare' :: Monad m => (a -> a -> b) -> P.Pipe a b m r
compare' f = do
  anchor <- P.await
  P.yield (f anchor anchor)
  -- From here on this is a plain element-wise map against the fixed anchor.
  PP.map (\latest -> f latest anchor)
-- | A producer that yields the given value over and over and never returns.
always :: Monad m => a -> P.Producer a m r
always a = loop
  where
    loop = P.yield a >> loop
-- | Re-emit everything a producer yields and return the last value it
-- yielded once it finishes.
--
-- The first argument is the value returned when the producer finishes
-- without yielding anything. The traversal works directly on the 'Proxy'
-- constructors, replacing the remembered value each time a 'Respond' is seen.
lastOr :: Monad m => a -> P.Producer a m () -> P.Producer a m a
-- Upstream requests are forwarded unchanged; the remembered value is kept.
lastOr latest (Request req resume) = Request req (lastOr latest . resume)
-- A yielded value is passed downstream and becomes the new remembered value.
lastOr _ (Respond out resume) = Respond out (lastOr out . resume)
-- Base-monad effects are preserved; the traversal continues on their result.
lastOr latest (M action) = M (lastOr latest <$> action)
-- The producer is done: hand back whatever was remembered.
lastOr latest (Pure ()) = Pure latest