streaming-0.1.0.9: A free monad transformer optimized for streaming applications.

Safe HaskellNone
LanguageHaskell2010

Streaming.Prelude

Contents

Description

This module is very closely modeled on Pipes.Prelude; it attempts to simplify and optimize the conception of Producer manipulation contained in Pipes.Group, Pipes.Parse and the like. This is very simple and unmysterious; it is independent of piping and conduiting, and can be used with any rational "streaming IO" system.

Some interoperation incantations would be e.g.

Pipes.unfoldr Streaming.next        :: Stream (Of a) m r   -> Producer a m r
Streaming.unfoldr Pipes.next        :: Producer a m r      -> Stream (Of a) m r                     
Streaming.reread IOStreams.read     :: InputStream a       -> Stream (Of a) IO ()
IOStreams.unfoldM Streaming.uncons  :: Stream (Of a) IO () -> IO (InputStream a)
Conduit.unfoldM Streaming.uncons    :: Stream (Of a) m ()  -> Source m a

Import qualified thus:

import Streaming
import qualified Streaming.Prelude as S

For the examples below, one sometimes needs

import Streaming.Prelude (each, yield, stdoutLn, stdinLn)
import qualified Control.Foldl as L -- cabal install foldl
import qualified Pipes as P
import qualified Pipes.Prelude as P
import qualified System.IO as IO

Synopsis

Types

data Of a b Source

A left-strict pair; the base functor for streams of individual elements.

Constructors

!a :> b infixr 4 

Instances

Functor (Of a) Source 
Foldable (Of a) Source 
Traversable (Of a) Source 
(Eq a, Eq b) => Eq (Of a b) Source 
(Data a, Data b) => Data (Of a b) Source 
(Ord a, Ord b) => Ord (Of a b) Source 
(Read a, Read b) => Read (Of a b) Source 
(Show a, Show b) => Show (Of a b) Source 

lazily :: Of a b -> (a, b) Source

strictly :: (a, b) -> Of a b Source

fst' :: Of a b -> a Source

snd' :: Of a b -> b Source

Introducing streams of elements

yield :: Monad m => a -> Stream (Of a) m () Source

A singleton stream

>>> stdoutLn $ yield "hello"
hello
>>> S.sum $ do {yield 1; yield 2}
3
>>> S.sum $ do {yield 1;  lift $ putStrLn "# 1 was yielded";  yield 2;  lift $ putStrLn "# 2 was yielded"}
# 1 was yielded
# 2 was yielded
3
>>> let prompt :: IO Int; prompt = putStrLn "Enter a number:" >> readLn
>>> S.sum $ do {lift prompt >>= yield ; lift prompt >>= yield ; lift prompt >>= yield}
Enter a number:
3<Enter>
Enter a number:
20<Enter>
Enter a number:
100<Enter>
123

each :: (Monad m, Foldable f) => f a -> Stream (Of a) m () Source

Stream the elements of a foldable container.

>>> S.print $ S.map (*100) $ each [1..3] >> yield 4
0
100
200
300
400
>>> S.print $ S.map (*100) $ each [1..3] >> lift readLn >>= yield
100
200
300
4<Enter>
400

layers :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> f x) -> Stream f m r Source

unfoldr :: Monad m => (s -> m (Either r (a, s))) -> s -> Stream (Of a) m r Source

Build a Stream by unfolding steps starting from a seed.

The seed can of course be anything, but this is one natural way to consume a pipes Producer. Consider:

>>> S.stdoutLn $ S.take 2 (S.unfoldr P.next P.stdinLn)
hello<Enter>
hello
goodbye<Enter>
goodbye
>>> S.stdoutLn $ S.unfoldr P.next (P.stdinLn P.>-> P.take 2)
hello<Enter>
hello
goodbye<Enter>
goodbye
>>> S.drain $ S.unfoldr P.next (P.stdinLn P.>-> P.take 2 P.>-> P.stdoutLn)
hello<Enter>
hello
goodbye<Enter>
goodbye

If the intended "coalgebra" is complicated it might be pleasant to write it with the state monad:

\state seed -> S.unfoldr  (runExceptT  . runStateT state) seed :: Monad m => StateT s (ExceptT r m) a -> s -> P.Producer a m r
>>> let state = do {n <- get ; if n >= 3 then lift (throwE "Got to three"); else put (n+1); return n}
>>> S.print $ S.unfoldr (runExceptT  . runStateT state) 0
0
1
2
"Got to three"

stdinLn :: MonadIO m => Stream (Of String) m () Source

repeatedly stream lines as String from stdin

>>> stdoutLn $ S.show (S.each [1..3])
1
2
3
>>> stdoutLn stdinLn
hello<Enter>
hello
world<Enter>
world
^CInterrupted.
>>> stdoutLn $ S.map reverse stdinLn
hello<Enter>
olleh
world<Enter>
dlrow
^CInterrupted.

readLn :: (MonadIO m, Read a) => Stream (Of a) m () Source

Read values from stdin, ignoring failed parses

>>> S.sum $ S.take 2 S.readLn :: IO Int
3<Enter>
#$%^&\^?<Enter>
1000<Enter>
1003

fromHandle :: MonadIO m => Handle -> Stream (Of String) m () Source

Read Strings from a Handle using hGetLine

Terminates on end of input

>>> withFile "distribute.hs" ReadMode $ stdoutLn . S.take 3 . fromHandle
import Streaming
import qualified Streaming.Prelude as S
import Control.Monad.Trans.State.Strict

iterate :: (a -> a) -> a -> Stream (Of a) m r Source

Iterate a pure function from a seed value, streaming the results forever

repeat :: a -> Stream (Of a) m r Source

Repeat an element ad inf. .

>>> S.print $ S.take 3 $ S.repeat 1
1
1
1

cycle :: (Monad m, Functor f) => Stream f m r -> Stream f m s Source

Cycle repeatedly through the layers of a stream, ad inf. This function is functor-general

cycle = forever
>>> rest <- S.print $ S.splitAt 3 $ S.cycle (yield True >> yield False)
True
False
True
>>> S.print $ S.take 3 rest
False
True
False

repeatM :: Monad m => m a -> Stream (Of a) m r Source

Repeat a monadic action ad inf., streaming its results.

>>> S.toListM $ S.take 2 (repeatM getLine)
hello<Enter>
world<Enter>
["hello","world"]

replicateM :: Monad m => Int -> m a -> Stream (Of a) m () Source

Repeat an action several times, streaming the results.

>>> S.print $ S.replicateM 2 getCurrentTime
2015-08-18 00:57:36.124508 UTC
2015-08-18 00:57:36.124785 UTC

enumFrom :: (Monad m, Enum n) => n -> Stream (Of n) m r Source

enumFromThen :: (Monad m, Enum a) => a -> a -> Stream (Of a) m r Source

Consuming streams of elements

stdoutLn :: MonadIO m => Stream (Of String) m () -> m () Source

Write Strings to stdout using putStrLn; terminates on a broken output pipe

>>> S.stdoutLn $ S.show (S.each [1..3])
1
2
3

stdoutLn' :: MonadIO m => Stream (Of String) m r -> m r Source

Write Strings to stdout using putStrLn

This does not handle a broken output pipe, but has a polymorphic return value, which makes this possible:

>>> rest <- stdoutLn' $ S.splitAt 3 $ S.show (each [1..5])
1
2
3
>>> stdoutLn' rest
4
5

Or indeed:

>>> rest <- stdoutLn' $ S.show $ S.splitAt 3 (each [1..5])
1
2
3
>>> S.sum rest
9

mapM_ :: Monad m => (a -> m b) -> Stream (Of a) m r -> m r Source

Reduce a stream to its return value with a monadic action.

>>> mapM_ Prelude.print $ each [1..3] >> return True
1
2
3
True

print :: (MonadIO m, Show a) => Stream (Of a) m r -> m r Source

toHandle :: MonadIO m => Handle -> Stream (Of String) m r -> m r Source

drain :: Monad m => Stream (Of a) m r -> m r Source

Reduce a stream, performing its actions but ignoring its elements.

>>> let stream = do {yield 1; lift (putStrLn "Effect!"); yield 2; lift (putStrLn "Effect!"); return (2^100)}
>>> S.drain stream
Effect!
Effect!
1267650600228229401496703205376
>>> S.drain $ S.takeWhile (<2) stream
Effect!

Stream transformers

map :: Monad m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r Source

Standard map on the elements of a stream.

mapM :: Monad m => (a -> m b) -> Stream (Of a) m r -> Stream (Of b) m r Source

Replace each element of a stream with the result of a monadic action

maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source

Map layers of one functor to another with a transformation

sequence :: Monad m => Stream (Of (m a)) m r -> Stream (Of a) m r Source

Like the sequence but streaming. The result type is a stream of a's, but is not accumulated; the effects of the elements of the original stream are interleaved in the resulting stream. Compare:

sequence :: Monad m =>       [m a]           -> m [a]
sequence :: Monad m => Stream (Of (m a)) m r -> Stream (Of a) m r

mapFoldable :: (Monad m, Foldable t) => (a -> t b) -> Stream (Of a) m r -> Stream (Of b) m r Source

For each element of a stream, stream a foldable container of elements instead

>>> S.print $ S.mapFoldable show $ yield 12
'1'
'2'

filter :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r Source

Skip elements of a stream that fail a predicate

filterM :: Monad m => (a -> m Bool) -> Stream (Of a) m r -> Stream (Of a) m r Source

Skip elements of a stream that fail a monadic test

for :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> Stream f m x) -> Stream f m r Source

for replaces each element of a stream with an associated stream. Note that the associated stream may layer any functor.

take :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m () Source

End a stream after n elements; the original return value is thus lost. splitAt preserves this information. Note that, like splitAt, this function is functor-general, so that, for example, you can take not just a number of items from a stream of elements, but a number of substreams and the like.

>>> S.print $ mapsM sum' $ S.take 2 $ chunksOf 3 $ each [1..]
6   -- sum of first group of 3
15  -- sum of second group of 3
>>> S.print $ mapsM S.sum' $ S.take 2 $ chunksOf 3 $ S.each [1..4] >> S.readLn
6     -- sum of first group of 3, which is already in [1..4]
100   -- user input
10000 -- user input
10104 -- sum of second group of 3

takeWhile :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m () Source

End stream when an element fails a condition; the original return value is lost span preserves this information.

drop :: Monad m => Int -> Stream (Of a) m r -> Stream (Of a) m r Source

Ignore the first n elements of a stream, but carry out the actions

dropWhile :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r Source

Ignore elements of a stream until a test succeeds.

>>> IO.withFile "distribute.hs" IO.ReadMode $ S.stdoutLn . S.take 2 . S.dropWhile (isPrefixOf "import") . S.fromHandle
main :: IO ()
main = do

concat :: (Monad m, Foldable f) => Stream (Of (f a)) m r -> Stream (Of a) m r Source

Make a stream of traversable containers into a stream of their separate elements

>>> S.print $ S.concat (each ["xy","z"])
'x'
'y'
'z'
>>> S.print $ S.concat (S.each [Just 1, Nothing, Just 2])
1
2
>>> S.print $  S.concat (S.each [Right 1, Left "Error!", Right 2])
1
2

Not to be confused with the functor-general

concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r -- specializing
>>> S.stdoutLn $ concats $ maps (<* yield "--\n--") $ chunksOf 2 $ S.show (each [1..5])
1
2
--
--
3
4
--
--
5
--
--

scan :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> Stream (Of b) m r Source

Strict left scan, streaming, e.g. successive partial results.

Control.Foldl.purely scan :: Monad m => Fold a b -> Stream (Of a) m r -> Stream (Of b) m r
>>> S.print $ L.purely S.scan L.list $ each [3..5]
[]
[3]
[3,4]
[3,4,5]

scanM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> Stream (Of b) m r Source

Strict, monadic left scan

Control.Foldl.impurely scanM :: Monad m => FoldM a m b -> Stream (Of a) m r -> Stream (Of b) m r
>>> let v =  L.impurely scanM L.vector $ each [1..4::Int] :: Stream (Of (U.Vector Int)) IO ()
>>> S.print v
fromList []
fromList [1]
fromList [1,2]
fromList [1,2,3]
fromList [1,2,3,4]

chain :: Monad m => (a -> m ()) -> Stream (Of a) m r -> Stream (Of a) m r Source

Apply an action to all values flowing downstream

>>> S.product (chain print (S.each [2..4])) >>= print
2
3
4
24

read :: (Monad m, Read a) => Stream (Of String) m r -> Stream (Of a) m r Source

Make a stream of strings into a stream of parsed values, skipping bad cases

show :: (Monad m, Show a) => Stream (Of a) m r -> Stream (Of String) m r Source

cons :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r Source

The natural cons for a Stream (Of a).

cons a stream = yield a >> stream

Useful for interoperation:

Data.Text.foldr S.cons (return ()) :: Text -> Stream (Of Char) m ()
Lazy.foldrChunks S.cons (return ()) :: Lazy.ByteString -> Stream (Of Strict.ByteString) m ()

and so on.

Splitting and inspecting streams of elements

next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r)) Source

The standard way of inspecting the first item in a stream of elements, if the stream is still 'running'. The Right case contains a Haskell pair, where the more general inspect would return a left-strict pair. There is no reason to prefer inspect since, if the Right case is exposed, the first element in the pair will have been evaluated to whnf.

next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r))
inspect :: Monad m => Stream (Of a) m r -> m (Either r (Of a (Stream (Of a) m r)))

Interoperate with pipes producers thus:

Pipes.unfoldr Stream.next :: Stream (Of a) m r -> Producer a m r
Stream.unfoldr Pipes.next :: Producer a m r -> Stream (Of a) m r 

Similarly:

IOStreams.unfoldM (liftM (either (const Nothing) Just) . next) :: Stream (Of a) IO b -> IO (InputStream a)
Conduit.unfoldM (liftM (either (const Nothing) Just) . next)   :: Stream (Of a) m r -> Source a m r

But see uncons

uncons :: Monad m => Stream (Of a) m () -> m (Maybe (a, Stream (Of a) m ())) Source

Inspect the first item in a stream of elements, without a return value. uncons provides convenient exit into another streaming type:

IOStreams.unfoldM uncons :: Stream (Of a) IO b -> IO (InputStream a)
Conduit.unfoldM uncons   :: Stream (Of a) m r -> Conduit.Source m a

splitAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r) Source

Split a succession of layers after some number, returning a streaming or -- effectful pair. This function is the same as the splitsAt exported by the -- Streaming module, but since this module is imported qualified, it can -- usurp a Prelude name. It specializes to:

 splitAt :: (Monad m, Functor f) => Int -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)

break :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r) Source

Break a sequence when a element falls under a predicate, keeping the rest of the stream as the return value.

>>> rest <- S.print $ S.break even $ each [1,1,2,3]
1
1
>>> S.print rest
2
3

span :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r) Source

Stream elements until one fails the condition, return the rest.

group :: (Monad m, Eq a) => Stream (Of a) m r -> Stream (Stream (Of a) m) m r Source

groupBy :: Monad m => (a -> a -> Bool) -> Stream (Of a) m r -> Stream (Stream (Of a) m) m r Source

Folds

Use these to fold the elements of a Stream.

>>> S.fold (+) 0 id $ S.each [1..0]
50

The general folds fold, fold'', foldM and 'foldM\'' are arranged for use with Foldl

>>> L.purely fold L.sum $ each [1..10]
55
>>> L.purely fold (liftA3 (,,) L.sum L.product L.list) $ each [1..10]
(55,3628800,[1,2,3,4,5,6,7,8,9,10])

All functions marked with a single quote (e.g. fold', sum' carry the stream's return value in a left-strict pair. These are convenient for mapsM-ing over a Stream (Stream (Of a) m) m r, which is to be compared with [[a]]. Specializing, we have e.g.

 mapsM sum' :: (Monad m, Num n) => Stream (Stream (Of Int)) IO () -> Stream (Of n) IO ()
 mapsM (fold' mappend mempty id) :: Stream (Stream (Of Int)) IO () -> Stream (Of Int) IO ()
>>> S.print $ mapsM sum' $ chunksOf 3 $ each [1..10]
6
15
24
10
>>> let three_folds = L.purely S.fold' (liftA3 (,,) L.sum L.product L.list)
>>> S.print $ mapsM three_folds $ chunksOf 3 (each [1..10])
(6,6,[1,2,3])
(15,120,[4,5,6])
(24,504,[7,8,9])
(10,10,[10])

fold :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m () -> m b Source

Strict fold of a Stream of elements

Control.Foldl.purely fold :: Monad m => Fold a b -> Stream (Of a) m () -> m b

fold' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> m (Of b r) Source

Strict fold of a Stream of elements that preserves the return value.

>>> S.sum' $ each [1..10]
55 :> ()
>>> (n :> rest)  <- sum' $ S.splitAt 3 (each [1..10])
>>> print n
6
>>> (m :> rest') <- sum' $ S.splitAt 3 rest
>>> print m
15
>>> S.print rest'
7
8
9

The type provides for interoperation with the foldl library.

Control.Foldl.purely fold' :: Monad m => Fold a b -> Stream (Of a) m r -> m (Of b r)

Thus, specializing a bit:

L.purely fold' L.sum :: Stream (Of Int) Int r -> m (Of Int r)
maps (L.purely fold' L.sum) :: Stream (Stream (Of Int)) IO r -> Stream (Of Int) IO r
>>> S.print $ mapsM (L.purely S.fold' (liftA2 (,) L.list L.sum)) $ chunksOf 3 $ each [1..10]
([1,2,3],6)
([4,5,6],15)
([7,8,9],24)
([10],10)

foldM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m () -> m b Source

Strict, monadic fold of the elements of a 'Stream (Of a)'

Control.Foldl.impurely foldM :: Monad m => FoldM a b -> Stream (Of a) m () -> m b

foldM' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> m (Of b r) Source

Strict, monadic fold of the elements of a 'Stream (Of a)'

Control.Foldl.impurely foldM' :: Monad m => FoldM a b -> Stream (Of a) m r -> m (b, r)

sum :: (Monad m, Num a) => Stream (Of a) m () -> m a Source

Fold a Stream of numbers into their sum

sum' :: (Monad m, Num a) => Stream (Of a) m r -> m (Of a r) Source

Fold a Stream of numbers into their sum with the return value

 maps' sum' :: Stream (Stream (Of Int)) m r -> Stream (Of Int) m r

product :: (Monad m, Num a) => Stream (Of a) m () -> m a Source

Fold a Stream of numbers into their product

product' :: (Monad m, Num a) => Stream (Of a) m r -> m (Of a r) Source

Fold a Stream of numbers into their product with the return value

 maps' product' :: Stream (Stream (Of Int)) m r -> Stream (Of Int) m r

length :: Monad m => Stream (Of a) m () -> m Int Source

length' :: Monad m => Stream (Of a) m r -> m (Of Int r) Source

toList :: Stream (Of a) Identity () -> [a] Source

Convert a pure Stream (Of a) into a list of as

toListM :: Monad m => Stream (Of a) m () -> m [a] Source

Convert an effectful 'Stream (Of a)' into a list of as

Note: Needless to say this function does not stream properly. It is basically the same as mapM which, like replicateM, sequence and similar operations on traversable containers is a leading cause of space leaks.

toListM' :: Monad m => Stream (Of a) m r -> m (Of [a] r) Source

Convert an effectful Stream into a list alongside the return value

 maps' toListM' :: Stream (Stream (Of a)) m r -> Stream (Of [a]) m 

foldrM :: Monad m => (a -> m r -> m r) -> Stream (Of a) m r -> m r Source

A natural right fold for consuming a stream of elements. See also the more general iterT in the Streaming module and the still more general destroy

foldrT :: (Monad m, MonadTrans t, Monad (t m)) => (a -> t m r -> t m r) -> Stream (Of a) m r -> t m r Source

A natural right fold for consuming a stream of elements. See also the more general iterTM in the Streaming module and the still more general destroy

foldrT (\a p -> Pipes.yield a >> p) :: Monad m => Stream (Of a) m r -> Producer a m r
foldrT (\a p -> Conduit.yield a >> p) :: Monad m => Stream (Of a) m r -> Conduit a m r

Zips

zip :: Monad m => Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of (a, b)) m r Source

Zip two Streamss

zipWith :: Monad m => (a -> b -> c) -> Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r Source

Zip two Streamss using the provided combining function

Interoperation

reread :: Monad m => (s -> m (Maybe a)) -> s -> Stream (Of a) m () Source

Read an IORef (Maybe a) or a similar device until it reads Nothing. reread provides convenient exit from the io-streams library

reread readIORef    :: IORef (Maybe a) -> Stream (Of a) IO ()
reread Streams.read :: System.IO.Streams.InputStream a -> Stream (Of a) IO ()

Basic Type

data Stream f m r Source

Instances

Functor f => MFunctor (Stream f) Source 
Functor f => MMonad (Stream f) Source 
Functor f => MonadTrans (Stream f) Source 
(Functor f, Monad m) => Monad (Stream f m) Source 
(Functor f, Monad m) => Functor (Stream f m) Source 
(Functor f, Monad m) => Applicative (Stream f m) Source 
(MonadIO m, Functor f) => MonadIO (Stream f m) Source 
(Eq r, Eq (m (Stream f m r)), Eq (f (Stream f m r))) => Eq (Stream f m r) Source 
(Typeable (* -> *) f, Typeable (* -> *) m, Data r, Data (m (Stream f m r)), Data (f (Stream f m r))) => Data (Stream f m r) Source 
(Show r, Show (m (Stream f m r)), Show (f (Stream f m r))) => Show (Stream f m r) Source