Safe Haskell | None |
---|---|

Language | Haskell2010 |

- data Stream f m r
- construct :: (forall b. (f b -> b) -> (m b -> b) -> (r -> b) -> b) -> Stream f m r
- unfold :: (Monad m, Functor f) => (s -> m (Either r (f s))) -> s -> Stream f m r
- replicates :: (Monad m, Functor f) => Int -> f () -> Stream f m ()
- repeats :: (Monad m, Functor f) => f () -> Stream f m r
- repeatsM :: (Monad m, Functor f) => m (f ()) -> Stream f m r
- delay :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r
- wrap :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r
- layer :: (Monad m, Functor f) => f r -> Stream f m r
- intercalates :: (Monad m, Monad (t m), MonadTrans t) => t m a -> Stream (t m) m b -> t m b
- concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r
- iterT :: (Functor f, Monad m) => (f (m a) -> m a) -> Stream f m a -> m a
- iterTM :: (Functor f, Monad m, MonadTrans t, Monad (t m)) => (f (t m a) -> t m a) -> Stream f m a -> t m a
- destroy :: (Functor f, Monad m) => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b
- destroyWith :: (Functor f, Monad m) => (m b -> b) -> (r -> b) -> (f b -> b) -> Stream f m r -> b
- inspect :: (Functor f, Monad m) => Stream f m r -> m (Either r (f (Stream f m r)))
- maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
- mapsM :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r
- decompose :: (Monad m, Functor f) => Stream (Compose m f) m r -> Stream f m r
- mapsM_ :: (Functor f, Monad m) => (forall x. f x -> m x) -> Stream f m r -> m r
- eithers :: (Monad m, Applicative h) => (forall x. f x -> h x) -> (forall x. g x -> h x) -> Stream (Sum f g) m r -> Stream h m r
- run :: Monad m => Stream m m r -> m r
- distribute :: (Monad m, Functor f, MonadTrans t, MFunctor t, Monad (t (Stream f m))) => Stream f (t m) r -> t (Stream f m) r
- chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r
- splitsAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r)
- takes :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m ()
- zipsWith :: (Monad m, Functor h) => (forall x y. f x -> g y -> h (x, y)) -> Stream f m r -> Stream g m r -> Stream h m r
- zips :: (Monad m, Functor f, Functor g) => Stream f m r -> Stream g m r -> Stream (Compose f g) m r
- interleaves :: (Monad m, Applicative h) => Stream h m r -> Stream h m r -> Stream h m r
- unexposed :: (Functor f, Monad m) => Stream f m r -> Stream f m r
- hoistExposed :: (Monad m1, Functor f) => (m1 (Stream f m r) -> m (Stream f m r)) -> Stream f m1 r -> Stream f m r
- mapsExposed :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
- mapsMExposed :: (Monad m, Functor f1) => (f1 (Stream f m r) -> m (f (Stream f m r))) -> Stream f1 m r -> Stream f m r
- destroyExposed :: (Monad m, Functor f) => Stream f m t -> (f b -> b) -> (m b -> b) -> (t -> b) -> b

# The free monad transformer

The `Stream`

data type is equivalent to `FreeT`

and can represent any effectful
succession of steps, where the form of the steps or `commands`

is
specified by the first (functor) parameter.

data Stream f m r = Step !(f (Stream f m r)) | Delay (m (Stream f m r)) | Return r

The *producer* concept uses the simple functor ` (a,_) `

- or the stricter
` Of a _ `

. Then the news at each step or layer is just: an individual item of type `a`

.
Since `Stream (Of a) m r`

is equivalent to `Pipe.Producer a m r`

, much of
the `pipes`

`Prelude`

can easily be mirrored in a `streaming`

`Prelude`

. Similarly,
a simple `Consumer a m r`

or `Parser a m r`

concept arises when the base functor is
` (a -> _) `

. `Stream ((->) input) m result`

consumes `input`

until it returns a
`result`

.

To avoid breaking reasoning principles, the constructors
should not be used directly. A pattern-match should go by way of `inspect`

- or, in the producer case, `next`

The constructors are exported by the `Internal`

module.

Functor f => MFunctor (Stream f) Source | |

Functor f => MMonad (Stream f) Source | |

Functor f => MonadTrans (Stream f) Source | |

(Functor f, Monad m) => Monad (Stream f m) Source | |

(Functor f, Monad m) => Functor (Stream f m) Source | |

(Functor f, Monad m) => Applicative (Stream f m) Source | |

(MonadIO m, Functor f) => MonadIO (Stream f m) Source | |

(Eq r, Eq (m (Stream f m r)), Eq (f (Stream f m r))) => Eq (Stream f m r) Source | |

(Typeable (* -> *) f, Typeable (* -> *) m, Data r, Data (m (Stream f m r)), Data (f (Stream f m r))) => Data (Stream f m r) Source | |

(Show r, Show (m (Stream f m r)), Show (f (Stream f m r))) => Show (Stream f m r) Source |

# Introducing a stream

construct :: (forall b. (f b -> b) -> (m b -> b) -> (r -> b) -> b) -> Stream f m r Source

Reflect a church-encoded stream; cp. `GHC.Exts.build`

unfold :: (Monad m, Functor f) => (s -> m (Either r (f s))) -> s -> Stream f m r Source

Build a `Stream`

by unfolding steps starting from a seed. See also
the specialized `unfoldr`

in the prelude.

unfold inspect = id -- modulo the quotient we work with unfold Pipes.next :: Monad m => Producer a m r -> Stream ((,) a) m r unfold (curry (:>) . Pipes.next) :: Monad m => Producer a m r -> Stream (Of a) m r

replicates :: (Monad m, Functor f) => Int -> f () -> Stream f m () Source

Repeat a functorial layer, command or instruct several times.

repeats :: (Monad m, Functor f) => f () -> Stream f m r Source

Repeat a functorial layer, command or instruction forever.

layer :: (Monad m, Functor f) => f r -> Stream f m r Source

Lift for items in the base functor. Makes a singleton or one-layer succession.`

# Eliminating a stream

intercalates :: (Monad m, Monad (t m), MonadTrans t) => t m a -> Stream (t m) m b -> t m b Source

Interpolate a layer at each segment. This specializes to e.g.

intercalates :: (Monad m, Functor f) => Stream f m () -> Stream (Stream f m) m r -> Stream f m r

concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r Source

Dissolves the segmentation into layers of `Stream f m`

layers.

concats stream = destroy stream join (join . lift) return

`>>>`

1776 1 2 1776 3 4 1776 5`S.print $ concats $ maps (cons 1776) $ chunksOf 2 (each [1..5])`

iterT :: (Functor f, Monad m) => (f (m a) -> m a) -> Stream f m a -> m a Source

Specialized fold

iterT alg stream = destroy stream alg join return

iterTM :: (Functor f, Monad m, MonadTrans t, Monad (t m)) => (f (t m a) -> t m a) -> Stream f m a -> t m a Source

Specialized fold

iterTM alg stream = destroy stream alg (join . lift) return

destroy :: (Functor f, Monad m) => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b Source

Map a stream directly to its church encoding; compare `Data.List.foldr`

It permits distinctions that should be hidden, as can be seen from
e.g.

isPure stream = destroy_ (const True) (const False) (const True)

and similar nonsense. The crucial
constraint is that the `m x -> x`

argument is an *Eilenberg-Moore algebra*.
See Atkey "Reasoning about Stream Processing with Effects"

The destroy exported by the safe modules is

destroy str = destroy (observe str)

destroyWith :: (Functor f, Monad m) => (m b -> b) -> (r -> b) -> (f b -> b) -> Stream f m r -> b Source

`destroyWith`

reorders the arguments of `destroy`

to be more akin
to `foldr`

It is more convenient to query in ghci to figure out
what kind of 'algebra' you need to write.

`>>>`

(Monad m, Functor f) => (f (m a) -> m a) -> Stream f m a -> m a -- iterT`:t destroyWith join return`

`>>>`

(Monad m, Monad (t m), Functor f, MonadTrans t) => (f (t m a) -> t m a) -> Stream f m a -> t m a -- iterTM`:t destroyWith (join . lift) return`

`>>>`

(Monad m, Functor f, Functor f1) => (f (Stream f1 m r) -> Stream f1 m r) -> Stream f m r -> Stream f1 m r`:t destroyWith delay return`

`>>>`

Monad m => Stream (Of a) m r -> Stream ((,) a) m r`:t destroyWith delay return (wrap . lazily)`

`>>>`

Monad m => Stream ((,) a) m r -> Stream (Of a) m r`:t destroyWith delay return (wrap . strictly)`

`>>>`

(Monad m, Functor f) => (f (ByteString m r) -> ByteString m r) -> Stream f m r -> ByteString m r`:t destroyWith Data.ByteString.Streaming.delay return`

`>>>`

Monad m => Stream (Of B.ByteString) m r -> ByteString m r -- fromChunks`:t destroyWith Data.ByteString.Streaming.delay return (\(a:>b) -> consChunk a b)`

# Inspecting a stream wrap by wrap

inspect :: (Functor f, Monad m) => Stream f m r -> m (Either r (f (Stream f m r))) Source

Inspect the first stage of a freely layered sequence.
Compare `Pipes.next`

and the replica `Streaming.Prelude.next`

.
This is the `uncons`

for the general `unfold`

.

unfold inspect = id Streaming.Prelude.unfoldr StreamingPrelude.next = id

# Transforming streams

maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source

Map layers of one functor to another with a transformation

mapsM :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r Source

Map layers of one functor to another with a transformation involving the base monad
`maps`

is more fundamental than `mapsM`

, which is best understood as a convenience
for effecting this frequent composition:

mapsM phi = decompose . maps (Compose . phi)

decompose :: (Monad m, Functor f) => Stream (Compose m f) m r -> Stream f m r Source

Resort a succession of layers of the form `m (f x)`

. Though `mapsM`

is best understood as:

mapsM phi = decompose . maps (Compose . phi)

we could as well define `decompose`

by `mapsM`

:

decompose = mapsM getCompose

mapsM_ :: (Functor f, Monad m) => (forall x. f x -> m x) -> Stream f m r -> m r Source

Map each layer to an effect in the base monad, and run them all.

eithers :: (Monad m, Applicative h) => (forall x. f x -> h x) -> (forall x. g x -> h x) -> Stream (Sum f g) m r -> Stream h m r Source

distribute :: (Monad m, Functor f, MonadTrans t, MFunctor t, Monad (t (Stream f m))) => Stream f (t m) r -> t (Stream f m) r Source

Make it possible to 'run' the underlying transformed monad.

# Splitting streams

chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r Source

Break a stream into substreams each with n functorial layers.

`>>>`

2 2 2 1`S.print $ maps' sum' $ chunksOf 2 $ each [1,1,1,1,1,1,1]`

splitsAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r) Source

Split a succession of layers after some number, returning a streaming or effectful pair.

`>>>`

1`rest <- S.print $ S.splitAt 1 $ each [1..3]`

`>>>`

2 3`S.print rest`

# Zipping streams

zipsWith :: (Monad m, Functor h) => (forall x y. f x -> g y -> h (x, y)) -> Stream f m r -> Stream g m r -> Stream h m r Source

zips :: (Monad m, Functor f, Functor g) => Stream f m r -> Stream g m r -> Stream (Compose f g) m r Source

interleaves :: (Monad m, Applicative h) => Stream h m r -> Stream h m r -> Stream h m r Source

Interleave functor layers, with the effects of the first preceding the effects of the second.

interleaves = zipsWith (liftA2 (,))

`>>>`

`let paste = \a b -> interleaves (Q.lines a) (maps (Q.cons' '\t') (Q.lines b))`

`>>>`

hello goodbye world world`Q.stdout $ Q.unlines $ paste "hello\nworld\n" "goodbye\nworld\n"`

# For use in implementation

unexposed :: (Functor f, Monad m) => Stream f m r -> Stream f m r Source

This is akin to the `observe`

of `Pipes.Internal`

. It redelays the layering
in instances of `Stream f m r`

so that it replicates that of
`FreeT`

.

hoistExposed :: (Monad m1, Functor f) => (m1 (Stream f m r) -> m (Stream f m r)) -> Stream f m1 r -> Stream f m r Source

mapsExposed :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source

mapsMExposed :: (Monad m, Functor f1) => (f1 (Stream f m r) -> m (f (Stream f m r))) -> Stream f1 m r -> Stream f m r Source

destroyExposed :: (Monad m, Functor f) => Stream f m t -> (f b -> b) -> (m b -> b) -> (t -> b) -> b Source