The streaming package

[ Tags: bsd3, data, library, pipes, streaming ]

This package contains two modules, Streaming and Streaming.Prelude. The principal module, Streaming.Prelude, exports an elementary streaming prelude focused on a simple "source" or "producer" type, namely Stream (Of a) m r. This is a sort of effectful version of ([a],r) in which successive elements of type a arise from some sort of monadic action before the succession ends with a value of type r. Everything in the library is organized to make programming with this type as simple as possible, by the simple expedient of making it as close to Prelude and Data.List as possible. Thus for example the trivial program

>>> S.sum $ S.take 3 (S.readLn :: Stream (Of Int) IO ())
6 :> ()

sums the first three valid integers from user input. Similarly,

>>> S.stdoutLn $ S.map (map toUpper) $ S.take 2 S.stdinLn

upper-cases the first two lines from stdin as they arise, and sends them to stdout. And so on, with filtering, mapping, breaking, chunking, zipping, unzipping, replicating and so forth: we program with streams of Ints or Strings directly as if they constituted something like a list. That's because streams really do constitute something like a list, and the associated operations can mostly have the same names. (A few, like reverse, don't stream and thus disappear; others like unzip are here given properly streaming formulation for the first time.) And we everywhere oppose "extracting a pure list from IO", which is the origin of typical Haskell memory catastrophes. Basically any case where you are tempted to use mapM, replicateM, traverse or sequence with Haskell lists, you would do better to use something like Stream (Of a) m r. The type signatures are a little fancier, but the programs themselves are mostly the same. In fact, they are mostly simpler. Thus, consider the trivial demo program mentioned in this SO question

main = mapM newIORef [1..10^8::Int] >>= mapM readIORef >>= mapM_ print

The new user notices that this exhausts memory, and worries about the efficiency of Haskell IORefs. But of course it exhausts memory! Look what it says! The problem is immediately cured by writing

main = S.print $ S.mapM readIORef $ S.mapM newIORef $ S.each [1..10^8::Int]

which really does what the other program was meant to do, uses no more memory than hello-world, and is simpler anyway, since it doesn't involve the detour of "extracting a list from IO". Almost every use of list mapM, replicateM, traverse and sequence produces this problem on a smaller scale. People get used to it, as if it were characteristic of Haskell programs to use a lot of memory. But in truth "extracting a list or sequence from IO" is mostly just bad practice pure and simple. Of course, mapM, replicateM, traverse and sequence make sense for lists, under certain conditions! But unsafePerformIO also makes sense under certain conditions.
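As a minimal sketch, the streaming version can be written out as a complete program. The helper name total and the smaller bound are assumptions made for illustration; S.sum_ replaces S.print so that the result is a single number.

```haskell
import Data.IORef (newIORef, readIORef)
import qualified Streaming.Prelude as S

-- Hypothetical helper: each IORef is created, read back and consumed
-- before the next one is allocated, so nothing is accumulated.
total :: Int -> IO Int
total n = S.sum_ $ S.mapM readIORef $ S.mapM newIORef $ S.each [1 .. n]

main :: IO ()
main = total 1000000 >>= print
```

Raising the bound should change the running time but not the shape of the program.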

The Streaming module exports the general type, Stream f m r, which can be used to stream successive distinct steps characterized by any functor f, though we are mostly interested in organizing computations of the form Stream (Of a) m r. The streaming-IO libraries have various devices for dealing with effectful variants of [a] or ([a],r) in which the emergence of successive elements somehow depends on IO. But it is only with the general type Stream f m r, or some equivalent, that one can envisage (for example) the connected streaming of their sorts of stream - as one makes lists of lists in the Haskell Prelude and Data.List. One needs some such type if we are to express properly streaming equivalents of e.g.

group :: Eq a => [a] -> [[a]]
chunksOf :: Int -> [a] -> [[a]]
lines :: [Char] -> [[Char]] -- but similarly with byte streams, etc.

to mention a few obviously desirable operations. (This is explained more elaborately in the readme below.)
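For instance, a streaming chunksOf might be used roughly as follows. This is a sketch: the helper name chunked is an assumption, and each chunk is collected into a list only so that it can be displayed.

```haskell
import Streaming (Of, Stream, chunksOf, mapped)
import qualified Streaming.Prelude as S

-- Hypothetical helper: split the stream into chunks of n elements and
-- turn each chunk into a list as it is completed.
chunked :: Monad m => Int -> Stream (Of a) m r -> Stream (Of [a]) m r
chunked n = mapped S.toList . chunksOf n

main :: IO ()
main = S.print $ chunked 3 $ S.each [1 .. 10 :: Int]
```

Only one chunk is held in memory at a time; the stream of chunks itself is never materialized.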

One could of course throw something like the present Stream type on top of a prior stream concept: this is how pipes and pipes-group (which are very much our model here) use FreeT. But once one grasps the iterable stream concept needed to express those functions then one will also see that, with it, one is already in possession of a complete elementary streaming library - since one possesses Stream ((,) a) m r or equivalently Stream (Of a) m r. This is the type of a 'generator' or 'producer' or 'source' or whatever you call an effectful stream of items. The present Streaming.Prelude is thus the simplest streaming library that can replicate anything like the API of the Prelude and Data.List.

The emphasis of the library is on interoperation; for the rest its advantages are: extreme simplicity, re-use of intuitions the user has gathered from mastery of Prelude and Data.List, and a total and systematic rejection of type synonyms. The two conceptual pre-requisites are some comprehension of monad transformers and some familiarity with 'rank 2 types'. It is hoped that experimentation with this simple material, starting with the ghci examples in Streaming.Prelude, will give people who are new to these concepts some intuition about their importance. The most fundamental purpose of the library is to express elementary streaming ideas without reliance on a complex framework, but in a way that integrates transparently with the rest of Haskell, using ideas - e.g. rank 2 types, which are here implicit or explicit in most mapping - that the user can carry elsewhere, rather than chaining her understanding to the curiosities of a so-called streaming IO framework (as necessary as that is for certain purposes.)

See the readme below for further explanation, including the examples linked there. Elementary usage can be divined from the ghci examples in Streaming.Prelude and perhaps from this rough beginning of a tutorial. Note also the streaming bytestring and streaming utils packages. Questions about usage can be raised on StackOverflow with the tag [haskell-streaming], or as an issue on Github, or on the pipes list (the package understands itself as part of the pipes 'ecosystem').

The simplest form of interoperation with pipes is accomplished with this isomorphism:

Pipes.unfoldr Streaming.next        :: Stream (Of a) m r   -> Producer a m r
Streaming.unfoldr Pipes.next        :: Producer a m r      -> Stream (Of a) m r

Interoperation with io-streams is thus:

Streaming.reread IOStreams.read     :: InputStream a       -> Stream (Of a) IO ()
IOStreams.unfoldM Streaming.uncons  :: Stream (Of a) IO () -> IO (InputStream a)

With conduit one might use, e.g.:

Conduit.unfoldM Streaming.uncons                        :: Stream (Of a) m () -> Source m a
\str -> Streaming.mapM_ Conduit.yield (hoist lift str)  :: Stream (Of o) m r  -> ConduitM i o m r
\src -> hoist lift src $$ Conduit.mapM_ Streaming.yield :: Source m a         -> Stream (Of a) m ()

These conversions should never be more expensive than a single >-> or =$=. The simplest interoperation with regular Haskell lists is provided by, say

Streaming.each                                 :: [a]               -> Stream (Of a) m ()
Streaming.toList_                              :: Stream (Of a) m r -> m [a]

The latter of course accumulates the whole list in memory, and is mostly what we are trying to avoid. Every use of Prelude.mapM f should be reconceived as using the composition Streaming.toList_ . Streaming.mapM f . Streaming.each with a view to considering whether the accumulation required by Streaming.toList_ is really necessary.
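A small sketch of that reconception, with listMapM as an assumed name for the rebuilt Prelude.mapM:

```haskell
import qualified Streaming.Prelude as S

-- Hypothetical helper: Prelude.mapM expressed with streaming pieces.
-- The only accumulating step is the final S.toList_.
listMapM :: Monad m => (a -> m b) -> [a] -> m [b]
listMapM f = S.toList_ . S.mapM f . S.each

main :: IO ()
main = do
  -- Accumulating variant: the whole result list exists before printing.
  ys <- listMapM (\x -> pure (x * 2)) [1, 2, 3 :: Int]
  print ys
  -- Streaming variant: drop S.toList_ and consume elements as they arise.
  S.print $ S.mapM (\x -> pure (x * 2)) $ S.each [1, 2, 3 :: Int]
```

If the consumer can work element by element, as S.print does here, the accumulation was never needed.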

Here are the results of some microbenchmarks based on the benchmarks included in the machines package:

Because these are microbenchmarks for individual functions, they represent a sort of "worst case"; many other factors can influence the speed of a complex program.



Dependencies base (>=4.6 && <5), containers, exceptions (>0.5 && <0.9), ghc-prim, mmorph (==1.0.*), monad-control (>=0.3.1 && <1.1), mtl (>=2.1 && <2.3), resourcet (>1.1.0 && <1.2), time, transformers (>=0.4 && <0.6), transformers-base (<0.5)
License BSD3
Author michaelt
Category Data, Pipes, Streaming
Source repository head: git clone
Uploaded Tue Jan 17 08:10:41 UTC 2017 by MichaelThompson
Distributions LTSHaskell:, NixOS:, Tumbleweed:
Downloads 4136 total (44 in the last 30 days)
Rating 2.5 (4 ratings)
Status Docs uploaded by user
Build status unknown [no reports yet]




Readme for streaming-



§ 1. The freely generated stream on a streamable functor

§ 2. A freely generated stream of individual Haskell values is a Producer, Generator or Source

§ 3. Streaming.Prelude

§ 4. Mother's Prelude v. Streaming.Prelude

§ 5. How come there's not one of those fancy "ListT done right" implementations in here?

§ 6. Didn't I hear that free monads are a dog from the point of view of efficiency?

§ 7. Interoperation with the streaming-io libraries

§ 8. Where can I find examples of use?

§ 9. Problems

§ 10. Implementation and benchmarking notes

§ 1. The freely generated stream on a streamable functor

Stream can be used wherever FreeT or Coroutine are used. The compiler's standard range of optimizations works better for operations written in terms of Stream. Stream f m r, like FreeT f m r or Coroutine f m r, is of course extremely general, and many functor-general combinators are exported by the general module Streaming.

In the applications we are thinking of, the general type Stream f m r expresses a succession of steps arising in a monad m, with a shape determined by the 'functor' parameter f, and resulting in a final value r. In the first instance you might read Stream as Repeatedly, with the understanding that one way of doing something some number of times, is to do it no times at all.

Readings of f can be wildly various. Thus, for example,

 Stream Identity IO r

is the type of an indefinitely delayed IO r, or an extended IO process broken into stages marked by the Identity constructor. This is the Trampoline type of the "Coroutine Pipelines" tutorial, and the IterT of the free library (which is mysteriously not identified with FreeT Identity); all of the associated combinators are found within the general Streaming module.

In particular, though, given readings of f and m we can, for example, always consider the type Stream (Stream f m) m r, in which steps of the form Stream f m are joined end to end. Such a stream-of-streams might arise in any number of ways; a crude (because hyper-general) way would be with

chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r

and we can always rejoin such a stream with

concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r

But other things can be chunked and concatenated in that sense; they need not themselves be explicitly represented in terms of Stream; indeed chunksOf and concats are modeled on those in pipes-group. In our variant of pipes-group, these have the types

chunksOf :: Monad m => Int -> Producer a m r -> Stream (Producer a m) m r
concats ::  Monad m =>  Stream (Producer a m) m r -> Producer a m r
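The two functor-general combinators from the Streaming module can be tried on a stream of Ints. This is a sketch; the helper names rejoin and chunkSums are assumptions.

```haskell
import Streaming (Of, Stream, chunksOf, concats, mapped)
import qualified Streaming.Prelude as S

-- Hypothetical helper: chunk and immediately rejoin; the elements come
-- out unchanged and in the same order.
rejoin :: Monad m => Int -> Stream (Of a) m r -> Stream (Of a) m r
rejoin n = concats . chunksOf n

-- Hypothetical helper: replace each chunk by its sum, folding the chunk
-- as it streams rather than collecting it first.
chunkSums :: Monad m => Int -> Stream (Of Int) m r -> Stream (Of Int) m r
chunkSums n = mapped S.sum . chunksOf n

main :: IO ()
main = do
  S.toList_ (rejoin 3 (S.each [1 .. 10 :: Int])) >>= print
  S.toList_ (chunkSums 3 (S.each [1 .. 10 :: Int])) >>= print
```

The final S.toList_ is there only to show the results on one line each.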

§ 2. A freely generated stream of individual Haskell values is a Producer, Generator or Source

Of course, as soon as you grasp the general form of succession you are already in possession of the most basic concrete form: a simple succession of individual Haskell values one after another, the effectful list or sequence. This is just Stream ((,) a) m r. Here we prefer to write Stream (Of a) m r, strictifying the left element of the pair with

data Of a r = !a :> r deriving Functor

Either way, the pairing just links the present element with the rest of the stream. The primitive yield statement just expresses the pairing of the yielded item with the rest of the stream; or rather it is itself the trivial singleton stream.

yield 17  :: Stream (Of Int) IO ()
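A short sketch of how yield composes in do-notation, with an effect in the base monad placed between elements and a final return value other than (). The name countdown is an assumption for illustration.

```haskell
import Control.Monad.Trans.Class (lift)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical stream: three elements, one IO effect between them,
-- and a String as the final return value.
countdown :: Stream (Of Int) IO String
countdown = do
  S.yield 3
  lift (putStrLn "-- an effect between elements")
  S.yield 2
  S.yield 1
  pure "liftoff"

main :: IO ()
main = do
  r <- S.print countdown -- prints the elements and hands back the result
  putStrLn r
```

S.print consumes the elements and passes the stream's return value through, so nothing is lost at the end of the succession.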

Streaming.Prelude is focused on the manipulation of this all-important stream-form, which appears in the streaming IO libraries under titles like:

io-streams: Generator a r
pipes:      Producer a m r
conduit:    ConduitM () o m r
streaming:  Stream (Of a) m r

The only difference is that in streaming the simple generator or producer concept is formulated explicitly in terms of the general concept of successive connection. But this is a concept you need and already possess anyway, as your comprehension of the streaming ABCs showed.

The special case of a stream of individual Haskell values that simply comes to an end without a special result is variously expressed thus:

io-streams: InputStream a 
pipes:      Producer a m ()
conduit:    Source m a
machines:   SourceT m a (= forall k. MachineT m k a)
streaming:  Stream (Of a) m ()

Note that the above libraries generally employ elaborate systems of type synonyms in order to intimate to the reader the meaning of specialized forms. io-streams is an exception. This library is completely opposed to this tendency, and exports no synonyms.

§ 3. Streaming.Prelude

Streaming.Prelude closely follows Pipes.Prelude. But since it restricts itself to use only of the general idea of streaming, it cleverly omits the pipes:

ghci> S.stdoutLn $ S.take 2 S.stdinLn

Here's a little connect and resume, as the streaming-io experts call it:

ghci> rest <- S.print $ S.splitAt 3 $ S.each [1..10]
ghci> S.sum rest

Somehow, we didn't even need a four-character operator for that, nor advice about best practices! - just ordinary Haskell common sense.
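The same connect and resume can be packaged as a small program. This is a sketch; printThenSum is an assumed name.

```haskell
import Streaming (Of ((:>)))
import qualified Streaming.Prelude as S

-- Hypothetical helper: print the first n elements, then sum whatever
-- is left. The remainder is simply the return value of the first part.
printThenSum :: Int -> [Int] -> IO Int
printThenSum n xs = do
  rest <- S.print $ S.splitAt n $ S.each xs
  total :> () <- S.sum rest
  pure total

main :: IO ()
main = printThenSum 3 [1 .. 10] >>= print
```

Run on [1 .. 10] with n = 3, this prints 1, 2 and 3 and then the sum of the remaining seven elements, 49.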

§ 4. Mother's Prelude v. Streaming.Prelude

The effort of Streaming.Prelude is to leverage the intuition the user has acquired in mastering Prelude and Data.List and to elevate her understanding into a general comprehension of effectful streaming transformations. Unsurprisingly, it takes longer to type out the signatures. It cannot be emphasized enough, though, that the transpositions are totally mechanical:

Data.List.Split.chunksOf :: Int -> [a]          -> [[a]]
Streaming.chunksOf       :: Int -> Stream f m r -> Stream (Stream f m) m r

Prelude.splitAt   :: Int -> [a]          -> ([a],[a])
Streaming.splitAt :: Int -> Stream f m r -> Stream f m (Stream f m r)

These concepts are "functor general", in the jargon used in the documentation, and are thus exported by the main Streaming module. Something like break requires us to inspect individual values for their properties, so it is found in Streaming.Prelude:

Prelude.break           :: (a -> Bool) -> [a]               -> ([a],[a])
Streaming.Prelude.break :: (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
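To see how the nested return type is used, here is a sketch with an assumed helper breakBoth. It collects both halves into lists purely for display; a real consumer would stream each half instead.

```haskell
import Streaming (Of ((:>)))
import qualified Streaming.Prelude as S

-- Hypothetical helper: run the part before the break, receive the rest
-- of the stream as its return value, then run that too.
breakBoth :: (a -> Bool) -> [a] -> IO ([a], [a])
breakBoth p xs = do
  front :> rest <- S.toList $ S.break p $ S.each xs
  back <- S.toList_ rest
  pure (front, back)

main :: IO ()
main = breakBoth (> 3) [1 .. 6 :: Int] >>= print
```

As with Prelude.break, the first element satisfying the predicate begins the second half.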

It is easy to prove that resistance to these types is resistance to effectful streaming itself. I will labor this point a bit more below, but you can also find it developed, with greater skill, in the documentation for the pipes libraries.

§ 5. How come there's not one of those fancy "ListT done right" implementations in here?

The use of the final return value appears to be a complication, but in fact it is essentially contained in the idea of effectful streaming. This is why this library does not export a "ListT done right", which would be simple enough - following pipes, as usual:

newtype ListT m a = ListT (Stream (Of a) m ())

The associated monad instance would wrap

yield :: (Monad m)            => a -> Stream (Of a) m ()
for   :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> Stream f m ()) -> Stream f m r

To see the trouble, consider this signature for splitting a ListT very much done right. Here's what becomes of chunksOf. As long as we are trapped in some sort of ListT, however much rightly implemented, these operations can't be made to stream; something like a list must be accumulated. Similarly, try to imagine adding a splitAt or lines function to this API. It would accumulate strict text forever, just as this does and this doesn't and this doesn't. The difference is simply that the latter libraries operate with the general concept of streaming, and the whole implementation is governed by it. The attractions of the various "ListT done right" implementations are superficial; the concept belongs to logic programming, not stream programming.

Note similarly that you can write a certain kind of take and drop with the machines library - as you can even with a "ListT done right". But I wish you luck writing splitAt! Similarly you can write a getContents; but I wish you luck dividing the resulting bytestream on its lines. This is - as usual! - because the library was not written with the general concept of effectful succession or streaming in view. Materials for sinking some elements of a stream in one way, and others in other ways - copying each line to a different file, as it might be, but without accumulation - are documented within. So are myriad other elementary operations of streaming io.

§ 6. Didn't I hear that free monads are a dog from the point of view of efficiency?

We noted above that if we instantiate Stream f m r to Stream ((,) a) m r or the like, we get the standard idea of a producer or generator. If it is instantiated to Stream f Identity r then we have the standard free monad construction. This construction is subject to certain familiar objections from an efficiency perspective; efforts have been made to substitute exotic cps-ed implementations and so forth. It is an interesting topic.

But in fact, the standard alarmist talk about retraversing binds and quadratic explosions and costly appends, and so on, becomes transparent nonsense with Stream f m r in its streaming use. The conceptual power needed to see this is basically nil: Where m is read as IO, or some transformed IO, then the dreaded retraversing of the binds in a stream expression would involve repeating all the past actions. Don't worry, to get e.g. the second chunk of bytes from a handle, you won't need to start over and get the first one again! The first chunk has vanished into an unrepeatable past.

All of the difficulties a streaming library is attempting to avoid are concentrated in the deep irrationality of

sequence :: (Monad m, Traversable t) => t (m a) -> m (t a)

In the streaming context, this becomes

sequence :: (Monad m, Functor f) => Stream f m r -> Stream f m r
sequence = id

It is of course easy enough to define

accumulate :: (Monad m, Functor f) => Stream f m r -> m (Stream f Identity r)

or reifyBindsRetraversingWherePossible or _ICan'tTakeThisStreamingAnymore, as you might call it. The types themselves teach the user how to avoid or control the sort of accumulation characteristic of sequence in its various guises e.g. mapM f = sequence . map f and traverse f = sequence . fmap f and replicateM n = sequence . replicate n. See for example the types of

Control.Monad.replicateM :: Int -> m a -> m [a]
Streaming.Prelude.replicateM :: Int -> m a -> Stream (Of a) m ()
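The practical difference can be seen with an action that has a visible effect. This is a sketch using an IORef counter; the names counter and tick are assumptions.

```haskell
import Data.IORef (atomicModifyIORef', newIORef)
import qualified Streaming.Prelude as S

main :: IO ()
main = do
  counter <- newIORef (0 :: Int)
  -- Hypothetical action: return the current count and increment it.
  let tick = atomicModifyIORef' counter (\n -> (n + 1, n))
  -- Each tick runs only when the consumer asks for the next element,
  -- and its result is printed before the following tick is run.
  S.print $ S.replicateM 3 tick
```

With Control.Monad.replicateM all three actions would run, and their results would be collected into a list, before anything could be printed.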

If you want to tempt fate and replicate the irrationality of Control.Monad.replicateM, then sure, you can define the hermaphroditic chimera

accumulate . Streaming.Prelude.replicateM :: Int -> m a -> m (Stream (Of a) Identity ())

which is what we find in our diseased base libraries. But once you know how to operate with a stream directly you will see less and less point in what is called extracting the (structured) value from IO. Consider the apparently innocent distinction between

"getContents" :: String

and

getContents :: IO String 

Omitting consideration of eof, we might define getContents thus

getContents = sequence $ repeat getChar

There it is again! The very devil! By contrast there is no distinction between

"getContents" :: Stream (Of Char) m ()  -- the IsString instance is monad-general

and

getContents :: MonadIO m => Stream (Of Char) m ()

They unify just fine. That is, if I make the type synonym

type String m r = Stream (Of Char) m r

I get, for example:

"getLine"                              :: String m  ()
getLine                                :: String IO ()
"getLine" >> getLine                   :: String IO ()
splitAt 20 $ "getLine" >> getLine      :: String IO (String IO ())
length $ "getLine" >> getLine          :: IO Int

and can dispense with half the advice they will give you on #haskell. It is only a slight exaggeration to say that a stream should never be "extracted from IO".

With sequence and traverse, we accumulate a pure succession of pure values from a pure succession of monadic values. Why bother if you have an intrinsically monadic conception of succession or traversal? Stream f m r gives you an immense body of such structures and a simple discipline for working with them. Sprinkle id freely through your program, under various names, if you get homesick for sequence and company.

§ 7. Interoperation with the streaming-io libraries

The simplest form of interoperation with pipes is accomplished with this isomorphism:

Pipes.unfoldr Streaming.next        :: Stream (Of a) m r   -> Producer a m r
Streaming.unfoldr Pipes.next        :: Producer a m r      -> Stream (Of a) m r
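A sketch of the two directions as named functions, assuming the pipes package is installed and that the conversions are built from Pipes.Prelude.unfoldr with Streaming.Prelude.next, and Streaming.Prelude.unfoldr with Pipes.next. The names toProducer and fromProducer are hypothetical.

```haskell
import Pipes (Producer, next)
import qualified Pipes.Prelude as P
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical helper: unfold a Producer by repeatedly stepping the Stream.
toProducer :: Monad m => Stream (Of a) m r -> Producer a m r
toProducer = P.unfoldr S.next

-- Hypothetical helper: unfold a Stream by repeatedly stepping the Producer.
fromProducer :: Monad m => Producer a m r -> Stream (Of a) m r
fromProducer = S.unfoldr next

main :: IO ()
main = S.print $ fromProducer $ toProducer $ S.each [1 .. 3 :: Int]
```

Going there and back should leave both the elements and the final return value unchanged.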

Of course, streaming can be mixed with pipes wherever pipes itself employs Control.Monad.Trans.Free; speedups are frequently appreciable. (This was the original purpose of the main Streaming module, which just mechanically transposes a simple optimization employed in Pipes.Internal.) Interoperation with io-streams is thus:

Streaming.reread IOStreams.read     :: InputStream a       -> Stream (Of a) IO ()
IOStreams.unfoldM Streaming.uncons  :: Stream (Of a) IO () -> IO (InputStream a)

A simple exit to conduit would be, e.g.:

Conduit.unfoldM Streaming.uncons    :: Stream (Of a) m ()  -> Source m a

These conversions should never be more expensive than a single >-> or =$=.

At a much more general level, we also of course have interoperation with free:

Free.iterTM  Stream.wrap              :: FreeT f m a -> Stream f m a
Stream.iterTM Free.wrap               :: Stream f m a -> FreeT f m a 

§ 8. Where can I find examples of use?

For some simple ghci examples, see the commentary throughout the Prelude module. For slightly more advanced usage see the commentary in the haddocks of streaming-bytestring and e.g. these replicas of shell-like programs from the io-streams tutorial. Here's a simple streaming GET request with intrinsically streaming byte streams. Here is a comically simple 'high - low' game.

§ 9. Problems

Questions about this library can be put as issues through the github site or on the pipes mailing list. (This library understands itself as part of the pipes "ecosystem.")

§ 10. Implementation and benchmarking notes

This library defines an optimized FreeT with an eye to use with streaming libraries, namely:

data Stream f m r
     = Return r
     | Step !(f (Stream f m r))
     | Effect (m (Stream f m r))

in place of the standard FreeT that we find in the free library, which is approximately:

newtype FreeT f m r = FreeT {runFreeT :: m (Either r (f (FreeT f m r)))}

Rather than wrapping each step in a monadic 'layer', such a layer is put alongside separate 'pure' constructors for a functor 'layer' and a final return value. The maneuver is very friendly to the compiler, but requires a bit of subtlety to protect a sound monad instance. Just such an optimization is adopted internally by the pipes library. As in pipes, the constructors are here left in an Internal module; the main Streaming module exporting the type itself and various operations and instances.
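To make the three-constructor layout concrete, here is a self-contained sketch that depends only on base. It is not the library's actual source: the instances are written naively, and the names effect and sumS are assumptions for illustration.

```haskell
{-# LANGUAGE DeriveFunctor #-}

-- The type as given above, with the strict pair used for elements.
data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

data Of a r = !a :> r deriving Functor

instance (Functor f, Monad m) => Functor (Stream f m) where
  fmap g = loop
    where
      loop (Return r) = Return (g r)
      loop (Step fs) = Step (fmap loop fs)
      loop (Effect m) = Effect (fmap loop m)

instance (Functor f, Monad m) => Applicative (Stream f m) where
  pure = Return
  sf <*> sx = sf >>= \g -> fmap g sx

instance (Functor f, Monad m) => Monad (Stream f m) where
  s >>= k = loop s
    where
      loop (Return r) = k r -- continue with the next stream
      loop (Step fs) = Step (fmap loop fs)
      loop (Effect m) = Effect (fmap loop m)

-- A singleton stream: one pure functor layer, no monadic layer.
yield :: a -> Stream (Of a) m ()
yield a = Step (a :> Return ())

-- Hypothetical helper: run an action in the base monad as one Effect layer.
effect :: Monad m => m r -> Stream f m r
effect = Effect . fmap Return

-- Hypothetical consumer: a strict left fold over the elements.
sumS :: Monad m => Stream (Of Int) m r -> m Int
sumS = go 0
  where
    go acc (Return _) = pure acc
    go acc (Step (a :> rest)) = let acc' = acc + a in acc' `seq` go acc' rest
    go acc (Effect m) = m >>= go acc

main :: IO ()
main = do
  total <- sumS $ do
    yield 1
    effect (putStrLn "effect between elements")
    yield 2
    yield 3
  print total
```

The consumer only enters the base monad when it meets an Effect constructor; purely yielded elements are handled by ordinary pattern matching, which is roughly the compiler-friendly maneuver described above.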

I ran a simple benchmark (adjusting a script of John Wiegley) using a very simple composition of functions:

. filter (\x -> x `mod` 2 == 0) 
. map (+1) 
. drop 1000 
. map (+1) 
. filter even 
. each

as it is interpreted by various libraries - streaming, conduit, pipes, io-streams and machines.

The results were fairly pleasing:

benchmarking sum/streaming
time                 8.996 ms   (8.910 ms .. 9.068 ms)
                     0.999 R²   (0.998 R² .. 1.000 R²)
mean                 9.060 ms   (9.004 ms .. 9.122 ms)
std dev              164.6 μs   (123.9 μs .. 251.9 μs)

benchmarking sum/conduit
time                 15.77 ms   (15.66 ms .. 15.89 ms)
                     0.999 R²   (0.998 R² .. 1.000 R²)
mean                 15.78 ms   (15.70 ms .. 15.89 ms)
std dev              245.3 μs   (176.5 μs .. 379.7 μs)

benchmarking sum/pipes
time                 57.94 ms   (57.68 ms .. 58.27 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 58.10 ms   (57.92 ms .. 58.27 ms)
std dev              324.2 μs   (214.1 μs .. 468.8 μs)

benchmarking sum/iostreams
time                 61.96 ms   (61.36 ms .. 62.53 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 61.80 ms   (61.54 ms .. 62.08 ms)
std dev              543.7 μs   (375.1 μs .. 715.7 μs)

benchmarking sum/machine
time                 260.4 ms   (257.2 ms .. 263.6 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 259.7 ms   (258.4 ms .. 260.6 ms)
std dev              1.284 ms   (565.9 μs .. 1.690 ms)
variance introduced by outliers: 16% (moderately inflated)

benchmarking basic/streaming
time                 74.86 ms   (70.07 ms .. 78.78 ms)
                     0.994 R²   (0.987 R² .. 0.999 R²)
mean                 78.25 ms   (75.55 ms .. 84.10 ms)
std dev              6.301 ms   (1.995 ms .. 10.17 ms)
variance introduced by outliers: 19% (moderately inflated)

benchmarking basic/conduit
time                 90.06 ms   (66.61 ms .. 114.4 ms)
                     0.876 R²   (0.658 R² .. 0.977 R²)
mean                 98.63 ms   (85.28 ms .. 116.5 ms)
std dev              23.06 ms   (10.61 ms .. 30.72 ms)
variance introduced by outliers: 65% (severely inflated)

benchmarking basic/pipes
time                 180.9 ms   (158.7 ms .. 201.3 ms)
                     0.989 R²   (0.971 R² .. 1.000 R²)
mean                 190.5 ms   (183.0 ms .. 197.8 ms)
std dev              10.16 ms   (4.910 ms .. 14.86 ms)
variance introduced by outliers: 14% (moderately inflated)

benchmarking basic/iostreams
time                 269.7 ms   (243.8 ms .. 303.9 ms)
                     0.995 R²   (0.985 R² .. 1.000 R²)
mean                 264.2 ms   (254.0 ms .. 272.0 ms)
std dev              10.87 ms   (5.762 ms .. 15.06 ms)
variance introduced by outliers: 16% (moderately inflated)

benchmarking basic/machine
time                 397.7 ms   (324.4 ms .. 504.8 ms)
                     0.992 R²   (0.977 R² .. 1.000 R²)
mean                 407.7 ms   (391.1 ms .. 420.3 ms)
std dev              19.40 ms   (0.0 s .. 21.88 ms)
variance introduced by outliers: 19% (moderately inflated)


This sequence of pre-packaged combinators is, I think, as friendly as it could possibly be to the more recent conduit fusion framework. That framework of course doesn't apply to user-defined operations; there we should expect times like those shown for pipes. Since the combinators from streaming are defined with naive recursion, more or less as the user might, we have reason to think this result is characteristic, but much more benchmarking is needed before anything can be said with certainty.