module Streaming.Pipes (
fromStream,
toStream,
toStreamingByteString,
fromStreamingByteString,
takes,
takes',
maps,
span,
break,
splitAt,
group,
groupBy,
groupsBy,
groupsBy',
groups,
split,
breaks,
concats,
intercalates,
folds,
foldsM,
) where
import Pipes
import Streaming hiding (concats, groups)
import qualified Streaming.Internal as SI
import qualified Pipes.Internal as PI
import qualified Pipes.Prelude as P
import qualified Pipes as P
import qualified Streaming.Prelude as S
import Control.Monad (liftM)
import Prelude hiding (span, splitAt, break)
import qualified Data.ByteString as B
import qualified Data.ByteString.Streaming as Q
import qualified Data.ByteString.Streaming.Internal as Q
-- | Convert a 'Stream' of individual elements into a pipes producer.
--
-- The stream is translated constructor by constructor: a finished stream
-- becomes a finished producer, an effect layer becomes a producer effect,
-- and each element becomes a response sent downstream.
fromStream :: Monad m => Stream (Of a) m r -> Producer' a m r
fromStream stream = go stream
  where
    go (SI.Return r) = PI.Pure r
    go (SI.Effect act) = PI.M (liftM go act)
    -- Whatever downstream sends back is ignored; we just keep going.
    go (SI.Step (x :> xs)) = PI.Respond x (const (go xs))
-- | Convert a pipes 'Producer' into a 'Stream' of its elements.
--
-- Each response of the producer becomes one element of the stream, and
-- the producer is resumed by feeding it @()@.
toStream :: Monad m => Producer a m r -> Stream (Of a) m r
toStream = go
  where
    go (PI.Pure r) = SI.Return r
    go (PI.M act) = SI.Effect (liftM go act)
    go (PI.Respond x k) = SI.Step (x :> go (k ()))
    -- A 'Producer' has a closed upstream interface, so the request
    -- payload is handed to 'PI.closed' to discharge this branch.
    go (PI.Request v _) = PI.closed v
-- | Convert a 'Producer' of strict 'B.ByteString' chunks into a streaming
-- 'Q.ByteString', keeping the chunk boundaries as they were produced.
toStreamingByteString :: Monad m => Producer B.ByteString m r -> Q.ByteString m r
toStreamingByteString = go
  where
    go (PI.Pure r) = Q.Empty r
    go (PI.M act) = Q.Go (liftM go act)
    go (PI.Respond chunk k) = Q.Chunk chunk (go (k ()))
    -- A 'Producer' has a closed upstream interface, so the request
    -- payload is handed to 'PI.closed' to discharge this branch.
    go (PI.Request v _) = PI.closed v
-- | Convert a streaming 'Q.ByteString' into a producer of its strict
-- 'B.ByteString' chunks, one response per chunk.
fromStreamingByteString :: Monad m => Q.ByteString m r -> Producer' B.ByteString m r
fromStreamingByteString bytes = go bytes
  where
    go (Q.Empty r) = PI.Pure r
    go (Q.Go act) = PI.M (liftM go act)
    -- Whatever downstream sends back is ignored; we just keep going.
    go (Q.Chunk chunk more) = PI.Respond chunk (const (go more))
-- | Yield the longest prefix of elements satisfying the predicate, then
-- return the rest of the producer.
--
-- The first element that fails the predicate is not lost: it is put back
-- at the front of the returned producer. If the input ends before any
-- element fails, the returned producer only carries the final result.
span :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
span predicate = go
  where
    go producer = lift (next producer) >>= either finished continue

    -- The input ran out while every element still satisfied the predicate.
    finished r = return (return r)

    continue (x, rest)
      | predicate x = yield x >> go rest
      | otherwise = return (yield x >> rest)
-- | Yield elements up to, but not including, the first one that satisfies
-- the predicate; return the rest of the producer, starting with that
-- element. This is 'span' with the predicate negated.
break :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
break predicate producer = span (\x -> not (predicate x)) producer
-- | Split a producer on a separator element into a stream of producers.
--
-- Elements equal to the separator are dropped. Every layer of the result
-- starts with a non-separator element and runs up to the next separator,
-- so consecutive separators never give rise to an empty layer.
split :: (Eq a, Monad m) =>
  a -> Producer a m r -> Stream (Producer a m) m r
split t = go
  where
    go producer = lift (next producer) >>= either SI.Return segment

    segment (x, rest)
      | x /= t = SI.Step (fmap go (yield x >> span (/= t) rest))
      -- A separator: skip it and look for the start of the next layer.
      | otherwise = go rest
-- | Split a producer into a stream of producers wherever an element
-- satisfies the predicate.
--
-- Elements satisfying the predicate act as separators and are dropped.
-- Every layer starts with an element that fails the predicate and runs
-- up to the next element that satisfies it.
breaks :: (Eq a, Monad m) =>
  (a -> Bool) -> Producer a m r -> Stream (Producer a m) m r
breaks predicate = go
  where
    go producer = do
      step <- lift (next producer)
      case step of
        Left r -> SI.Return r
        Right (x, rest)
          -- A separator: skip it and look for the start of the next layer.
          | predicate x -> go rest
          | otherwise -> SI.Step (fmap go (yield x >> break predicate rest))
-- | Yield at most the given number of elements from the producer, then
-- return the remainder of the producer.
--
-- A count of zero or less yields nothing and returns the producer
-- untouched. If the producer ends before the count is exhausted, the
-- returned producer only carries the final result.
splitAt
  :: Monad m
  => Int -> Producer a m r -> Producer a m (Producer a m r)
splitAt = loop
  where
    loop n p
      | n <= 0 = return p
      | otherwise = do
          e <- lift (next p)
          case e of
            Left r -> return (return r)
            -- One element has been passed on, so one fewer remains to take.
            Right (a, p') -> yield a >> loop (n - 1) p'
-- | Yield the first group of the producer and return the remainder.
--
-- The group is delimited by 'span', using the first element as the left
-- argument of the equality test. The first element is pushed back before
-- 'span' runs, so it is itself tested against itself.
groupBy
  :: Monad m
  => (a -> a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
groupBy equals producer = do
  step <- lift (next producer)
  case step of
    Left r -> return (return r)
    Right (x, rest) -> span (equals x) (yield x >> rest)
-- | Yield the first run of equal elements and return the remainder of
-- the producer; 'groupBy' specialised to '=='.
group
  :: (Monad m, Eq a) => Producer a m r -> Producer a m (Producer a m r)
group producer = groupBy (\x y -> x == y) producer
-- | Divide a producer into a stream of producers, one per group.
--
-- Each layer starts with some element and then continues for as long as
-- the following elements are related to that first element by the
-- supplied test (the first element is always the left argument).
groupsBy
  :: Monad m
  => (a -> a -> Bool)
  -> Producer a m r -> Stream (Producer a m) m r
groupsBy equals = go
  where
    go producer = SI.Effect (liftM (either SI.Return startGroup) (next producer))

    startGroup (x, rest) = SI.Step (fmap go (yield x >> span (equals x) rest))
-- | Divide a producer into a stream of producers, one per group, where
-- each element is compared with its immediate predecessor rather than
-- with the first element of its group.
groupsBy'
  :: Monad m
  => (a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r
groupsBy' equals = go
  where
    go producer = SI.Effect (liftM (either SI.Return startGroup) (next producer))

    -- The element just pulled is pushed back so the group itself emits it.
    startGroup (x, rest) = SI.Step (fmap go (firstOf (yield x >> rest)))

    -- Emit the first element of a group unconditionally, then chain.
    firstOf producer = do
      step <- lift (next producer)
      case step of
        Left r -> return (return r)
        Right (x, rest) -> yield x >> chain x rest

    -- Keep emitting while each new element is related to the previous one;
    -- the first unrelated element is put back onto the returned producer.
    chain prev producer = do
      step <- lift (next producer)
      case step of
        Left r -> return (return r)
        Right (x, rest)
          | equals prev x -> yield x >> chain x rest
          | otherwise -> return (yield x >> rest)
-- | Divide a producer into a stream of producers, one per run of equal
-- elements; 'groupsBy' specialised to '=='.
groups :: (Monad m, Eq a)
  => Producer a m r -> Stream (Producer a m) m r
groups producer = groupsBy (\x y -> x == y) producer
-- | Divide a producer into a stream of producers, each layer being cut
-- off by 'splitAt' with the given size.
--
-- A new layer is only started once the producer is known to hold at
-- least one more element.
--
-- NOTE(review): for a size of zero or less 'splitAt' hands the producer
-- back untouched, so this looks like it never makes progress -- confirm
-- whether callers rely on a positive size.
chunksOf
  :: Monad m => Int -> Producer a m r -> Stream (Producer a m) m r
chunksOf n = go
  where
    go producer = SI.Effect (next producer >>= return . advance)

    advance (Left r) = SI.Return r
    -- The element just pulled is pushed back so it counts towards the chunk.
    advance (Right (x, rest)) = SI.Step (fmap go (splitAt n (yield x >> rest)))
-- | Join a stream of producers back into one producer by running the
-- layers one after another.
concats :: Monad m => Stream (Producer a m) m r -> Producer a m r
concats = go
  where
    go (SI.Return r) = return r
    go (SI.Effect act) = PI.M (liftM go act)
    -- Running a layer yields its elements and returns the rest of the stream.
    go (SI.Step layer) = layer >>= go
-- | Fold every layer of a stream of producers and yield one summary value
-- per layer.
--
-- Each layer is folded from the initial accumulator with the step
-- function; when the layer ends, the extraction function is applied to
-- the accumulator and the result is yielded.
folds
  :: Monad m
  => (x -> a -> x)
  -> x
  -> (x -> b)
  -> Stream (Producer a m) m r
  -> Producer b m r
folds step begin done = go
  where
    go (SI.Return r) = return r
    go (SI.Effect act) = PI.M (liftM go act)
    go (SI.Step layer) = do
      (rest, summary) <- lift (collapse layer begin)
      yield summary
      go rest

    -- Drain one layer in the base monad; the accumulator is forced at
    -- every step.
    collapse producer acc = next producer >>= \res -> case res of
      Left rest -> return (rest, done acc)
      Right (x, producer') -> collapse producer' $! step acc x
-- | Monadic variant of 'folds': fold every layer of a stream of producers
-- with effectful initial, step and extraction actions, and yield one
-- summary value per layer.
--
-- The initial action is run afresh for each layer.
foldsM
  :: Monad m
  => (x -> a -> m x)
  -> m x
  -> (x -> m b)
  -> Stream (Producer a m) m r
  -> Producer b m r
foldsM step begin done = go
  where
    go (SI.Return r) = return r
    go (SI.Effect act) = PI.M (liftM go act)
    go (SI.Step layer) = do
      (rest, summary) <- lift (begin >>= consume layer)
      yield summary
      go rest

    -- Drain one layer in the base monad; the accumulator is forced at
    -- every step.
    consume producer acc = do
      res <- next producer
      case res of
        Left rest -> do
          b <- done acc
          return (rest, b)
        Right (x, producer') ->
          step acc x >>= \acc' -> consume producer' $! acc'
-- | Keep the given number of layers of a stream of producers and drain
-- the rest.
--
-- Once the count reaches zero, every remaining layer is still run for its
-- effects but its elements are discarded, so the final return value of
-- the stream is preserved.
takes' :: Monad m => Int -> Stream (Producer a m) m r -> Stream (Producer a m) m r
takes' = loop
  where
    -- The guard forces the counter, so no bang pattern is needed on it.
    loop n stream
      | n <= 0 = drainLoop stream
      | otherwise = case stream of
          SI.Return r -> SI.Return r
          -- An effect layer does not count as one of the kept layers.
          SI.Effect m -> SI.Effect (liftM (loop n) m)
          -- One layer is passed through, so one fewer remains to keep.
          SI.Step p -> SI.Step (fmap (loop (n - 1)) p)

    -- Run every remaining layer to completion, throwing its elements away.
    drainLoop stream = case stream of
      SI.Return r -> SI.Return r
      SI.Effect m -> SI.Effect (liftM drainLoop m)
      SI.Step p -> SI.Effect $ do
        stream' <- runEffect (P.for p P.discard)
        return $ drainLoop stream'