Safe Haskell | None |
---|---|

Language | Haskell2010 |

The primary use of concurrent machines is to establish a pipelined architecture that can boost overall throughput by running each stage of the pipeline at the same time. The processing, or production, rate of each stage may not be identical, so facilities are provided to loosen the temporal coupling between pipeline stages using buffers.

This architecture also lends itself to operations where multiple
workers are available for procesisng inputs. If each worker is to
process the same set of inputs, consider `fanout`

and
`fanoutSteps`

. If each worker is to process a disjoint set of
inputs, consider `scatter`

.

- module Data.Machine
- (>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c
- (<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c
- buffer :: MonadBaseControl IO m => Int -> MachineT m k o -> MachineT m k o
- rolling :: MonadBaseControl IO m => Int -> MachineT m k o -> MachineT m k o
- bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
- rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
- fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r
- fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r
- wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c
- tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c
- scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o
- splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d)
- mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r
- splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r

# Documentation

module Data.Machine

# Concurrent connection

(>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c infixl 7 Source #

Flipped (`<~<`

).

(<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c Source #

Build a new `Machine`

by adding a `Process`

to the output of an
old `Machine`

. The upstream machine is run concurrently with
downstream with the aim that upstream will have a yielded value
ready as soon as downstream awaits. This effectively creates a
buffer between upstream and downstream, or source and sink, that
can contain up to one value.

(`<~<`

) ::`Process`

b c ->`Process`

a b ->`Process`

a c (`<~<`

) ::`Process`

c d ->`Tee`

a b c ->`Tee`

a b d (`<~<`

) ::`Process`

b c ->`Machine`

k b ->`Machine`

k c

# Buffered machines

buffer :: MonadBaseControl IO m => Int -> MachineT m k o -> MachineT m k o Source #

Eagerly request values from the wrapped machine. Values are placed in a buffer of the given size. When the buffer is full (i.e. downstream is running behind), we stop pumping the wrapped machine.

rolling :: MonadBaseControl IO m => Int -> MachineT m k o -> MachineT m k o Source #

Eagerly request values from the wrapped machine. Values are placed in a rolling buffer of the given size. If downstream can not catch up, values yielded by the wrapped machine will be dropped.

bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c Source #

rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c Source #

# Concurrent processing of shared inputs

fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r Source #

Share inputs with each of a list of processes in lockstep. Any values yielded by the processes for a given input are combined into a single yield from the composite process.

fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r Source #

Share inputs with each of a list of processes in lockstep. If
none of the processes yields a value, the composite process will
itself yield `mempty`

. The idea is to provide a handle on steps
only executed for their side effects. For instance, if you want to
run a collection of `ProcessT`

s that await but don't yield some
number of times, you can use 'fanOutSteps . map (fmap (const ()))'
followed by a `taking`

process.

# Concurrent multiple-input machines

wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c Source #

tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c Source #

Compose a pair of pipes onto the front of a Tee.

scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o Source #

Produces values from whichever source `MachineT`

yields
first. This operation may also be viewed as a *gather* operation in
that all values produced by the given machines are interleaved when
fed downstream. Note that inputs are *not* shared. The composite
machine will await an input when any constituent machine awaits an
input. That input will be supplied to the awaiting constituent and
no other.

Some examples of more specific useful types `scatter`

may be used
at,

scatter :: [ProcessT m a b] -> ProcessT m a b scatter :: [SourceT m a] -> SourceT m a

The former may be used to stream data through a collection of
worker `Process`

es, the latter may be used to intersperse values
from a collection of sources.

splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d) Source #

Similar to `+++`

: split the input between two
processes, retagging and merging their outputs.

The two processes are run concurrently whenever possible.

mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r Source #

Similar to `|||`

: split the input between two
processes and merge their outputs.

Connect two processes to the downstream tails of a `Machine`

that
produces `Either`

s. The two downstream consumers are run
concurrently when possible. When one downstream consumer stops, the
other is allowed to run until it stops or the upstream source
yields a value the remaining consumer can not handle.

`mergeSum sinkL sinkR`

produces a topology like this,

sinkL / \ a \ / \ source -- Either a b --> -- r --> \ / b / \ / sinkR

splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r Source #

Connect two processes to the downstream tails of a `Machine`

that
produces tuples. The two downstream consumers are run
concurrently. When one downstream consumer stops, the entire
pipeline is stopped.

`splitProd sink1 sink2`

produces a topology like this,

sink1 / \ a \ / \ source -- (a,b) --> -- r --> \ / b / \ / sink2