streaming-0.1.2.0: an elementary streaming prelude and a general monad transformer for streaming applications.

Safe HaskellNone
LanguageHaskell2010

Streaming.Internal

Contents

Synopsis

The free monad transformer

The Stream data type is equivalent to FreeT and can represent any effectful succession of steps, where the form of the steps or commands is specified by the first (functor) parameter.

data Stream f m r = Step !(f (Stream f m r)) | Delay (m (Stream f m r)) | Return r

The producer concept uses the simple functor (a,_) - or the stricter Of a _ . Then the news at each step or layer is just: an individual item of type a. Since Stream (Of a) m r is equivalent to Pipe.Producer a m r, much of the pipes Prelude can easily be mirrored in a streaming Prelude. Similarly, a simple Consumer a m r or Parser a m r concept arises when the base functor is (a -> _) . Stream ((->) input) m result consumes input until it returns a result.

To avoid breaking reasoning principles, the constructors should not be used directly. A pattern-match should go by way of inspect - or, in the producer case, next

The constructors are exported by the Internal module.

data Stream f m r Source

Constructors

Step !(f (Stream f m r)) 
Delay (m (Stream f m r)) 
Return r 

Instances

Functor f => MFunctor (Stream f) Source 
Functor f => MMonad (Stream f) Source 
Functor f => MonadTrans (Stream f) Source 
(Functor f, Monad m) => Monad (Stream f m) Source 
(Functor f, Monad m) => Functor (Stream f m) Source 
(Functor f, Monad m) => Applicative (Stream f m) Source 
(MonadIO m, Functor f) => MonadIO (Stream f m) Source 
(Eq r, Eq (m (Stream f m r)), Eq (f (Stream f m r))) => Eq (Stream f m r) Source 
(Typeable (* -> *) f, Typeable (* -> *) m, Data r, Data (m (Stream f m r)), Data (f (Stream f m r))) => Data (Stream f m r) Source 
(Show r, Show (m (Stream f m r)), Show (f (Stream f m r))) => Show (Stream f m r) Source 

Introducing a stream

construct :: (forall b. (f b -> b) -> (m b -> b) -> (r -> b) -> b) -> Stream f m r Source

Reflect a church-encoded stream; cp. GHC.Exts.build

unfold :: (Monad m, Functor f) => (s -> m (Either r (f s))) -> s -> Stream f m r Source

Build a Stream by unfolding steps starting from a seed. See also the specialized unfoldr in the prelude.

unfold inspect = id -- modulo the quotient we work with
unfold Pipes.next :: Monad m => Producer a m r -> Stream ((,) a) m r
unfold (curry (:>) . Pipes.next) :: Monad m => Producer a m r -> Stream (Of a) m r

replicates :: (Monad m, Functor f) => Int -> f () -> Stream f m () Source

Repeat a functorial layer, command or instruct several times.

repeats :: (Monad m, Functor f) => f () -> Stream f m r Source

Repeat a functorial layer, command or instruction forever.

repeatsM :: (Monad m, Functor f) => m (f ()) -> Stream f m r Source

mwrap :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r Source

wrap :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r Source

yields :: (Monad m, Functor f) => f r -> Stream f m r Source

Lift for items in the base functor. Makes a singleton or one-layer succession. It is named by similarity to lift:

lift :: (Monad m, Functor f)     => m r -> Stream f m r
yields ::  (Monad m, Functor f) => f r -> Stream f m r

Eliminating a stream

intercalates :: (Monad m, Monad (t m), MonadTrans t) => t m a -> Stream (t m) m b -> t m b Source

Interpolate a layer at each segment. This specializes to e.g.

intercalates :: (Monad m, Functor f) => Stream f m () -> Stream (Stream f m) m r -> Stream f m r

concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r Source

Dissolves the segmentation into layers of Stream f m layers.

concats stream = destroy stream join (join . lift) return
>>> S.print $ concats $ maps (cons 1776) $ chunksOf 2 (each [1..5])
1776
1
2
1776
3
4
1776
5

iterT :: (Functor f, Monad m) => (f (m a) -> m a) -> Stream f m a -> m a Source

Specialized fold

iterT alg stream = destroy stream alg join return

iterTM :: (Functor f, Monad m, MonadTrans t, Monad (t m)) => (f (t m a) -> t m a) -> Stream f m a -> t m a Source

Specialized fold

iterTM alg stream = destroy stream alg (join . lift) return

destroy :: (Functor f, Monad m) => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b Source

Map a stream directly to its church encoding; compare Data.List.foldr It permits distinctions that should be hidden, as can be seen from e.g.

isPure stream = destroy_ (const True) (const False) (const True)

and similar nonsense. The crucial constraint is that the m x -> x argument is an Eilenberg-Moore algebra. See Atkey "Reasoning about Stream Processing with Effects"

The destroy exported by the safe modules is

destroy str = destroy (observe str)

Inspecting a stream wrap by wrap

inspect :: (Functor f, Monad m) => Stream f m r -> m (Either r (f (Stream f m r))) Source

Inspect the first stage of a freely layered sequence. Compare Pipes.next and the replica Streaming.Prelude.next. This is the uncons for the general unfold.

unfold inspect = id
Streaming.Prelude.unfoldr StreamingPrelude.next = id

Transforming streams

maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source

Map layers of one functor to another with a transformation

mapsM :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r Source

Map layers of one functor to another with a transformation involving the base monad maps is more fundamental than mapsM, which is best understood as a convenience for effecting this frequent composition:

mapsM phi = decompose . maps (Compose . phi)

decompose :: (Monad m, Functor f) => Stream (Compose m f) m r -> Stream f m r Source

Resort a succession of layers of the form m (f x). Though mapsM is best understood as:

mapsM phi = decompose . maps (Compose . phi)

we could as well define decompose by mapsM:

decompose = mapsM getCompose

mapsM_ :: (Functor f, Monad m) => (forall x. f x -> m x) -> Stream f m r -> m r Source

Map each layer to an effect in the base monad, and run them all.

run :: Monad m => Stream m m r -> m r Source

Run the effects in a stream that merely layers effects.

distribute :: (Monad m, Functor f, MonadTrans t, MFunctor t, Monad (t (Stream f m))) => Stream f (t m) r -> t (Stream f m) r Source

Make it possible to 'run' the underlying transformed monad.

separate :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream f (Stream g m) r Source

Given a stream on a sum of functors, make it a stream on the left functor, with the streaming on the other functor as the governing monad. This is useful for acting on one or the other functor with a fold.

>>> let odd_even = S.maps (S.distinguish even) $ S.each [1..10]
>>> :t S.effects $ separate odd_even

Now, for example, it is convenient to fold on the left and right values separately:

>>> toListM' $ toList' (separate odd_even)
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())
>>> S.toListM' $ S.print $ separate $  odd_even
1
3
5
7
9
[2,4,6,8,10] :> ()

We can easily use this device in place of filter:

filter = S.effects . separate . maps (distinguish f)
>>> :t hoist S.effects $ separate odd_even
hoist S.effects $ separate odd_even :: Monad n => Stream (Of Int) n ()
>>> S.print $ effects $ separate odd_even
2
4
6
8
10
>>> S.print $ hoist effects $ separate odd_even
1
3
5
7
9

unseparate :: (Monad m, Functor f, Functor g) => Stream f (Stream g m) r -> Stream (Sum f g) m r Source

groups :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream (Sum (Stream f m) (Stream g m)) m r Source

Group layers in an alternating stream into adjoining sub-streams of one type or another. =

Splitting streams

chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r Source

Break a stream into substreams each with n functorial layers.

>>> S.print $ maps' sum' $ chunksOf 2 $ each [1,1,1,1,1,1,1]
2
2
2
1

splitsAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r) Source

Split a succession of layers after some number, returning a streaming or effectful pair.

>>> rest <- S.print $ S.splitAt 1 $ each [1..3]
1
>>> S.print rest
2
3

takes :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m () Source

Zipping streams

zipsWith :: (Monad m, Functor h) => (forall x y. f x -> g y -> h (x, y)) -> Stream f m r -> Stream g m r -> Stream h m r Source

zips :: (Monad m, Functor f, Functor g) => Stream f m r -> Stream g m r -> Stream (Compose f g) m r Source

interleaves :: (Monad m, Applicative h) => Stream h m r -> Stream h m r -> Stream h m r Source

Interleave functor layers, with the effects of the first preceding the effects of the second.

interleaves = zipsWith (liftA2 (,))
>>> let paste = \a b -> interleaves (Q.lines a) (maps (Q.cons' '\t') (Q.lines b))
>>> Q.stdout $ Q.unlines $ paste "hello\nworld\n" "goodbye\nworld\n"
hello	goodbye
world	world

Assorted Data.Functor.x help

switch :: Sum f g r -> Sum g f r Source

Swap the order of functors in a sum of functors.

>>> S.toListM' $ S.print $ separate $ maps S.switch $ maps (S.distinguish (=='a')) $ S.each "banana"
'a'
'a'
'a'
"bnn" :> ()
>>> S.toListM' $ S.print $ separate $ maps (S.distinguish (=='a')) $ S.each "banana"
'b'
'n'
'n'
"aaa" :> ()

For use in implementation

unexposed :: (Functor f, Monad m) => Stream f m r -> Stream f m r Source

This is akin to the observe of Pipes.Internal . It reeffects the layering in instances of Stream f m r so that it replicates that of FreeT.

hoistExposed :: (Monad m1, Functor f) => (m1 (Stream f m r) -> m (Stream f m r)) -> Stream f m1 r -> Stream f m r Source

mapsExposed :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source

mapsMExposed :: (Monad m, Functor f1) => (f1 (Stream f m r) -> m (f (Stream f m r))) -> Stream f1 m r -> Stream f m r Source

destroyExposed :: (Monad m, Functor f) => Stream f m t -> (f b -> b) -> (m b -> b) -> (t -> b) -> b Source