The streaming package
This package contains two modules, Streaming and Streaming.Prelude. The principal module, Streaming.Prelude, exports an elementary streaming prelude focused on a simple "source" or "producer" type, namely Stream (Of a) m r. This is a sort of effectful version of ([a],r) in which successive elements of type a arise from some sort of monadic action before the succession ends with a value of type r. Everything in the library is organized to make programming with this type as simple as possible, by the simple expedient of making it as close to Prelude and Data.List as possible. Thus for example the trivial program
>>> S.sum $ S.take 3 (S.readLn :: Stream (Of Int) IO ()) 1<Enter> 2<Enter> 3<Enter> 6 :> ()
sums the first three valid integers from user input. Similarly,
>>> S.stdoutLn $ S.map (map toUpper) $ S.take 2 S.stdinLn hello<Enter> HELLO world!<Enter> WORLD!
upper-cases the first two lines from stdin as they arise, and sends them to stdout. And so on, with filtering, mapping, breaking, chunking, zipping, unzipping, replicating and so forth: we program with streams of Ints or Strings directly as if they constituted something like a list. That's because streams really do constitute something like a list, and the associated operations can mostly have the same names. (A few, like reverse, don't stream and thus disappear; others like unzip are here given properly streaming formulation for the first time.) And we everywhere oppose "extracting a pure list from IO", which is the origin of typical Haskell memory catastrophes. Basically any case where you are tempted to use mapM, replicateM, traverse or sequence with Haskell lists, you would do better to use something like Stream (Of a) m r. The type signatures are a little fancier, but the programs themselves are mostly the same. In fact, they are mostly simpler. Thus, consider the trivial demo program mentioned in this SO question
main = mapM newIORef [1..10^8::Int] >>= mapM readIORef >>= mapM_ print
The new user notices that this exhausts memory, and worries about the efficiency of Haskell IORefs. But of course it exhausts memory! Look what it says! The problem is immediately cured by writing
main = S.print $ S.mapM readIORef $ S.mapM newIORef $ S.each [1..10^8::Int]
which really does what the other program was meant to do, uses no more memory than hello-world, and is simpler anyway, since it doesn't involve the detour of "extracting a list from IO". Almost every use of list mapM, replicateM, traverse and sequence produces this problem on a smaller scale. People get used to it, as if it were characteristic of Haskell programs to use a lot of memory. But in truth "extracting a list or sequence from IO" is mostly just bad practice pure and simple. Of course, mapM, replicateM, traverse and sequence make sense for lists, under certain conditions! But unsafePerformIO also makes sense under certain conditions.
The Streaming module exports the general type, Stream f m r, which can be used to stream successive distinct steps characterized by any functor f, though we are mostly interested in organizing computations of the form Stream (Of a) m r. The streaming-IO libraries have various devices for dealing with effectful variants of [a] or ([a],r) in which the emergence of successive elements somehow depends on IO. But it is only with the general type Stream f m r, or some equivalent, that one can envisage (for example) the connected streaming of their sorts of stream - as one makes lists of lists in the Haskell Prelude and Data.List. One needs some such type if we are to express properly streaming equivalents of e.g.
group :: Ord a => [a] -> [[a]] chunksOf :: Int -> [a] -> [[a]] lines :: [Char] -> [[Char]] -- but similarly with byte streams, etc.
to mention a few obviously desirable operations. (This is explained more elaborately in the readme below.)
One could throw of course throw something like the present Stream type on top of a prior stream concept: this is how pipes and pipes-group (which are very much our model here) use FreeT. But once one grasps the iterable stream concept needed to express those functions then one will also see that, with it, one is already in possession of a complete elementary streaming library - since one possesses Stream ((,) a) m r or equivalently Stream (Of a) m r. This is the type of a 'generator' or 'producer' or 'source' or whatever you call an effectful stream of items. The present Streaming.Prelude is thus the simplest streaming library that can replicate anything like the API of the Prelude and Data.List.
The emphasis of the library is on interoperation; for the rest its advantages are: extreme simplicity, re-use of intuitions the user has gathered from mastery of Prelude and Data.List, and a total and systematic rejection of type synonyms. The two conceptual pre-requisites are some comprehension of monad transformers and some familiarity with 'rank 2 types'. It is hoped that experimentation with this simple material, starting with the ghci examples in Streaming.Prelude, will give people who are new to these concepts some intuition about their importance. The most fundamental purpose of the library is to express elementary streaming ideas without reliance on a complex framework, but in a way that integrates transparently with the rest of Haskell, using ideas - e.g. rank 2 types, which are here implicit or explicit in most mapping - that the user can carry elsewhere, rather than chaining her understanding to the curiosities of a so-called streaming IO framework (as necessary as that is for certain purposes.)
See the readme below for further explanation, including the examples linked there. Elementary usage can be divined from the ghci examples in Streaming.Prelude and perhaps from this rough beginning of a tutorial. Note also the streaming bytestring and streaming utils packages. Questions about usage can be put raised on StackOverflow with the tag [haskell-streaming], or as an issue on Github, or on the pipes list (the package understands itself as part of the pipes 'ecosystem'.)
The simplest form of interoperation with pipes is accomplished with this isomorphism:
Pipes.unfoldr Streaming.next :: Stream (Of a) m r -> Producer a m r Streaming.unfoldr Pipes.next :: Producer a m r -> Stream (Of a) m r
Interoperation with io-streams is thus:
Streaming.reread IOStreams.read :: InputStream a -> Stream (Of a) IO () IOStreams.unfoldM Streaming.uncons :: Stream (Of a) IO () -> IO (InputStream a)
With conduit one might use, e.g.:
Conduit.unfoldM Streaming.uncons :: Stream (Of a) m () -> Source m a \str -> Streaming.mapM_ Conduit.yield (hoist lift str) :: Stream (Of o) m r -> ConduitM i o m r \src -> hoist lift str $$ Conduit.mapM_ Streaming.yield :: Source m a -> Stream (Of a) m ()
These conversions should never be more expensive than a single >-> or =$=. The simplest interoperation with regular Haskell lists is provided by, say
Streaming.each :: [a] -> Stream (Of a) m () Streaming.toList_ :: Stream (Of a) m r -> m [a]
The latter of course accumulates the whole list in memory, and is mostly what we are trying to avoid. Every use of Prelude.mapM f should be reconceived as using the composition Streaming.toList_ . Streaming.mapM f . Streaming.each with a view to considering whether the accumulation required by Streaming.toList_ is really necessary.
Because these are microbenchmarks for individual functions, they represent a sort of "worst case"; many other factors can influence the speed of a complex program.
[Skip to Readme]
|Versions||0.1.0.0, 0.1.0.1, 0.1.0.3, 0.1.0.4, 0.1.0.5, 0.1.0.6, 0.1.0.7, 0.1.0.8, 0.1.0.9, 0.1.0.10, 0.1.0.11, 0.1.0.12, 0.1.0.13, 0.1.0.14, 0.1.0.15, 0.1.0.16, 0.1.0.17, 0.1.0.18, 0.1.0.19, 0.1.0.20, 0.1.1.0, 0.1.1.1, 0.1.2.0, 0.1.2.2, 0.1.3.0, 0.1.3.1, 0.1.3.2, 0.1.3.3, 0.1.3.4, 0.1.4.0, 0.1.4.1, 0.1.4.2, 0.1.4.3, 0.1.4.4, 0.1.4.5|
|Dependencies||base (>=4.6 && <5), containers, exceptions (>0.5 && <0.9), ghc-prim, mmorph (==1.0.*), monad-control (>=0.3.1 && <1.1), mtl (>=2.1 && <2.3), resourcet (>1.1.0 && <1.2), time, transformers (>=0.4 && <0.6), transformers-base (<0.5) [details]|
|Category||Data, Pipes, Streaming|
|Source repository||head: git clone https://github.com/michaelt/streaming|
|Uploaded||Tue Jan 17 08:10:41 UTC 2017 by MichaelThompson|
|Distributions||LTSHaskell:0.1.4.5, NixOS:0.1.4.5, Stackage:0.1.4.5, Tumbleweed:0.1.4.5|
|Downloads||3881 total (376 in the last 30 days)|
|Status||Docs uploaded by user
Build status unknown [no reports yet]
Hackage Matrix CI
For package maintainers and hackage trustees