streaming- A free monad transformer optimized for streaming applications.

Safe HaskellNone




The free monad transformer

The Stream data type is equivalent to FreeT and can represent any effectful succession of steps, where the form of the steps or commands is specified by the first (functor) parameter.

data Stream f m r = Step !(f (Stream f m r)) | Delay (m (Stream f m r)) | Return r

The producer concept uses the simple functor (a,_) - or the stricter Of a _ . Then the news at each step or layer is just: an individual item of type a. Since Stream (Of a) m r is equivalent to Pipe.Producer a m r, much of the pipes Prelude can easily be mirrored in a streaming Prelude. Similarly, a simple Consumer a m r or Parser a m r concept arises when the base functor is (a -> _) . Stream ((->) input) m result consumes input until it returns a result.

To avoid breaking reasoning principles, the constructors should not be used directly. A pattern-match should go by way of inspect - or, in the producer case, next The constructors are exported by the Internal module.

data Stream f m r Source


Step !(f (Stream f m r)) 
Delay (m (Stream f m r)) 
Return r 


Functor f => MFunctor (Stream f) Source 
Functor f => MMonad (Stream f) Source 
Functor f => MonadTrans (Stream f) Source 
(Functor f, Monad m) => Monad (Stream f m) Source 
(Functor f, Monad m) => Functor (Stream f m) Source 
(Functor f, Monad m) => Applicative (Stream f m) Source 
(MonadIO m, Functor f) => MonadIO (Stream f m) Source 
(Eq r, Eq (m (Stream f m r)), Eq (f (Stream f m r))) => Eq (Stream f m r) Source 
(Typeable (* -> *) f, Typeable (* -> *) m, Data r, Data (m (Stream f m r)), Data (f (Stream f m r))) => Data (Stream f m r) Source 
(Show r, Show (m (Stream f m r)), Show (f (Stream f m r))) => Show (Stream f m r) Source 

Introducing a stream

construct :: (forall b. (f b -> b) -> (m b -> b) -> (r -> b) -> b) -> Stream f m r Source

Reflect a church-encoded stream; cp.

unfold :: (Monad m, Functor f) => (s -> m (Either r (f s))) -> s -> Stream f m r Source

Build a Stream by unfolding steps starting from a seed. See also the specialized unfoldr in the prelude.

unfold inspect = id -- modulo the quotient we work with
unfold :: Monad m => Producer a m r -> Stream ((,) a) m r
unfold (curry (:>) . :: Monad m => Producer a m r -> Stream (Of a) m r

replicates :: (Monad m, Functor f) => Int -> f () -> Stream f m () Source

Repeat a functorial layer, command or instruct several times.

repeats :: (Monad m, Functor f) => f () -> Stream f m r Source

Repeat a functorial layer, command or instruction forever.

repeatsM :: (Monad m, Functor f) => m (f ()) -> Stream f m r Source

Eliminating a stream

destroy :: (Functor f, Monad m) => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b Source

Map a stream directly to its church encoding; compare Data.List.foldr It permits distinctions that should be hidden, as can be seen from e.g.

isPure stream = destroy_ (const True) (const False) (const True)

and similar nonsense. The crucial constraint is that the m x -> x argument is an Eilenberg-Moore algebra. See Atkey "Reasoning about Stream Processing with Effects"

The destroy exported by the safe modules is

destroy str = destroy (observe str)

concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r Source

Dissolves the segmentation into layers of Stream f m layers.

concats stream = destroy stream join (join . lift) return
>>> S.print $ concats $ maps (cons 1776) $ chunksOf 2 (each [1..5])

intercalates :: (Monad m, Monad (t m), MonadTrans t) => t m a -> Stream (t m) m b -> t m b Source

Interpolate a layer at each segment. This specializes to e.g.

intercalates :: (Monad m, Functor f) => Stream f m () -> Stream (Stream f m) m r -> Stream f m r

iterT :: (Functor f, Monad m) => (f (m a) -> m a) -> Stream f m a -> m a Source

Specialized fold

iterT alg stream = destroy stream alg join return

iterTM :: (Functor f, Monad m, MonadTrans t, Monad (t m)) => (f (t m a) -> t m a) -> Stream f m a -> t m a Source

Specialized fold

iterTM alg stream = destroy stream alg (join . lift) return

Inspecting a stream step by step

inspect :: (Functor f, Monad m) => Stream f m r -> m (Either r (f (Stream f m r))) Source

Inspect the first stage of a freely layered sequence. Compare and the replica This is the uncons for the general unfold.

unfold inspect = id
Streaming.Prelude.unfoldr = id

Transforming streams

maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source

Map layers of one functor to another with a transformation

mapsM :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r Source

Map layers of one functor to another with a transformation involving the base monad

distribute :: (Monad m, Functor f, MonadTrans t, MFunctor t, Monad (t (Stream f m))) => Stream f (t m) r -> t (Stream f m) r Source

Make it possible to 'run' the underlying transformed monad. A simple minded example might be:

debugFibs = flip runStateT 1 $ distribute $ loop 1 where
  loop n = do
    S.yield n
    s <- lift get 
    liftIO $ putStr "Current state is:  " >> print s
    lift $ put (s + n :: Int)
    loop s
>>> S.print $  S.take 4 $ S.drop 4 $ debugFibs
Current state is:  1
Current state is:  2
Current state is:  3
Current state is:  5
Current state is:  8
Current state is:  13
Current state is:  21

Splitting streams

chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r Source

Break a stream into substreams each with n functorial layers.

>>> S.print $ maps' sum' $ chunksOf 2 $ each [1,1,1,1,1,1,1]

splitsAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r) Source

Split a succession of layers after some number, returning a streaming or effectful pair.

>>> rest <- S.print $ S.splitAt 1 $ each [1..3]
>>> S.print rest

For internal use

unexposed :: (Functor f, Monad m) => Stream f m r -> Stream f m r Source

hoistExposed :: (Monad m1, Functor f) => (m1 (Stream f m r) -> m (Stream f m r)) -> Stream f m1 r -> Stream f m r Source

mapsExposed :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source

mapsMExposed :: (Monad m, Functor f1) => (f1 (Stream f m r) -> m (f (Stream f m r))) -> Stream f1 m r -> Stream f m r Source

destroyExposed :: (Monad m, Functor f) => Stream f m t -> (f b -> b) -> (m b -> b) -> (t -> b) -> b Source