concurrent-machines-0.3.1.3: Concurrent networked stream transducers

Safe Haskell: None
Language: Haskell2010

Data.Machine.Concurrent

Description

The primary use of concurrent machines is to establish a pipelined architecture that can boost overall throughput by running each stage of the pipeline at the same time. The processing, or production, rate of each stage may not be identical, so facilities are provided to loosen the temporal coupling between pipeline stages using buffers.

This architecture also lends itself to operations where multiple workers are available for processing inputs. If each worker is to process the same set of inputs, consider fanout and fanoutSteps. If each worker is to process a disjoint set of inputs, consider scatter.

Documentation

Concurrent connection

(>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c
infixl 7 >~>

Flipped (<~<).

(<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c

Build a new Machine by adding a Process to the output of an old Machine. The upstream machine is run concurrently with downstream with the aim that upstream will have a yielded value ready as soon as downstream awaits. This effectively creates a buffer between upstream and downstream, or source and sink, that can contain up to one value.

(<~<) :: Process b c -> Process a b -> Process a c
(<~<) :: Process c d -> Tee a b c -> Tee a b d
(<~<) :: Process b c -> Machine k b -> Machine k c
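
As a minimal sketch of concurrent connection, the following chains two slow stages with (>~>). It assumes the machines package supplies source, repeatedly, await, yield, and runT via Data.Machine; slowDouble and pipelined are hypothetical names introduced only for illustration.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Data.Machine (ProcessT, await, repeatedly, runT, source, yield)
import Data.Machine.Concurrent ((>~>))

-- Hypothetical stage: doubles each input after a simulated 100 ms of work.
slowDouble :: ProcessT IO Int Int
slowDouble = repeatedly $ do
  x <- await
  liftIO (threadDelay 100000)
  yield (x * 2)

-- Two slow stages joined with (>~>): while the second stage works on one
-- value, the first stage can already be preparing the next.
pipelined :: IO [Int]
pipelined = runT (source [1 .. 5] >~> slowDouble >~> slowDouble)

main :: IO ()
main = pipelined >>= print
```

Replacing (>~>) with the sequential (~>) from Data.Machine should give the same values, but with the two delays paid one after the other for every element.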

Buffered machines

bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c

Mediate a MachineT and a ProcessT with a bounded capacity buffer. The source machine runs concurrently with the sink process, and is only blocked when the buffer is full.
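
A small sketch of a bounded buffer smoothing out a sink whose per-item cost varies. The sink unevenSink, the name buffered, the capacity of 4, and the delays are all assumptions chosen for illustration.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Data.Machine (ProcessT, await, repeatedly, runT, source, yield)
import Data.Machine.Concurrent (bufferConnect)

-- Hypothetical sink: even inputs take noticeably longer than odd ones.
unevenSink :: ProcessT IO Int Int
unevenSink = repeatedly $ do
  x <- await
  liftIO (threadDelay (if even x then 20000 else 1000))
  yield (x + 1)

-- The source may run ahead of the sink until the buffer (capacity 4) is
-- full, at which point it blocks until the sink catches up.
buffered :: IO [Int]
buffered = runT (bufferConnect 4 (source [1 .. 10]) unevenSink)

main :: IO ()
main = buffered >>= print
```

Because the source blocks rather than discarding values when the buffer is full, every input should reach the sink in order.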

rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c

Mediate a MachineT and a ProcessT with a rolling buffer. The source machine runs concurrently with the sink process and is never blocked. If the sink process cannot keep up with upstream, yielded values will be dropped.
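
The sketch below pairs a fast source with a deliberately slow sink to illustrate the lossy behaviour. The names slowSink and sampled, the buffer size of 2, and the delay are assumptions; how many values survive, and which ones, depends on scheduling, so no particular output is claimed.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Data.Machine (ProcessT, await, repeatedly, runT, source, yield)
import Data.Machine.Concurrent (rollingConnect)

-- Hypothetical sink that needs about 5 ms per value.
slowSink :: ProcessT IO Int Int
slowSink = repeatedly $ do
  x <- await
  liftIO (threadDelay 5000)
  yield x

-- The source is never blocked; values the sink cannot keep up with may be
-- dropped from the rolling buffer (size 2 here).
sampled :: IO [Int]
sampled = runT (rollingConnect 2 (source [1 .. 200]) slowSink)

main :: IO ()
main = do
  xs <- sampled
  putStrLn ("received " ++ show (length xs) ++ " of 200 values")
```

This shape suits sources such as sensor readings, where a fresh value matters more than a complete history.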

Concurrent processing of shared inputs

fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r

Share inputs with each of a list of processes in lockstep. Any values yielded by the processes for a given input are combined into a single yield from the composite process.
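
To make the combining step concrete, here is a sketch in which two processes receive every input and their yields are merged with the Sum semigroup. The name combined is hypothetical; Sum is used because it is commutative, so the example does not rely on the order in which the yields are combined.

```haskell
import Data.Machine (mapping, runT, source, (~>))
import Data.Machine.Concurrent (fanout)
import Data.Monoid (Sum (..))

-- Every input x is given to both processes: one yields Sum x, the other
-- yields Sum (10 * x). fanout combines the two yields with (<>).
combined :: IO [Int]
combined =
  fmap (map getSum) . runT $
    source [1, 2, 3] ~> fanout [mapping Sum, mapping (Sum . (* 10))]

main :: IO ()
main = combined >>= print
```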

fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r

Share inputs with each of a list of processes in lockstep. If none of the processes yields a value, the composite process will itself yield mempty. The idea is to provide a handle on steps executed only for their side effects. For instance, to run a collection of ProcessTs that await but never yield for a fixed number of steps, use 'fanoutSteps . map (fmap (const ()))' followed by a taking process.
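
The usage described above might look like the following sketch. The effect-only worker logger and the name steps are hypothetical; the order of the printed lines is not deterministic because the workers run concurrently.

```haskell
import Control.Monad.IO.Class (liftIO)
import Data.Machine (ProcessT, await, repeatedly, runT, source, taking, (~>))
import Data.Machine.Concurrent (fanoutSteps)

-- Hypothetical effect-only worker: awaits inputs, prints them, never yields.
logger :: String -> ProcessT IO Int Int
logger name = repeatedly $ do
  x <- await
  liftIO (putStrLn (name ++ " saw " ++ show x))

-- Retag the (never produced) outputs as () so a Monoid is available, then
-- let `taking 3` stop the composite process after three lockstep steps.
steps :: IO [()]
steps =
  runT $
    source [1 .. 10]
      ~> fanoutSteps (map (fmap (const ())) [logger "A", logger "B"])
      ~> taking 3

main :: IO ()
main = steps >>= print
```

Each mempty yielded by the composite process marks one completed step, which is what gives taking something to count.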

Concurrent multiple-input machines

wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c

Precompose a Process onto each input of a Wye (or WyeT).

When the choice of input is free (using the Z input descriptor) the two sources will be interleaved.

tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c

Compose a pair of pipes onto the front of a Tee.
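
A sketch of the concurrent tee, assuming capL and zipping from the machines package are available via Data.Machine. Data.Machine also exports a non-concurrent tee, so explicit import lists are used to avoid a name clash. The name paired is hypothetical.

```haskell
import Data.Machine (capL, mapping, runT, source, zipping, (~>))
import Data.Machine.Concurrent (tee)

-- Preprocess the left input with (+ 1) and the right input with (++ "!"),
-- then pair the results up with `zipping`. capL supplies the left input
-- from a source; the right input arrives through (~>).
paired :: IO [(Int, String)]
paired =
  runT $
    source ["a", "b", "c"]
      ~> capL (source [1, 2, 3]) (tee (mapping (+ 1)) (mapping (++ "!")) zipping)

main :: IO ()
main = paired >>= print
```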

scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o

Produces values from whichever source MachineT yields first. This operation may also be viewed as a gather operation in that all values produced by the given machines are interleaved when fed downstream. Note that inputs are not shared. The composite machine will await an input when any constituent machine awaits an input. That input will be supplied to the awaiting constituent and no other.

Some more specific types at which scatter may be useful:

scatter :: [ProcessT m a b] -> ProcessT m a b
scatter :: [SourceT m a] -> SourceT m a

The former may be used to stream data through a collection of worker Processes; the latter may be used to intersperse values from a collection of sources.
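
Both uses can be sketched as follows. The names gathered and workers are hypothetical. Since the interleaving depends on scheduling, the results are sorted before printing instead of relying on arrival order.

```haskell
import Data.List (sort)
import Data.Machine (mapping, runT, source, (~>))
import Data.Machine.Concurrent (scatter)

-- Gather: interleave the values of two independent sources.
gathered :: IO [Int]
gathered = runT (scatter [source [1, 2, 3], source [10, 20, 30]])

-- Scatter: each input goes to exactly one of two identical workers.
workers :: IO [Int]
workers = runT (source [1 .. 6] ~> scatter [mapping (* 2), mapping (* 2)])

main :: IO ()
main = do
  gathered >>= print . sort
  workers >>= print . sort
```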

splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d)

Similar to +++: split the input between two processes, retagging and merging their outputs.

The two processes are run concurrently whenever possible.
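
A sketch of routing a mixed stream with splitSum. The name routed and the sample data are assumptions. Because the two sides run concurrently, the relative order of Left and Right outputs is not asserted here.

```haskell
import Data.Machine (mapping, runT, source, (~>))
import Data.Machine.Concurrent (splitSum)

-- Left values are incremented by one process; Right values are measured
-- by the other. Outputs keep the tag of the input they came from.
routed :: IO [Either Int Int]
routed =
  runT $
    source [Left 1, Right "ab", Left 2, Right "cde"]
      ~> splitSum (mapping (+ 1)) (mapping length)

main :: IO ()
main = routed >>= print
```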

mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r

Similar to |||: split the input between two processes and merge their outputs.

Connect two processes to the downstream tails of a Machine that produces Eithers. The two downstream consumers are run concurrently when possible. When one downstream consumer stops, the other is allowed to run until it stops or the upstream source yields a value the remaining consumer cannot handle.

mergeSum sinkL sinkR produces a topology like this:

                                sinkL
                               /      \
                             a          \
                            /            \
   source -- Either a b -->                -- r -->
                            \            /
                             b          /
                              \       /
                                sinkR 
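
The topology above can be sketched with two consumers that both produce Strings. The name rendered and the sample data are assumptions; the relative order of outputs from the two consumers is not asserted.

```haskell
import Data.Char (toUpper)
import Data.Machine (mapping, runT, source, (~>))
import Data.Machine.Concurrent (mergeSum)

-- sinkL renders Ints, sinkR upper-cases Strings; both yield String, so
-- their outputs can be merged into a single untagged stream.
rendered :: IO [String]
rendered =
  runT $
    source [Left (1 :: Int), Right "two", Left 3]
      ~> mergeSum (mapping show) (mapping (map toUpper))

main :: IO ()
main = rendered >>= print
```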

splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r

Connect two processes to the downstream tails of a Machine that produces tuples. The two downstream consumers are run concurrently. When one downstream consumer stops, the entire pipeline is stopped.

splitProd sink1 sink2 produces a topology like this:

                           sink1
                          /      \
                        a          \
                       /            \
   source -- (a,b) -->               -- r -->
                       \            /
                        b          /
                          \      /
                           sink2
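
A sketch of the tuple-splitting topology above: each component of a pair is handled by its own consumer. The name described and the sample data are assumptions; the order in which the two consumers' outputs are merged is not asserted.

```haskell
import Data.Machine (mapping, runT, source, (~>))
import Data.Machine.Concurrent (splitProd)

-- sink1 renders the Int component, sink2 renders the Bool component.
described :: IO [String]
described =
  runT $
    source [(1 :: Int, True), (2, False)]
      ~> splitProd (mapping show) (mapping show)

main :: IO ()
main = described >>= print
```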