{-| "Pipes.Group.Tutorial" is the correct introduction to the use of this module, which is mostly just an optimized @Pipes.Group@, replacing @FreeT@ with @Stream@. (See the introductory documentation for this package. The @pipes-group@ tutorial is framed as a hunt for a genuinely streaming @threeGroups@. The formulation it opts for in the end would be expressed here thus: > import Pipes > import Streaming.Pipes > import qualified Pipes.Prelude as P > > threeGroups :: (Monad m, Eq a) => Producer a m () -> Producer a m () > threeGroups = concats . takes 3 . groups The only difference is that this simple module omits the detour via lenses. The program splits the initial producer into a connected stream of producers containing "equal" values; it takes three of those; and then erases the effects of splitting. So for example >>> runEffect $ threeGroups (each "aabccoooooo") >-> P.print 'a' 'a' 'b' 'c' 'c' For the rest, only part of the tutorial that would need revision is the bit at the end about writing explicit @FreeT@ programs. Its examples use pattern matching, but the constructors of the @Stream@ type are necessarily hidden, so one would have replaced by the various inspection combinators provided by the @streaming@ library. -} {-#LANGUAGE RankNTypes, BangPatterns #-} module Streaming.Pipes ( -- * @Streaming@ \/ @Pipes@ interoperation fromStream, toStream, toStreamingByteString, fromStreamingByteString, -- * Transforming a connected stream of 'Producer's takes, takes', maps, -- * Streaming division of a 'Producer' into two span, break, splitAt, group, groupBy, -- * Splitting a 'Producer' into a connected stream of 'Producer's groupsBy, groupsBy', groups, split, breaks, -- * Rejoining a connected stream of 'Producer's concats, intercalates, -- * Folding over the separate layers of a connected stream of 'Producer's folds, foldsM, ) where import Pipes import Streaming hiding (concats, groups) import qualified Streaming.Internal as SI import qualified Pipes.Internal as PI import qualified Pipes.Prelude as P import qualified Pipes as P import qualified Streaming.Prelude as S import Control.Monad (liftM) import Prelude hiding (span, splitAt, break) import qualified Data.ByteString as B import qualified Data.ByteString.Streaming as Q import qualified Data.ByteString.Streaming.Internal as Q -- | Construct an ordinary pipes 'Producer' from a 'Stream' of elements fromStream :: Monad m => Stream (Of a) m r -> Producer' a m r fromStream = loop where loop stream = case stream of -- this should be rewritten without constructors SI.Return r -> PI.Pure r SI.Effect m -> PI.M (liftM loop m) SI.Step (a:>rest) -> PI.Respond a (\_ -> loop rest) {-# INLINABLE fromStream #-} -- | Construct a 'Stream' of elements from a @pipes@ 'Producer' toStream :: Monad m => Producer a m r -> Stream (Of a) m r toStream = loop where loop stream = case stream of PI.Pure r -> SI.Return r PI.M m -> SI.Effect (liftM loop m) PI.Respond a f -> SI.Step (a :> loop (f ())) PI.Request x g -> PI.closed x {-# INLINABLE toStream #-} toStreamingByteString :: Monad m => Producer B.ByteString m r -> Q.ByteString m r toStreamingByteString = loop where loop stream = case stream of PI.Pure r -> Q.Empty r PI.M m -> Q.Go (liftM loop m) PI.Respond a f -> Q.Chunk a (loop (f ())) PI.Request x g -> PI.closed x {-# INLINABLE toStreamingByteString #-} fromStreamingByteString :: Monad m => Q.ByteString m r -> Producer' B.ByteString m r fromStreamingByteString = loop where loop stream = case stream of -- this should be rewritten without constructors Q.Empty r -> PI.Pure r Q.Go m -> PI.M (liftM loop m) Q.Chunk a rest -> PI.Respond a (\_ -> loop rest) {-# INLINABLE fromStreamingByteString #-} {-| 'span' splits a 'Producer' into two 'Producer's; the outer 'Producer' is the longest consecutive group of elements that satisfy the predicate. Its inverse is 'Control.Monad.join' -} span :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r) span predicate = loop where loop p = do e <- lift (next p) case e of Left r -> return (return r) Right (a, p') -> if predicate a then yield a >> loop p' else return (yield a >> p') {-# INLINABLE span #-} break :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r) break predicate = span (not . predicate) split :: (Eq a, Monad m) => a -> Producer a m r -> Stream (Producer a m) m r split t = loop where loop stream = do e <- lift $ next stream case e of Left r -> SI.Return r Right (a, p') -> if a /= t then SI.Step $ fmap loop (yield a >> span (/= t) p') else loop p' {-#INLINABLE split #-} breaks :: (Eq a, Monad m) => (a -> Bool) -> Producer a m r -> Stream (Producer a m) m r breaks predicate = loop where loop stream = do e <- lift $ next stream case e of Left r -> SI.Return r Right (a, p') -> if not (predicate a) then SI.Step $ fmap loop (yield a >> break (predicate) p') else loop p' {-#INLINABLE breaks #-} {-| 'splitAt' divides a 'Producer' into two 'Producer's after a fixed number of elements. Its inverse is 'Control.Monad.join' -} splitAt :: Monad m => Int -> Producer a m r -> Producer a m (Producer a m r) splitAt = loop where loop n p | n <= 0 = return p loop n p = do e <- lift (next p) case e of Left r -> return (return r) Right (a, p') -> yield a >> loop (n - 1) p' {-# INLINABLE splitAt #-} {-| 'groupBy' splits a 'Producer' into two 'Producer's; the second producer begins where we meet an element that is different according to the equality predicate. Its inverse is 'Control.Monad.join' -} groupBy :: Monad m => (a -> a -> Bool) -> Producer a m r -> Producer a m (Producer a m r) groupBy equals = loop where loop p = do x <- lift (next p) case x of Left r -> return (return r) Right (a, p') -> span (equals a) (yield a >> p') {-# INLINABLE groupBy #-} -- | Like 'groupBy', where the equality predicate is ('==') group :: (Monad m, Eq a) => Producer a m r -> Producer a m (Producer a m r) group = groupBy (==) {-# INLINABLE group #-} groupsBy :: Monad m => (a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r groupsBy equals = loop where loop p = SI.Effect $ do e <- next p return $ case e of Left r -> SI.Return r Right (a, p') -> SI.Step (fmap loop (yield a >> span (equals a) p')) {-# INLINABLE groupsBy #-} {-| `groupsBy'` splits a 'Producer' into a 'Stream' of 'Producer's grouped using the given equality predicate This differs from `groupsBy` by comparing successive elements for equality instead of comparing each element to the first member of the group >>> import Pipes (yield, each) >>> import Pipes.Prelude (toList) >>> let cmp c1 c2 = succ c1 == c2 >>> (toList . intercalates (yield '|') . groupsBy' cmp) (each "12233345") "12|23|3|345" >>> (toList . intercalates (yield '|') . groupsBy cmp) (each "12233345") "122|3|3|34|5" -} groupsBy' :: Monad m => (a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r groupsBy' equals = loop where loop p = SI.Effect $ do e <- next p return $ case e of Left r -> SI.Return r Right (a, p') -> SI.Step (fmap loop (loop0 (yield a >> p'))) loop0 p1 = do e <- lift (next p1) case e of Left r -> return (return r) Right (a2, p2) -> do yield a2 let loop1 a p = do e' <- lift (next p) case e' of Left r -> return (return r) Right (a', p') -> if equals a a' then do yield a' loop1 a' p' else return (yield a' >> p') loop1 a2 p2 {-# INLINABLE groupsBy' #-} groups:: (Monad m, Eq a) => Producer a m r -> Stream (Producer a m) m r groups = groupsBy (==) chunksOf :: Monad m => Int -> Producer a m r -> Stream (Producer a m) m r chunksOf n = loop where loop p = SI.Effect $ do e <- next p return $ case e of Left r -> SI.Return r Right (a, p') -> SI.Step (fmap loop (splitAt n (yield a >> p'))) {-# INLINABLE chunksOf #-} -- | Join a stream of 'Producer's into a single 'Producer' concats :: Monad m => Stream (Producer a m) m r -> Producer a m r concats = loop where loop stream = case stream of SI.Return r -> return r SI.Effect m -> PI.M $ liftM loop m SI.Step p -> do rest <- p loop rest {-# INLINABLE concats #-} -- {-| Join a 'FreeT'-delimited stream of 'Producer's into a single 'Producer' by -- intercalating a 'Producer' in between them -- -} -- intercalates -- :: Monad m => Producer a m () -> Stream (Producer a m) m r -> Producer a m r -- intercalates sep = loop where -- loop stream = case stream of -- -- x <- lift (runFreeT f) -- case x of -- Pure r -> return r -- Free p -> do -- f' <- p -- go1 f' -- go1 f = do -- x <- lift (runFreeT f) -- case x of -- Pure r -> return r -- Free p -> do -- sep -- f' <- p -- go1 f' -- {-# INLINABLE intercalates #-} {- $folds These folds are designed to be compatible with the @foldl@ library. See the 'Control.Foldl.purely' and 'Control.Foldl.impurely' functions from that library for more details. For example, to count the number of 'Producer' layers in a 'Stream', you can write: > import Control.Applicative (pure) > import qualified Control.Foldl as L > import Pipes.Group > import qualified Pipes.Prelude as P > > count :: Monad m => Stream (Producer a m) m () -> m Int > count = P.sum . L.purely folds (pure 1) -} {-| Fold each 'Producer' in a producer 'Stream' > purely folds > :: Monad m => Fold a b -> Stream (Producer a m) m r -> Producer b m r -} folds :: Monad m => (x -> a -> x) -- ^ Step function -> x -- ^ Initial accumulator -> (x -> b) -- ^ Extraction function -> Stream (Producer a m) m r -- ^ -> Producer b m r folds step begin done = loop where loop stream = case stream of SI.Return r -> return r SI.Effect m -> PI.M $ liftM loop m SI.Step p -> do (stream', b) <- lift (fold p begin) yield b loop stream' fold p x = do y <- next p case y of Left f -> return (f, done x) Right (a, p') -> fold p' $! step x a {-# INLINABLE folds #-} {-| Fold each 'Producer' in a 'Producer' stream, monadically > impurely foldsM > :: Monad m => FoldM a b -> Stream (Producer a m) m r -> Producer b m r -} foldsM :: Monad m => (x -> a -> m x) -- ^ Step function -> m x -- ^ Initial accumulator -> (x -> m b) -- ^ Extraction function -> Stream (Producer a m) m r -- ^ -> Producer b m r foldsM step begin done = loop where loop stream = case stream of SI.Return r -> return r SI.Effect m -> PI.M (liftM loop m) SI.Step p -> do (f', b) <- lift $ begin >>= foldM p yield b loop f' foldM p x = do y <- next p case y of Left f -> do b <- done x return (f, b) Right (a, p') -> do x' <- step x a foldM p' $! x' {-# INLINABLE foldsM #-} {-| @(takes' n)@ only keeps the first @n@ 'Producer's of a linked 'Stream' of 'Producers' Unlike 'takes', 'takes'' is not functor-general - it is aware that a 'Producer' can be /drained/, as functors cannot generally be. Here, then, we drain the unused 'Producer's in order to preserve the return value. This makes it a suitable argument for 'maps'. -} takes' :: Monad m => Int -> Stream (Producer a m) m r -> Stream (Producer a m) m r takes' = loop where loop !n stream | n <= 0 = drain_loop stream loop n stream = case stream of SI.Return r -> SI.Return r SI.Effect m -> SI.Effect (liftM (loop n) m) SI.Step p -> SI.Step (fmap (loop (n - 1)) p) drain_loop stream = case stream of SI.Return r -> SI.Return r SI.Effect m -> SI.Effect (liftM drain_loop m) SI.Step p -> SI.Effect $ do stream' <- runEffect (P.for p P.discard) return $ drain_loop stream' {-# INLINABLE takes' #-}