Safe Haskell | None |
---|---|
Language | Haskell2010 |
The primary use of concurrent machines is to establish a pipelined architecture that can boost overall throughput by running each stage of the pipeline at the same time. The processing, or production, rate of each stage may not be identical, so facilities are provided to loosen the temporal coupling between pipeline stages using buffers.
This architecture also lends itself to operations where multiple
workers are available for processing inputs. If each worker is to
process the same set of inputs, consider fanout and fanoutSteps. If
each worker is to process a disjoint set of inputs, consider scatter.
- module Data.Machine
- (>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c
- (<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c
- bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
- rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
- fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r
- fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r
- wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c
- tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c
- scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o
- splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d)
- mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r
- splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r
Documentation
module Data.Machine
Concurrent connection
(>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c infixl 7
Flipped (<~<).
(<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c
Build a new Machine by adding a Process to the output of an old
Machine. The upstream machine is run concurrently with the downstream
process, with the aim that upstream will have a yielded value ready as
soon as downstream awaits. This effectively creates a buffer between
upstream and downstream, or source and sink, that can contain up to
one value.
(<~<) :: Process b c -> Process a b -> Process a c
(<~<) :: Process c d -> Tee a b c -> Tee a b d
(<~<) :: Process b c -> Machine k b -> Machine k c
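As a rough illustration of concurrent connection, the sketch below chains two stages onto a source with (>~>) and then repeats the pipeline with the flipped operator. It assumes this page documents the Data.Machine.Concurrent module of the concurrent-machines package (the module name is not stated above); source, mapping, filtered and runT come from the re-exported Data.Machine. The names pipeline and pipeline' are illustrative only.

```haskell
import Data.Machine.Concurrent

-- Each (>~>) runs its upstream side concurrently with the stage to its right.
pipeline :: IO [Int]
pipeline =
  runT $ source [1 .. 5 :: Int]
     >~> mapping (* 2)     -- stage 1: double every input
     >~> filtered (> 4)    -- stage 2: keep values above 4

-- The same pipeline written with the flipped operator, parenthesised
-- explicitly so that no fixity for (<~<) has to be assumed.
pipeline' :: IO [Int]
pipeline' =
  runT $ filtered (> 4) <~< (mapping (* 2) <~< source [1 .. 5 :: Int])
```

Both definitions should produce [6,8,10]: concurrency changes when each stage does its work, not which values flow through.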
Buffered machines
bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
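No description accompanies these two signatures here, so the following sketch relies only on their types plus one assumption: that the Int argument is the capacity of a buffer placed between the two machines. It also assumes the module is Data.Machine.Concurrent from the concurrent-machines package. slowSquare, buffered4 and rolling4 are hypothetical names.

```haskell
import Control.Concurrent (threadDelay)
import Data.Machine.Concurrent

-- Hypothetical slow consumer: squares each input after a short pause.
slowSquare :: ProcessT IO Int Int
slowSquare = autoM $ \x -> do
  threadDelay 1000 -- pause for one millisecond per item
  pure (x * x)

-- Assumption: 4 is a buffer capacity, letting the source run a few
-- items ahead of the slower consumer.
buffered4 :: IO [Int]
buffered4 = runT (bufferConnect 4 (source [1 .. 10]) slowSquare)

-- Same shape with rollingConnect. How it treats a full buffer is not
-- described above, so no particular output is claimed here.
rolling4 :: IO [Int]
rolling4 = runT (rollingConnect 4 (source [1 .. 10]) slowSquare)
```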
Concurrent processing of shared inputs
fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r
Share inputs with each of a list of processes in lockstep. Any values yielded by the processes for a given input are combined into a single yield from the composite process.
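A minimal sketch of sharing inputs between two workers, assuming the module is Data.Machine.Concurrent from the concurrent-machines package. Sum is chosen as the output type so that the combined value does not depend on the order in which the workers' outputs are combined; sharedInputs is an illustrative name.

```haskell
import Data.Machine.Concurrent
import Data.Monoid (Sum (..))

-- Every worker sees every input; the outputs produced for one input are
-- combined with (<>) into a single yield.
sharedInputs :: IO [Int]
sharedInputs =
  fmap (map getSum) . runT $
    source [1, 2, 3 :: Int]
      ~> fanout
           [ mapping Sum             -- worker 1 yields x
           , mapping (Sum . (* 10))  -- worker 2 yields 10 * x
           ]
```

For each input x the composite yields x + 10 * x, so the three inputs should give [11,22,33].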
fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r
Share inputs with each of a list of processes in lockstep. If none of
the processes yields a value, the composite process will itself yield
mempty. The idea is to provide a handle on steps only executed for
their side effects. For instance, if you want to run a collection of
ProcessTs that await but don't yield some number of times, you can use
'fanoutSteps . map (fmap (const ()))' followed by a taking process.
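The idiom described above might look as follows. This is a sketch under assumptions: the module is taken to be Data.Machine.Concurrent from the concurrent-machines package, and addTo is a hypothetical worker that only performs a side effect.

```haskell
import Control.Monad.IO.Class (liftIO)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Data.Machine.Concurrent

-- Hypothetical side-effect-only worker: adds each input to a counter and
-- never yields, so its output type is left free.
addTo :: IORef Int -> ProcessT IO Int o
addTo ref = repeatedly $ do
  x <- await
  liftIO (modifyIORef' ref (+ x))

-- Drive both workers through three lockstep steps. Since no worker
-- yields, each step surfaces as a mempty, here ().
threeSteps :: IO ([()], Int, Int)
threeSteps = do
  refA <- newIORef 0
  refB <- newIORef 0
  units <- runT $
    source [1 .. 10]
      ~> fanoutSteps (map (fmap (const ())) [addTo refA, addTo refB])
      ~> taking 3
  a <- readIORef refA
  b <- readIORef refB
  pure (units, a, b)
```

The taking 3 stage is what bounds the run: it counts the () values that stand in for steps, then stops the pipeline.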
Concurrent multiple-input machines
wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c
tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c
Compose a pair of pipes onto the front of a Tee.
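A small sketch of composing two upstream machines onto a Tee, assuming the module is Data.Machine.Concurrent from the concurrent-machines package and that it exports this tee in place of the one from Data.Machine. zipping is the pairing Tee from the machines library; paired is an illustrative name.

```haskell
import Data.Machine.Concurrent

-- Two independent upstream machines feed the left and right inputs of a
-- Tee; zipping pairs one value from each side.
paired :: IO [(Int, Char)]
paired = runT (tee (source [1, 2, 3]) (source "abc") zipping)
```

The result should be [(1,'a'),(2,'b'),(3,'c')], the same values the non-concurrent tee would give.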
scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o
Produces values from whichever source MachineT yields first. This
operation may also be viewed as a gather operation in that all values
produced by the given machines are interleaved when fed downstream.
Note that inputs are not shared. The composite machine will await an
input when any constituent machine awaits an input. That input will be
supplied to the awaiting constituent and no other.
Some more specific types at which scatter is useful:
scatter :: [ProcessT m a b] -> ProcessT m a b
scatter :: [SourceT m a] -> SourceT m a
The former may be used to stream data through a collection of worker
Processes; the latter may be used to intersperse values from a
collection of sources.
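The worker-pool use can be sketched as below, assuming the module is Data.Machine.Concurrent from the concurrent-machines package. worker and scattered are hypothetical names; any ProcessT could stand in for the worker.

```haskell
import Data.List (sort)
import Data.Machine.Concurrent

-- Hypothetical worker: doubles its input.
worker :: ProcessT IO Int Int
worker = mapping (* 2)

-- Three workers draw from one input stream; each input is handed to
-- exactly one of them. Arrival order depends on scheduling, so the
-- result is sorted before it is returned.
scattered :: IO [Int]
scattered =
  sort <$> runT (source [1 .. 10] ~> scatter [worker, worker, worker])
```

Sorting is only there to make the result comparable between runs; a real pipeline would consume the outputs in whatever order they arrive.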
splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d)
Similar to +++: split the input between two processes, retagging and
merging their outputs.
The two processes are run concurrently whenever possible.
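A minimal sketch of routing tagged inputs, assuming the module is Data.Machine.Concurrent from the concurrent-machines package. routed is an illustrative name. The outputs are grouped by tag afterwards because this sketch makes no claim about how the two sides interleave.

```haskell
import Data.Either (partitionEithers)
import Data.Machine.Concurrent

-- Left inputs go to the first process and Right inputs to the second;
-- each output keeps the tag of the side that produced it.
routed :: IO ([Int], [Int])
routed =
  partitionEithers
    <$> runT
      ( source [Left 10, Right "ab", Left 20, Right "cde"]
          ~> splitSum (mapping (+ 1)) (mapping length)
      )
```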
mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r
Similar to |||: split the input between two processes and merge their
outputs.
Connect two processes to the downstream tails of a Machine that
produces Eithers. The two downstream consumers are run concurrently
when possible. When one downstream consumer stops, the other is
allowed to run until it stops or the upstream source yields a value
the remaining consumer cannot handle.
mergeSum sinkL sinkR produces a topology like this,
                               sinkL
                              /      \
                            a          \
                           /            \
   source -- Either a b -->              -- r -->
                           \            /
                            b          /
                              \      /
                               sinkR
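The topology above can be exercised with a sketch like this one, assuming the module is Data.Machine.Concurrent from the concurrent-machines package. merged is an illustrative name, and the result is sorted because the relative order of the two consumers' outputs is not specified here.

```haskell
import Data.List (sort)
import Data.Machine.Concurrent

-- Left inputs feed the first consumer, Right inputs the second; both
-- consumers yield the same type, so their outputs form one stream.
merged :: IO [String]
merged =
  sort
    <$> runT
      ( source [Left 1, Right 'x', Left (2 :: Int)]
          ~> mergeSum (mapping show) (mapping (: []))
      )
```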
splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r
Connect two processes to the downstream tails of a Machine that
produces tuples. The two downstream consumers are run concurrently.
When one downstream consumer stops, the entire pipeline is stopped.
splitProd sink1 sink2 produces a topology like this,
                            sink1
                           /      \
                         a          \
                        /            \
   source -- (a,b) -->                -- r -->
                        \            /
                         b          /
                           \      /
                            sink2
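A sketch of the tuple-splitting topology, assuming the module is Data.Machine.Concurrent from the concurrent-machines package. bothHalves is an illustrative name, and the result is sorted since the order in which the two consumers' outputs are merged is not specified here.

```haskell
import Data.List (sort)
import Data.Machine.Concurrent

-- The first component of each tuple feeds one consumer and the second
-- component feeds the other; both yield Strings into a single stream.
bothHalves :: IO [String]
bothHalves =
  sort
    <$> runT
      ( source [(1 :: Int, 'x'), (2, 'y')]
          ~> splitProd (mapping show) (mapping (: []))
      )
```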