destroy
    Map a stream directly to its church encoding; compare Data.List.foldr. It permits distinctions that should be hidden, as can be seen from e.g.

        isPure stream = destroy_ (const True) (const False) (const True)

    and similar nonsense. The crucial constraint is that the m x -> x argument is an Eilenberg-Moore algebra. See Atkey, "Reasoning about Stream Processing with Effects".

    The destroy exported by the safe modules is

        destroy str = destroy (observe str)

destroyWith
    destroyWith reorders the arguments of destroy to be more akin to foldr. It is more convenient to query in ghci to figure out what kind of 'algebra' you need to write.

        >>> :t destroyWith join return
        (Monad m, Functor f) =>
          (f (m a) -> m a) -> Stream f m a -> m a  -- iterT

        >>> :t destroyWith (join . lift) return
        (Monad m, Monad (t m), Functor f, MonadTrans t) =>
          (f (t m a) -> t m a) -> Stream f m a -> t m a  -- iterTM

        >>> :t destroyWith wrap return
        (Monad m, Functor f, Functor f1) =>
          (f (Stream f1 m r) -> Stream f1 m r) -> Stream f m r -> Stream f1 m r

        >>> :t destroyWith wrap return (step . lazily)
        Monad m =>
          Stream (Of a) m r -> Stream ((,) a) m r

        >>> :t destroyWith wrap return (step . strictly)
        Monad m =>
          Stream ((,) a) m r -> Stream (Of a) m r

        >>> :t destroyWith Data.ByteString.Streaming.wrap return
        (Monad m, Functor f) =>
          (f (ByteString m r) -> ByteString m r) -> Stream f m r -> ByteString m r

        >>> :t destroyWith Data.ByteString.Streaming.wrap return (\(a:>b) -> consChunk a b)
        Monad m =>
          Stream (Of B.ByteString) m r -> ByteString m r  -- fromChunks

construct
    Reflect a church-encoded stream; cp. GHC.Exts.build

inspect
    Inspect the first stage of a freely layered sequence. Compare Pipes.next and the replica Streaming.Prelude.next. This is the uncons for the general unfold.

        unfold inspect = id
        Streaming.Prelude.unfoldr Streaming.Prelude.next = id

unfold
    Build a Stream by unfolding steps starting from a seed. See also the specialized unfoldr in the prelude.
        unfold inspect = id -- modulo the quotient we work with
        unfold Pipes.next :: Monad m => Producer a m r -> Stream ((,) a) m r
        unfold (curry (:>) . Pipes.next) :: Monad m => Producer a m r -> Stream (Of a) m r

maps
    Map layers of one functor to another with a transformation

mapsM
    Map layers of one functor to another with a transformation involving the base monad

runEffect
    Run the effects in a stream that merely layers effects.

mapsM_
    Map each layer to an effect in the base monad, and run them all.

layer
    Lift for items in the base functor. Makes a singleton or one-layer succession.

intercalates
    Interpolate a layer at each segment. This specializes to e.g.

        intercalates :: (Monad m, Functor f) => Stream f m () -> Stream (Stream f m) m r -> Stream f m r

iterTM
    Specialized fold

        iterTM alg stream = destroy stream alg (join . lift) return

iterT
    Specialized fold

        iterT alg stream = destroy stream alg join return

concats
    Dissolves the segmentation into layers of Stream f m layers.

        concats stream = destroy stream join (join . lift) return

        >>> S.print $ concats $ maps (cons 1776) $ chunksOf 2 (each [1..5])
        1776
        1
        2
        1776
        3
        4
        1776
        5

splitsAt
    Split a succession of layers after some number, returning a streaming or effectful pair.

        >>> rest <- S.print $ S.splitAt 1 $ each [1..3]
        1
        >>> S.print rest
        2
        3

chunksOf
    Break a stream into substreams each with n functorial layers.

        >>> S.print $ maps' sum' $ chunksOf 2 $ each [1,1,1,1,1,1,1]
        2
        2
        2
        1

distribute
    Make it possible to 'run' the underlying transformed monad.
    A simple-minded example might be:

        debugFibs = flip runStateT 1 $ distribute $ loop 1 where
          loop n = do
            S.yield n
            s <- lift get
            liftIO $ putStr "Current state is: " >> print s
            lift $ put (s + n :: Int)
            loop s

        >>> S.print $ S.take 4 $ S.drop 4 $ debugFibs
        Current state is: 1
        Current state is: 2
        Current state is: 3
        Current state is: 5
        5
        Current state is: 8
        8
        Current state is: 13
        13
        Current state is: 21
        21

repeats
    Repeat a functorial layer, command or instruction forever.

replicates
    Repeat a functorial layer, command or instruction several times.

cycles
    Construct an infinite stream by cycling a finite one

        cycles = forever

        >>> S.print $ S.take 3 $ forever $ S.each "hi"
        'h'
        'i'
        'h'
        >>> S.sum $ S.take 13 $ forever $ S.each [1..3]
        25

unexposed
    This is akin to the observe of Pipes.Internal. It rewraps the layering in instances of Stream f m r so that it replicates that of FreeT.

interleaves
    Interleave functor layers, with the effects of the first preceding the effects of the second.

        interleaves = zipsWith (liftA2 (,))

        >>> let paste = \a b -> interleaves (Q.lines a) (maps (Q.cons' '\t') (Q.lines b))
        >>> Q.stdout $ Q.unlines $ paste "hello\nworld\n" "goodbye\nworld\n"
        hello goodbye
        world world
Of
    A left-strict pair; the base functor for streams of individual elements.

break
    Break a sequence when an element falls under a predicate, keeping the rest of the stream as the return value.

        >>> rest <- S.print $ S.break even $ each [1,1,2,3]
        1
        1
        >>> S.print rest
        2
        3

chain
    Apply an action to all values flowing downstream

        >>> S.product (chain print (S.each [2..4])) >>= print
        2
        3
        4
        24

concat
    Make a stream of traversable containers into a stream of their separate elements

        >>> S.print $ S.concat (each ["xy","z"])
        'x'
        'y'
        'z'
        >>> S.print $ S.concat (S.each [Just 1, Nothing, Just 2])
        1
        2
        >>> S.print $ S.concat (S.each [Right 1, Left "Error!", Right 2])
        1
        2

    Not to be confused with the functor-general concats:

        concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r -- specializing

        >>> S.stdoutLn $ concats $ maps (<* yield "--\n--") $ chunksOf 2 $ S.show (each [1..5])
        1
        2
        --
        --
        3
        4
        --
        --
        5
        --
        --

cons
    The natural cons for a Stream (Of a).

        cons a stream = yield a >> stream

    Useful for interoperation

cycle
    Cycle repeatedly through the layers of a stream, ad inf. This function is functor-general

        cycle = forever

        >>> rest <- S.print $ S.splitAt 3 $ S.cycle (yield True >> yield False)
        True
        False
        True
        >>> S.print $ S.take 3 rest
        False
        True
        False

drain
    Reduce a stream, performing its actions but ignoring its elements.

        >>> let stream = do {yield 1; lift (putStrLn "Effect!"); yield 2; lift (putStrLn "Effect!"); return (2^100)}
        >>> S.drain stream
        Effect!
        Effect!
        1267650600228229401496703205376
        >>> S.drain $ S.takeWhile (<2) stream
        Effect!

drop
    Ignore the first n elements of a stream, but carry out the actions

dropWhile
    Ignore elements of a stream until a test succeeds.

        >>> IO.withFile "distribute.hs" IO.ReadMode $ S.stdoutLn . S.take 2 . S.dropWhile (isPrefixOf "import") .
            S.fromHandle
        main :: IO ()
        main = do

each
    Stream the elements of a foldable container.

        >>> S.print $ S.map (*100) $ each [1..3] >> yield 4
        100
        200
        300
        400
        >>> S.print $ S.map (*100) $ each [1..3] >> lift readLn >>= yield
        100
        200
        300
        4<Enter>
        400

filter
    Skip elements of a stream that fail a predicate

filterM
    Skip elements of a stream that fail a monadic test

fold
    Strict fold of a Stream of elements

        Control.Foldl.purely fold :: Monad m => Fold a b -> Stream (Of a) m () -> m b

fold'
    Strict fold of a Stream of elements that preserves the return value.

        >>> S.sum' $ each [1..10]
        55 :> ()
        >>> (n :> rest) <- sum' $ S.splitAt 3 (each [1..10])
        >>> print n
        6
        >>> (m :> rest') <- sum' $ S.splitAt 3 rest
        >>> print m
        15
        >>> S.print rest'
        7
        8
        9

    The type provides for interoperation with the foldl library.

        Control.Foldl.purely fold' :: Monad m => Fold a b -> Stream (Of a) m r -> m (Of b r)

    Thus, specializing a bit:

        L.purely fold' L.sum :: Stream (Of Int) m r -> m (Of Int r)
        maps (L.purely fold' L.sum) :: Stream (Stream (Of Int)) IO r -> Stream (Of Int) IO r

        >>> S.print $ mapsM (L.purely S.fold' (liftA2 (,) L.list L.sum)) $ chunksOf 3 $ each [1..10]
        ([1,2,3],6)
        ([4,5,6],15)
        ([7,8,9],24)
        ([10],10)

foldM
    Strict, monadic fold of the elements of a 'Stream (Of a)'

        Control.Foldl.impurely foldM :: Monad m => FoldM a b -> Stream (Of a) m () -> m b

foldM'
    Strict, monadic fold of the elements of a 'Stream (Of a)'

        Control.Foldl.impurely foldM' :: Monad m => FoldM a b -> Stream (Of a) m r -> m (b, r)

foldrT
    A natural right fold for consuming a stream of elements. See also the more general iterTM in the Streaming module and the still more general destroy

        foldrT (\a p -> Pipes.yield a >> p) :: Monad m => Stream (Of a) m r -> Producer a m r
        foldrT (\a p -> Conduit.yield a >> p) :: Monad m => Stream (Of a) m r -> Conduit a m r

foldrM
    A natural right fold for consuming a stream of elements. See also the more general iterT in the Streaming module and the still more general destroy

for
    for replaces each element of a stream with an associated stream. Note that the associated stream may layer any functor.
iterate
    Iterate a pure function from a seed value, streaming the results forever

iterateM
    Iterate a monadic function from a seed value, streaming the results forever

map
    Standard map on the elements of a stream.

mapFoldable
    For each element of a stream, stream a foldable container of elements instead

        >>> S.print $ S.mapFoldable show $ yield 12
        '1'
        '2'

mapM
    Replace each element of a stream with the result of a monadic action

mapM_
    Reduce a stream to its return value with a monadic action.

        >>> mapM_ Prelude.print $ each [1..3] >> return True
        1
        2
        3
        True

next
    The standard way of inspecting the first item in a stream of elements, if the stream is still 'running'. The Right case contains a Haskell pair, where the more general inspect would return a left-strict pair. There is no reason to prefer inspect since, if the Right case is exposed, the first element in the pair will have been evaluated to whnf.

        next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r))
        inspect :: Monad m => Stream (Of a) m r -> m (Either r (Of a (Stream (Of a) m r)))

    Interoperate with pipes producers thus:

        Pipes.unfoldr Stream.next :: Stream (Of a) m r -> Producer a m r
        Stream.unfoldr Pipes.next :: Producer a m r -> Stream (Of a) m r

    Similarly:

        IOStreams.unfoldM (liftM (either (const Nothing) Just) . next) :: Stream (Of a) IO b -> IO (InputStream a)
        Conduit.unfoldM (liftM (either (const Nothing) Just) . next) :: Stream (Of a) m r -> Source a m r

    But see uncons.

uncons
    Inspect the first item in a stream of elements, without a return value. uncons provides convenient exit into another streaming type:

        IOStreams.unfoldM uncons :: Stream (Of a) IO b -> IO (InputStream a)
        Conduit.unfoldM uncons :: Stream (Of a) m r -> Conduit.Source m a

product
    Fold a Stream of numbers into their product

product'
    Fold a Stream of numbers into their product with the return value

        maps' product' :: Stream (Stream (Of Int)) m r -> Stream (Of Int) m r

read
    Make a stream of strings into a stream of parsed values, skipping bad cases

repeat
    Repeat an element ad inf.
        >>> S.print $ S.take 3 $ S.repeat 1
        1
        1
        1

repeatM
    Repeat a monadic action ad inf., streaming its results.

        >>> S.toListM $ S.take 2 (repeatM getLine)
        hello<Enter>
        world<Enter>
        ["hello","world"]

replicate
    Repeat an element several times

replicateM
    Repeat an action several times, streaming the results.

        >>> S.print $ S.replicateM 2 getCurrentTime
        2015-08-18 00:57:36.124508 UTC
        2015-08-18 00:57:36.124785 UTC

reread
    Read an IORef (Maybe a) or a similar device until it reads Nothing. reread provides convenient exit from the io-streams library

        reread readIORef :: IORef (Maybe a) -> Stream (Of a) IO ()
        reread Streams.read :: System.IO.Streams.InputStream a -> Stream (Of a) IO ()

scan
    Strict left scan, streaming, e.g. successive partial results.

        Control.Foldl.purely scan :: Monad m => Fold a b -> Stream (Of a) m r -> Stream (Of b) m r

        >>> Streaming.print $ Foldl.purely Streaming.scan Foldl.list $ each [3..5]
        []
        [3]
        [3,4]
        [3,4,5]

scanM
    Strict, monadic left scan

        Control.Foldl.impurely scanM :: Monad m => FoldM a m b -> Stream (Of a) m r -> Stream (Of b) m r

        >>> let v = L.impurely scanM L.vector $ each [1..4::Int] :: Stream (Of (U.Vector Int)) IO ()
        >>> S.print v
        fromList []
        fromList [1]
        fromList [1,2]
        fromList [1,2,3]
        fromList [1,2,3,4]

sequence
    Like sequence, but streaming. The result type is a stream of a's, but is not accumulated; the effects of the elements of the original stream are interleaved in the resulting stream. Compare:

        sequence :: Monad m => [m a] -> m [a]
        sequence :: Monad m => Stream (Of (m a)) m r -> Stream (Of a) m r

sum
    Fold a Stream of numbers into their sum

sum'
    Fold a Stream of numbers into their sum with the return value

        maps' sum' :: Stream (Stream (Of Int)) m r -> Stream (Of Int) m r

span
    Stream elements until one fails the condition, return the rest.

splitAt
    Split a succession of layers after some number, returning a streaming or effectful pair. This function is the same as the splitsAt exported by the Streaming module, but since this module is imported qualified, it can usurp a Prelude name.
    It specializes to:

        splitAt :: (Monad m, Functor f) => Int -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)

take
    End a stream after n elements; the original return value is thus lost. splitAt preserves this information. Note that, like splitAt, this function is functor-general, so that, for example, you can take not just a number of items from a stream of elements, but a number of substreams and the like.

        >>> S.print $ mapsM sum' $ S.take 2 $ chunksOf 3 $ each [1..]
        6   -- sum of first group of 3
        15  -- sum of second group of 3
        >>> S.print $ mapsM S.sum' $ S.take 2 $ chunksOf 3 $ S.each [1..4] >> S.readLn
        6      -- sum of first group of 3, which is already in [1..4]
        100    -- user input
        10000  -- user input
        10104  -- sum of second group of 3

takeWhile
    End stream when an element fails a condition; the original return value is lost. span preserves this information.

toList
    Convert a pure Stream (Of a) into a list of as

toListM
    Convert an effectful 'Stream (Of a)' into a list of as

    Note: Needless to say, this function does not stream properly. It is basically the same as mapM which, like replicateM, sequence and similar operations on traversable containers, is a leading cause of space leaks.

toListM'
    Convert an effectful Stream into a list alongside the return value

        maps' toListM' :: Stream (Stream (Of a)) m r -> Stream (Of [a]) m r

unfoldr
    Build a Stream by unfolding steps starting from a seed.

    The seed can of course be anything, but this is one natural way to consume a pipes Producer. Consider:

        >>> S.stdoutLn $ S.take 2 (S.unfoldr P.next P.stdinLn)
        hello<Enter>
        hello
        goodbye<Enter>
        goodbye
        >>> S.stdoutLn $ S.unfoldr P.next (P.stdinLn P.>-> P.take 2)
        hello<Enter>
        hello
        goodbye<Enter>
        goodbye
        >>> S.drain $ S.unfoldr P.next (P.stdinLn P.>-> P.take 2 P.>-> P.stdoutLn)
        hello<Enter>
        hello
        goodbye<Enter>
        goodbye

    If the intended "coalgebra" is complicated it might be pleasant to write it with the state monad:

        \state seed -> S.unfoldr (runExceptT .
            runStateT state) seed :: Monad m => StateT s (ExceptT r m) a -> s -> P.Producer a m r

        >>> let state = do {n <- get ; if n >= 3 then lift (throwE "Got to three"); else put (n+1); return n}
        >>> S.print $ S.unfoldr (runExceptT . runStateT state) 0
        0
        1
        2
        "Got to three"

yield
    A singleton stream

        >>> stdoutLn $ yield "hello"
        hello
        >>> S.sum $ do {yield 1; yield 2}
        3
        >>> S.sum $ do {yield 1; lift $ putStrLn "# 1 was yielded"; yield 2; lift $ putStrLn "# 2 was yielded"}
        # 1 was yielded
        # 2 was yielded
        3
        >>> let prompt :: IO Int; prompt = putStrLn "Enter a number:" >> readLn
        >>> S.sum $ do {lift prompt >>= yield ; lift prompt >>= yield ; lift prompt >>= yield}
        Enter a number:
        3<Enter>
        Enter a number:
        20<Enter>
        Enter a number:
        100<Enter>
        123

zip
    Zip two Streams

zipWith
    Zip two Streams using the provided combining function

stdinLn
    Repeatedly stream lines as String from stdin

        >>> stdoutLn $ S.show (S.each [1..3])
        1
        2
        3
        >>> stdoutLn stdinLn
        hello<Enter>
        hello
        world<Enter>
        world
        ^CInterrupted.
        >>> stdoutLn $ S.map reverse stdinLn
        hello<Enter>
        olleh
        world<Enter>
        dlrow
        ^CInterrupted.

readLn
    Read values from stdin, ignoring failed parses

        >>> S.sum $ S.take 2 S.readLn :: IO Int
        3<Enter>
        #$%^&\^?<Enter>
        1000<Enter>
        1003

fromHandle
    Read Strings from a Handle using hGetLine. Terminates on end of input

        >>> withFile "distribute.hs" ReadMode $ stdoutLn . S.take 3 .
            fromHandle
        import Streaming
        import qualified Streaming.Prelude as S
        import Control.Monad.Trans.State.Strict

stdoutLn
    Write Strings to stdout using putStrLn; terminates on a broken output pipe

        >>> S.stdoutLn $ S.show (S.each [1..3])
        1
        2
        3

stdoutLn'
    Write Strings to stdout using putStrLn. This does not handle a broken output pipe, but has a polymorphic return value, which makes this possible:

        >>> rest <- stdoutLn' $ S.splitAt 3 $ S.show (each [1..5])
        1
        2
        3
        >>> stdoutLn' rest
        4
        5

    Or indeed:

        >>> rest <- stdoutLn' $ S.show $ S.splitAt 3 (each [1..5])
        1
        2
        3
        >>> S.sum rest
        9