concurrent-machines-0.3.1.4: Concurrent networked stream transducers
Safe Haskell: None
Language: Haskell2010

Data.Machine.Concurrent

Description

The primary use of concurrent machines is to establish a pipelined architecture that can boost overall throughput by running each stage of the pipeline at the same time. The processing, or production, rate of each stage may not be identical, so facilities are provided to loosen the temporal coupling between pipeline stages using buffers.

This architecture also lends itself to operations where multiple workers are available for processing inputs. If each worker is to process the same set of inputs, consider fanout and fanoutSteps. If each worker is to process a disjoint set of inputs, consider scatter.
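As a rough illustration of the pipelining described above, the sketch below runs two deliberately slow stages at the same time. It assumes this module exports a concurrent composition operator (>~>) with the same shape as (~>) from the machines package, usable in IO. The name slowStage and the delay values are hypothetical, chosen only for the example.

```haskell
import Control.Concurrent (threadDelay)
import Data.Machine.Concurrent

-- Hypothetical stage: simulate 0.1 s of work per element, then add k.
slowStage :: Int -> ProcessT IO Int Int
slowStage k = autoM $ \x -> do
  threadDelay 100000
  pure (x + k)

-- Assumption: (>~>) is the concurrent analogue of (~>), so both stages
-- can be busy with different elements at the same time.
pipeline :: IO [Int]
pipeline = runT (source [1 .. 5] >~> slowStage 10 >~> slowStage 100)

main :: IO ()
main = pipeline >>= print
```

If the assumption holds, the results arrive in input order, and the total running time should be closer to that of one stage than to the sum of both.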

Documentation

capWye :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> SourceT m b -> WyeT m a b c -> SourceT m c #

Tie off both inputs of a wye by connecting them to known sources.

capY :: forall (m :: Type -> Type) b a c. Monad m => SourceT m b -> WyeT m a b c -> ProcessT m a c #

Tie off one input of a wye by connecting it to a known source.

capX :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> WyeT m a b c -> ProcessT m b c #

Tie off one input of a wye by connecting it to a known source.

addY :: forall (m :: Type -> Type) b c a d. Monad m => ProcessT m b c -> WyeT m a c d -> WyeT m a b d #

Precompose a pipe onto the right input of a wye.

addX :: forall (m :: Type -> Type) a b c d. Monad m => ProcessT m a b -> WyeT m b c d -> WyeT m a c d #

Precompose a pipe onto the left input of a wye.

data Y a b c where #

The input descriptor for a Wye or WyeT

Constructors

X :: forall a b. Y a b a 
Y :: forall a b. Y a b b 
Z :: forall a b. Y a b (Either a b) 

type Wye a b c = Machine (Y a b) c #

A Machine that can read from two input streams in a non-deterministic manner.

type WyeT (m :: Type -> Type) a b c = MachineT m (Y a b) c #

A Machine that can read from two input streams in a non-deterministic manner with monadic side-effects.

logMealy :: Semigroup a => Mealy a a #

Accumulate history.

unfoldMealy :: (s -> a -> (b, s)) -> s -> Mealy a b #

A Mealy machine modeled with explicit state.

newtype Mealy a b #

Mealy machines

Examples

We can enumerate inputs:

>>> let countingMealy = unfoldMealy (\i x -> ((i, x), i + 1)) 0
>>> run (auto countingMealy <~ source "word")
[(0,'w'),(1,'o'),(2,'r'),(3,'d')]
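A second sketch in the same spirit: a running total, where the state threaded through unfoldMealy is the sum so far. The name runningSum is illustrative and not part of the library.

```haskell
import Data.Machine

-- State is the total so far; each input yields the updated total,
-- which also becomes the next state.
runningSum :: Mealy Int Int
runningSum = unfoldMealy step 0
  where
    step total x = let total' = total + x in (total', total')

totals :: [Int]
totals = run (auto runningSum <~ source [1, 2, 3, 4])

main :: IO ()
main = print totals
```

Because a Mealy machine produces its output in response to an input, totals has exactly one element per input: [1,3,6,10].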

Constructors

Mealy 

Fields

Instances

Instances details
Arrow Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

arr :: (b -> c) -> Mealy b c #

first :: Mealy b c -> Mealy (b, d) (c, d) #

second :: Mealy b c -> Mealy (d, b) (d, c) #

(***) :: Mealy b c -> Mealy b' c' -> Mealy (b, b') (c, c') #

(&&&) :: Mealy b c -> Mealy b c' -> Mealy b (c, c') #

ArrowChoice Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

left :: Mealy b c -> Mealy (Either b d) (Either c d) #

right :: Mealy b c -> Mealy (Either d b) (Either d c) #

(+++) :: Mealy b c -> Mealy b' c' -> Mealy (Either b b') (Either c c') #

(|||) :: Mealy b d -> Mealy c d -> Mealy (Either b c) d #

ArrowApply Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

app :: Mealy (Mealy b c, b) c #

Profunctor Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

dimap :: (a -> b) -> (c -> d) -> Mealy b c -> Mealy a d #

lmap :: (a -> b) -> Mealy b c -> Mealy a c #

rmap :: (b -> c) -> Mealy a b -> Mealy a c #

(#.) :: forall a b c q. Coercible c b => q b c -> Mealy a b -> Mealy a c #

(.#) :: forall a b c q. Coercible b a => Mealy b c -> q a b -> Mealy a c #

Automaton Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

auto :: Mealy a b -> Process a b #

Corepresentable Mealy 
Instance details

Defined in Data.Machine.Mealy

Associated Types

type Corep Mealy :: Type -> Type #

Methods

cotabulate :: (Corep Mealy d -> c) -> Mealy d c #

Choice Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

left' :: Mealy a b -> Mealy (Either a c) (Either b c) #

right' :: Mealy a b -> Mealy (Either c a) (Either c b) #

Closed Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

closed :: Mealy a b -> Mealy (x -> a) (x -> b) #

Strong Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

first' :: Mealy a b -> Mealy (a, c) (b, c) #

second' :: Mealy a b -> Mealy (c, a) (c, b) #

Costrong Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

unfirst :: Mealy (a, d) (b, d) -> Mealy a b #

unsecond :: Mealy (d, a) (d, b) -> Mealy a b #

Cosieve Mealy NonEmpty 
Instance details

Defined in Data.Machine.Mealy

Methods

cosieve :: Mealy a b -> NonEmpty a -> b #

Functor (Mealy a) 
Instance details

Defined in Data.Machine.Mealy

Methods

fmap :: (a0 -> b) -> Mealy a a0 -> Mealy a b #

(<$) :: a0 -> Mealy a b -> Mealy a a0 #

Applicative (Mealy a) 
Instance details

Defined in Data.Machine.Mealy

Methods

pure :: a0 -> Mealy a a0 #

(<*>) :: Mealy a (a0 -> b) -> Mealy a a0 -> Mealy a b #

liftA2 :: (a0 -> b -> c) -> Mealy a a0 -> Mealy a b -> Mealy a c #

(*>) :: Mealy a a0 -> Mealy a b -> Mealy a b #

(<*) :: Mealy a a0 -> Mealy a b -> Mealy a a0 #

Distributive (Mealy a) 
Instance details

Defined in Data.Machine.Mealy

Methods

distribute :: Functor f => f (Mealy a a0) -> Mealy a (f a0) #

collect :: Functor f => (a0 -> Mealy a b) -> f a0 -> Mealy a (f b) #

distributeM :: Monad m => m (Mealy a a0) -> Mealy a (m a0) #

collectM :: Monad m => (a0 -> Mealy a b) -> m a0 -> Mealy a (m b) #

Representable (Mealy a) 
Instance details

Defined in Data.Machine.Mealy

Associated Types

type Rep (Mealy a) #

Methods

tabulate :: (Rep (Mealy a) -> a0) -> Mealy a a0 #

index :: Mealy a a0 -> Rep (Mealy a) -> a0 #

Pointed (Mealy a) 
Instance details

Defined in Data.Machine.Mealy

Methods

point :: a0 -> Mealy a a0 #

Extend (Mealy a) 
Instance details

Defined in Data.Machine.Mealy

Methods

duplicated :: Mealy a a0 -> Mealy a (Mealy a a0) #

extended :: (Mealy a a0 -> b) -> Mealy a a0 -> Mealy a b #

Category Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

id :: forall (a :: k). Mealy a a #

(.) :: forall (b :: k) (c :: k) (a :: k). Mealy b c -> Mealy a b -> Mealy a c #

Semigroup b => Semigroup (Mealy a b) 
Instance details

Defined in Data.Machine.Mealy

Methods

(<>) :: Mealy a b -> Mealy a b -> Mealy a b #

sconcat :: NonEmpty (Mealy a b) -> Mealy a b #

stimes :: Integral b0 => b0 -> Mealy a b -> Mealy a b #

Monoid b => Monoid (Mealy a b) 
Instance details

Defined in Data.Machine.Mealy

Methods

mempty :: Mealy a b #

mappend :: Mealy a b -> Mealy a b -> Mealy a b #

mconcat :: [Mealy a b] -> Mealy a b #

type Corep Mealy 
Instance details

Defined in Data.Machine.Mealy

type Rep (Mealy a) 
Instance details

Defined in Data.Machine.Mealy

type Rep (Mealy a) = NonEmpty a

unfoldMoore :: (s -> (b, a -> s)) -> s -> Moore a b #

Construct a Moore machine from a state valuation and transition function
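A minimal sketch of unfoldMoore, assuming the Automaton instance for Moore emits the current state's output before awaiting each input. The name seenSoFar is hypothetical.

```haskell
import Data.Machine

-- State is the number of inputs seen so far; the output is that count.
-- The transition function ignores the input value itself.
seenSoFar :: Moore a Int
seenSoFar = unfoldMoore (\n -> (n, \_ -> n + 1)) 0

counts :: [Int]
counts = run (auto seenSoFar <~ source "abc")

main :: IO ()
main = print counts
```

Unlike a Mealy machine, a Moore machine's output depends only on its state, so the initial count 0 is emitted before any input is read and counts is [0,1,2,3].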

logMoore :: Monoid m => Moore m m #

Accumulate the input as a sequence.

data Moore a b #

Moore machines

Constructors

Moore b (a -> Moore a b) 

Instances

Instances details
Profunctor Moore 
Instance details

Defined in Data.Machine.Moore

Methods

dimap :: (a -> b) -> (c -> d) -> Moore b c -> Moore a d #

lmap :: (a -> b) -> Moore b c -> Moore a c #

rmap :: (b -> c) -> Moore a b -> Moore a c #

(#.) :: forall a b c q. Coercible c b => q b c -> Moore a b -> Moore a c #

(.#) :: forall a b c q. Coercible b a => Moore b c -> q a b -> Moore a c #

Automaton Moore 
Instance details

Defined in Data.Machine.Moore

Methods

auto :: Moore a b -> Process a b #

Corepresentable Moore 
Instance details

Defined in Data.Machine.Moore

Associated Types

type Corep Moore :: Type -> Type #

Methods

cotabulate :: (Corep Moore d -> c) -> Moore d c #

Closed Moore 
Instance details

Defined in Data.Machine.Moore

Methods

closed :: Moore a b -> Moore (x -> a) (x -> b) #

Costrong Moore 
Instance details

Defined in Data.Machine.Moore

Methods

unfirst :: Moore (a, d) (b, d) -> Moore a b #

unsecond :: Moore (d, a) (d, b) -> Moore a b #

Cosieve Moore [] 
Instance details

Defined in Data.Machine.Moore

Methods

cosieve :: Moore a b -> [a] -> b #

Monad (Moore a)

slow diagonalization

Instance details

Defined in Data.Machine.Moore

Methods

(>>=) :: Moore a a0 -> (a0 -> Moore a b) -> Moore a b #

(>>) :: Moore a a0 -> Moore a b -> Moore a b #

return :: a0 -> Moore a a0 #

Functor (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

fmap :: (a0 -> b) -> Moore a a0 -> Moore a b #

(<$) :: a0 -> Moore a b -> Moore a a0 #

MonadFix (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

mfix :: (a0 -> Moore a a0) -> Moore a a0 #

Applicative (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

pure :: a0 -> Moore a a0 #

(<*>) :: Moore a (a0 -> b) -> Moore a a0 -> Moore a b #

liftA2 :: (a0 -> b -> c) -> Moore a a0 -> Moore a b -> Moore a c #

(*>) :: Moore a a0 -> Moore a b -> Moore a b #

(<*) :: Moore a a0 -> Moore a b -> Moore a a0 #

Distributive (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

distribute :: Functor f => f (Moore a a0) -> Moore a (f a0) #

collect :: Functor f => (a0 -> Moore a b) -> f a0 -> Moore a (f b) #

distributeM :: Monad m => m (Moore a a0) -> Moore a (m a0) #

collectM :: Monad m => (a0 -> Moore a b) -> m a0 -> Moore a (m b) #

Representable (Moore a) 
Instance details

Defined in Data.Machine.Moore

Associated Types

type Rep (Moore a) #

Methods

tabulate :: (Rep (Moore a) -> a0) -> Moore a a0 #

index :: Moore a a0 -> Rep (Moore a) -> a0 #

MonadZip (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

mzip :: Moore a a0 -> Moore a b -> Moore a (a0, b) #

mzipWith :: (a0 -> b -> c) -> Moore a a0 -> Moore a b -> Moore a c #

munzip :: Moore a (a0, b) -> (Moore a a0, Moore a b) #

Comonad (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

extract :: Moore a a0 -> a0 #

duplicate :: Moore a a0 -> Moore a (Moore a a0) #

extend :: (Moore a a0 -> b) -> Moore a a0 -> Moore a b #

ComonadApply (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

(<@>) :: Moore a (a0 -> b) -> Moore a a0 -> Moore a b #

(@>) :: Moore a a0 -> Moore a b -> Moore a b #

(<@) :: Moore a a0 -> Moore a b -> Moore a a0 #

Pointed (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

point :: a0 -> Moore a a0 #

Copointed (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

copoint :: Moore a a0 -> a0 #

MonadReader [a] (Moore a) 
Instance details

Defined in Data.Machine.Moore

Methods

ask :: Moore a [a] #

local :: ([a] -> [a]) -> Moore a a0 -> Moore a a0 #

reader :: ([a] -> a0) -> Moore a a0 #

Semigroup b => Semigroup (Moore a b) 
Instance details

Defined in Data.Machine.Moore

Methods

(<>) :: Moore a b -> Moore a b -> Moore a b #

sconcat :: NonEmpty (Moore a b) -> Moore a b #

stimes :: Integral b0 => b0 -> Moore a b -> Moore a b #

Monoid b => Monoid (Moore a b) 
Instance details

Defined in Data.Machine.Moore

Methods

mempty :: Moore a b #

mappend :: Moore a b -> Moore a b -> Moore a b #

mconcat :: [Moore a b] -> Moore a b #

type Corep Moore 
Instance details

Defined in Data.Machine.Moore

type Corep Moore = []
type Rep (Moore a) 
Instance details

Defined in Data.Machine.Moore

type Rep (Moore a) = [a]

zipping :: Tee a b (a, b) #

Zip together two inputs, halting as soon as either input is exhausted.

zipWith :: (a -> b -> c) -> Tee a b c #

Zip together two inputs, then apply the given function, halting as soon as either input is exhausted. This implementation reads from the left input first, then the right.

zipWithT :: forall a b c (m :: Type -> Type). (a -> b -> c) -> PlanT (T a b) c m () #

Wait for both the left and the right sides of a T and then merge them with f.

capT :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> SourceT m b -> TeeT m a b c -> SourceT m c #

Tie off both inputs to a tee by connecting them to known sources. This is recommended over capping each side separately, as it is far more efficient.
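For illustration, a small sketch that caps both sides of the zipping tee with list-backed sources. The name zipped is hypothetical.

```haskell
import Data.Machine

-- Both inputs of the tee are tied off at once, giving a plain source.
zipped :: [(Int, Char)]
zipped = run (capT (source [1, 2, 3]) (source "ab") zipping)

main :: IO ()
main = print zipped
```

Since zipping halts as soon as either side is exhausted, the unmatched 3 on the left is dropped and zipped is [(1,'a'),(2,'b')].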

capR :: forall (m :: Type -> Type) b a c. Monad m => SourceT m b -> TeeT m a b c -> ProcessT m a c #

Tie off one input of a tee by connecting it to a known source.

capL :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> TeeT m a b c -> ProcessT m b c #

Tie off one input of a tee by connecting it to a known source.

addR :: forall (m :: Type -> Type) b c a d. Monad m => ProcessT m b c -> TeeT m a c d -> TeeT m a b d #

Precompose a pipe onto the right input of a tee.

addL :: forall (m :: Type -> Type) a b c d. Monad m => ProcessT m a b -> TeeT m b c d -> TeeT m a c d #

Precompose a pipe onto the left input of a tee.

teeT :: forall (m :: Type -> Type) a b c (k :: Type -> Type). Monad m => TeeT m a b c -> MachineT m k a -> MachineT m k b -> MachineT m k c #

teeT mt ma mb uses the Tee mt to interleave or combine the outputs of ma and mb.

The resulting machine will draw from a single source.

Examples:

>>> import Data.Machine.Source
>>> run $ teeT zipping echo echo <~ source [1..5]
[(1,2),(3,4)]

data T a b c where #

The input descriptor for a Tee or TeeT

Constructors

L :: forall a b. T a b a 
R :: forall a b. T a b b 

type Tee a b c = Machine (T a b) c #

A Machine that can read from two input streams in a deterministic manner.

type TeeT (m :: Type -> Type) a b c = MachineT m (T a b) c #

A Machine that can read from two input streams in a deterministic manner with monadic side-effects.

unfoldT :: Monad m => (r -> m (Maybe (a, r))) -> r -> SourceT m a #

Effectful unfold variant.

unfold :: (r -> Maybe (a, r)) -> r -> Source a #

unfold k seed: the function k takes the current seed and returns Nothing if it is done producing values, or Just (a, r), in which case a is yielded and r is used as the next seed in a recursive call.
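A short sketch of unfold as a countdown. The names countdownStep and countdown are illustrative only.

```haskell
import Data.Machine

-- Step function: stop at zero, otherwise emit n and continue from n - 1.
countdownStep :: Int -> Maybe (Int, Int)
countdownStep n
  | n <= 0    = Nothing
  | otherwise = Just (n, n - 1)

countdown :: [Int]
countdown = run (unfold countdownStep 3)

main :: IO ()
main = print countdown
```

Here countdown evaluates to [3,2,1]: the seed 0 makes the step function return Nothing, which ends the source.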

enumerateFromTo :: Enum a => a -> a -> Source a #

Enumerate from a value to a final value, inclusive, via succ

Examples:

>>> run $ enumerateFromTo 1 3
[1,2,3]

replicated :: Int -> a -> Source a #

replicated n x is a source of x emitted n time(s)

iterated :: (a -> a) -> a -> Source a #

iterated f x returns an infinite source of repeated applications of f to x

plug :: forall (m :: Type -> Type) (k :: Type -> Type) o. Monad m => MachineT m k o -> SourceT m o #

You can transform any MachineT into a SourceT, blocking its input.

This is used by capT and capWye, and allows an efficient way to plug together machines of different input languages.

cap :: Process a b -> Source a -> Source b #

You can transform a Source with a Process.

Alternately you can view this as capping the Source end of a Process, yielding a new Source.

cap l r = l <~ r

source :: Foldable f => f b -> Source b #

Generate a Source from any Foldable container.

This can be constructed from a plan with

source :: Foldable f => f b -> Source b
source xs = construct (traverse_ yield xs)

Examples:

>>> run $ source [1,2]
[1,2]

cycled :: Foldable f => f b -> Source b #

Loop through a Foldable container over and over.

This can be constructed from a plan with

cycled :: Foldable f => f b -> Source b
cycled xs = repeatedly (traverse_ yield xs)

Examples:

>>> run $ taking 5 <~ cycled [1,2]
[1,2,1,2,1]

repeated :: o -> Source o #

Repeat the same value, over and over.

This can be constructed from a plan with

repeated :: o -> Source o
repeated = repeatedly . yield

Examples:

>>> run $ taking 5 <~ repeated 1
[1,1,1,1,1]

type Source b = forall (k :: Type -> Type). Machine k b #

A Source never reads from its inputs.

type SourceT (m :: Type -> Type) b = forall (k :: Type -> Type). MachineT m k b #

A SourceT never reads from its inputs, but may have monadic side-effects.

strippingPrefix :: forall b (m :: Type -> Type) (k :: Type -> Type -> Type) a. (Eq b, Monad m) => MachineT m (k a) b -> MachineT m (k a) b -> MachineT m (k a) b #

strippingPrefix mp mb drops the prefix produced by mp from the output of mb. It stops if mb did not start with that prefix, or continues streaming the rest of mb after the prefix if it did.

showing :: forall (k :: Type -> Type -> Type) a. (Category k, Show a) => Machine (k a) String #

Convert Showable values to Strings

reading :: forall (k :: Type -> Type -> Type) a. (Category k, Read a) => Machine (k String) a #

Parse Readable values, only emitting the value if the parse succeeds. This Machine stops at the first parsing error.

traversing :: forall (k :: Type -> Type -> Type) m a b. (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b #

Apply an effectful function to all values coming from the input.

Alias to autoM.

mapping :: forall (k :: Type -> Type -> Type) a b. Category k => (a -> b) -> Machine (k a) b #

Apply a function to all values coming from the input

This can be constructed from a plan with

mapping :: Category k => (a -> b) -> Machine (k a) b
mapping f = repeatedly $ await >>= yield . f

Examples:

>>> runT $ mapping (*2) <~ source [1..3]
[2,4,6]

sequencing :: forall (k :: Type -> Type -> Type) m a. (Category k, Monad m) => MachineT m (k (m a)) a #

Convert a stream of actions to a stream of values

This can be constructed from a plan with

sequencing :: (Category k, Monad m) => MachineT m (k (m a)) a
sequencing = repeatedly $ do
  ma <- await
  a  <- lift ma
  yield a

Examples:

>>> runT $ sequencing <~ source [Just 3, Nothing]
Nothing
>>> runT $ sequencing <~ source [Just 3, Just 4]
Just [3,4]

smallest :: forall (k :: Type -> Type -> Type) a. (Category k, Ord a) => Machine (k a) a #

Return the minimum value from the input

largest :: forall (k :: Type -> Type -> Type) a. (Category k, Ord a) => Machine (k a) a #

Return the maximum value from the input

intersperse :: forall (k :: Type -> Type -> Type) a. Category k => a -> Machine (k a) a #

Intersperse an element between the elements of the input

intersperse :: a -> Process a a
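The entries for smallest, largest and intersperse carry no examples, so here is a minimal sketch of all three on one input list. The names inputs, minOut, maxOut and spaced are hypothetical.

```haskell
import Data.Machine

inputs :: [Int]
inputs = [3, 1, 4]

-- smallest and largest emit a single value once the input is exhausted.
minOut, maxOut :: [Int]
minOut = run (smallest <~ source inputs)
maxOut = run (largest <~ source inputs)

-- intersperse places the separator between consecutive inputs only.
spaced :: [Int]
spaced = run (intersperse 0 <~ source inputs)

main :: IO ()
main = do
  print minOut
  print maxOut
  print spaced
```

With these inputs the three results are [1], [4] and [3,0,1,0,4] respectively.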

finalOr :: forall (k :: Type -> Type -> Type) a. Category k => a -> Machine (k a) a #

Skip all but the final element of the input. If the input is empty, the default value is emitted

This can be constructed from a plan with

finalOr :: a -> Process a a
finalOr :: Category k => a -> Machine (k a) a
finalOr = construct . go where
  go prev = do
    next <- await <|> yield prev *> stop
    go next

Examples:

>>> runT $ finalOr (-1) <~ source [1..10]
[10]
>>> runT $ finalOr (-1) <~ source []
[-1]

final :: forall (k :: Type -> Type -> Type) a. Category k => Machine (k a) a #

Skip all but the final element of the input

This can be constructed from a plan with

final :: Process a a
final :: Category k => Machine (k a) a
final = construct $ await >>= go where
  go prev = do
    next <- await <|> yield prev *> stop
    go next

Examples:

>>> runT $ final <~ source [1..10]
[10]
>>> runT $ final <~ source []
[]

autoM :: forall (k :: Type -> Type -> Type) m a b. (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b #

Apply a monadic function to each element of a ProcessT.

This can be constructed from a plan with

autoM :: Monad m => (a -> m b) -> ProcessT m a b
autoM :: (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b
autoM f = repeatedly $ await >>= lift . f >>= yield

Examples:

>>> runT $ autoM Left <~ source [3, 4]
Left 3
>>> runT $ autoM Right <~ source [3, 4]
Right [3,4]

sinkPart_ :: forall (m :: Type -> Type) a b c. Monad m => (a -> (b, c)) -> ProcessT m c Void -> ProcessT m a b #

sinkPart_ toParts sink creates a process that uses the toParts function to break input into a tuple of (passAlong, sinkPart) for which the second projection is given to the supplied sink ProcessT (that produces no output) while the first projection is passed down the pipeline.

flattened :: forall (f :: Type -> Type) a. Foldable f => Process (f a) a #

Break each input into pieces that are fed downstream individually.

Alias for asParts

asParts :: forall (f :: Type -> Type) a. Foldable f => Process (f a) a #

Break each input into pieces that are fed downstream individually.

This can be constructed from a plan with

asParts :: Foldable f => Process (f a) a
asParts = repeatedly $ await >>= traverse_ yield

Examples:

>>> run $ asParts <~ source [[1..3],[4..6]]
[1,2,3,4,5,6]

fold1 :: forall (k :: Type -> Type -> Type) a. Category k => (a -> a -> a) -> Machine (k a) a #

fold1 is a variant of fold that has no starting value argument

This can be constructed from a plan with

fold1 :: Category k => (a -> a -> a) -> Machine (k a) a
fold1 func = construct $ await >>= go where
  go cur = do
    next <- await <|> yield cur *> stop
    go $! func cur next

Examples:

>>> run $ fold1 (+) <~ source [1..5]
[15]

fold :: forall (k :: Type -> Type -> Type) a b. Category k => (a -> b -> a) -> a -> Machine (k b) a #

Construct a Process from a left-folding operation.

Like scan, but only yielding the final value.

It may be useful to consider this alternative signature

fold :: (a -> b -> a) -> a -> Process b a

This can be constructed from a plan with

fold :: Category k => (a -> b -> a) -> a -> Machine (k b) a
fold func seed = construct $ go seed where
  go cur = do
    next <- await <|> yield cur *> stop
    go $! func cur next

Examples:

>>> run $ fold (+) 0 <~ source [1..5]
[15]
>>> run $ fold (\a _ -> a + 1) 0 <~ source [1..5]
[5]

scanMap :: forall (k :: Type -> Type -> Type) b a. (Category k, Monoid b) => (a -> b) -> Machine (k a) b #

Like scan, but maps each input with the supplied function and combines the results using the Monoid's associative operation.

Examples:

>>> run $ mapping getSum <~ scanMap Sum <~ source [1..5]
[0,1,3,6,10,15]

scan1 :: forall (k :: Type -> Type -> Type) a. Category k => (a -> a -> a) -> Machine (k a) a #

scan1 is a variant of scan that has no starting value argument

This can be constructed from a plan with

scan1 :: Category k => (a -> a -> a) -> Machine (k a) a
scan1 func = construct $ await >>= go where
  go cur = do
    yield cur
    next <- await
    go $! func cur next

Examples:

>>> run $ scan1 (+) <~ source [1..5]
[1,3,6,10,15]

scan :: forall (k :: Type -> Type -> Type) a b. Category k => (a -> b -> a) -> a -> Machine (k b) a #

Construct a Process from a left-scanning operation.

Like fold, but yielding intermediate values.

It may be useful to consider this alternative signature

scan :: (a -> b -> a) -> a -> Process b a

For a stateful scan, use auto with a Data.Machine.Mealy machine.

This can be constructed from a plan with

scan :: Category k => (a -> b -> a) -> a -> Machine (k b) a
scan func seed = construct $ go seed where
  go cur = do
    yield cur
    next <- await
    go $! func cur next

Examples:

>>> run $ scan (+) 0 <~ source [1..5]
[0,1,3,6,10,15]
>>> run $ scan (\a _ -> a + 1) 0 <~ source [1..5]
[0,1,2,3,4,5]

process :: forall (m :: Type -> Type) k i o. Monad m => (forall a. k a -> i -> a) -> MachineT m k o -> ProcessT m i o #

Convert a machine into a process, with a little bit of help.

choose :: T a b x -> (a, b) -> x
choose t = case t of
  L -> fst
  R -> snd

process choose :: Tee a b c -> Process (a, b) c
process (const id) :: Process a b -> Process a b

supply :: forall f (m :: Type -> Type) a b. (Foldable f, Monad m) => f a -> ProcessT m a b -> ProcessT m a b #

Feed a Process some input.

Examples:

>>> run $ supply [1,2,3] echo <~ source [4..6]
[1,2,3,4,5,6]

(~>) :: forall (m :: Type -> Type) (k :: Type -> Type) b c. Monad m => MachineT m k b -> ProcessT m b c -> MachineT m k c infixl 9 #

Flipped (<~).

(<~) :: forall (m :: Type -> Type) b c (k :: Type -> Type). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c infixr 9 #

Build a new Machine by adding a Process to the output of an old Machine

(<~) :: Process b c -> Process a b -> Process a c
(<~) :: Process c d -> Tee a b c -> Tee a b d
(<~) :: Process b c -> Machine k b -> Machine k c
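As a sketch of how the two composition operators read in practice, the same three-stage pipeline is written below in both directions. The names leftToRight and rightToLeft are illustrative.

```haskell
import Data.Machine

-- Data flows left to right with (~>).
leftToRight :: [Int]
leftToRight =
  run (source [1 .. 10] ~> filtered even ~> mapping (* 2) ~> taking 3)

-- The same pipeline with (<~): data flows right to left.
rightToLeft :: [Int]
rightToLeft =
  run (taking 3 <~ mapping (* 2) <~ filtered even <~ source [1 .. 10])

main :: IO ()
main = do
  print leftToRight
  print rightToLeft
```

Both definitions evaluate to [4,8,12]. Which operator to use is a matter of which reading order suits the surrounding code.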

buffered :: Int -> Process a [a] #

Chunk up the input into n element lists.

Avoids returning empty lists and deals with the truncation of the final group.

An approximation of this can be constructed from a plan with

buffered :: Int -> Process a [a]
buffered = repeatedly . go [] where
  go acc 0 = yield (reverse acc)
  go acc n = do
    i <- await <|> yield (reverse acc) *> stop
    go (i:acc) $! n-1

Examples:

>>> run $ buffered 3 <~ source [1..6]
[[1,2,3],[4,5,6]]
>>> run $ buffered 3 <~ source [1..5]
[[1,2,3],[4,5]]
>>> run $ buffered 3 <~ source []
[]

droppingWhile :: (a -> Bool) -> Process a a #

A Process that drops elements while a predicate holds

This can be constructed from a plan with

droppingWhile :: (a -> Bool) -> Process a a
droppingWhile p = before echo loop where
  loop = await >>= \v -> if p v then loop else yield v

Examples:

>>> run $ droppingWhile (< 3) <~ source [1..5]
[3,4,5]

takingJusts :: Process (Maybe a) a #

A Process that passes through elements unwrapped from Just until a Nothing is found, then stops.

This can be constructed from a plan with

takingJusts :: Process (Maybe a) a
takingJusts = repeatedly $ await >>= maybe stop yield

Examples:

>>> run $ takingJusts <~ source [Just 1, Just 2, Nothing, Just 3, Just 4]
[1,2]

takingWhile :: (a -> Bool) -> Process a a #

A Process that passes through elements until a predicate ceases to hold, then stops

This can be constructed from a plan with

takingWhile :: (a -> Bool) -> Process a a
takingWhile p = repeatedly $ await >>= \v -> if p v then yield v else stop

Examples:

>>> run $ takingWhile (< 3) <~ source [1..5]
[1,2]

taking :: Int -> Process a a #

A Process that passes through the first n elements from its input then stops

This can be constructed from a plan with

taking n = construct . replicateM_ n $ await >>= yield

Examples:

>>> run $ taking 3 <~ source [1..5]
[1,2,3]

dropping :: Int -> Process a a #

A Process that drops the first n, then repeats the rest.

This can be constructed from a plan with

dropping n = before echo $ replicateM_ n await

Examples:

>>> run $ dropping 3 <~ source [1..5]
[4,5]

filtered :: (a -> Bool) -> Process a a #

A Process that only passes through inputs that match a predicate.

This can be constructed from a plan with

filtered :: (a -> Bool) -> Process a a
filtered p = repeatedly $ do
  i <- await
  when (p i) $ yield i

Examples:

>>> run $ filtered even <~ source [1..5]
[2,4]

prepended :: Foldable f => f a -> Process a a #

A Process that prepends the elements of a Foldable onto its input, then repeats its input from there.
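A small sketch of prepended, using made-up row data. The name withHeader is hypothetical.

```haskell
import Data.Machine

-- The prepended elements are emitted first, then the input is echoed.
withHeader :: [String]
withHeader = run (prepended ["header"] <~ source ["row1", "row2"])

main :: IO ()
main = print withHeader
```

Here withHeader is ["header","row1","row2"].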

echo :: Process a a #

The trivial Process that simply repeats each input it receives.

This can be constructed from a plan with

echo :: Process a a
echo = repeatedly $ do
  i <- await
  yield i

Examples:

>>> run $ echo <~ source [1..5]
[1,2,3,4,5]

type Process a b = Machine (Is a) b #

A Process a b is a stream transducer that can consume values of type a from its input, and produce values of type b for its output.

type ProcessT (m :: Type -> Type) a b = MachineT m (Is a) b #

A ProcessT m a b is a stream transducer that can consume values of type a from its input, and produce values of type b for its output, with side-effects in the Monad m.

class Automaton (k :: Type -> Type -> Type) where #

An Automaton can be automatically lifted into a Process

Methods

auto :: k a b -> Process a b #

Instances

Instances details
Automaton Mealy 
Instance details

Defined in Data.Machine.Mealy

Methods

auto :: Mealy a b -> Process a b #

Automaton Moore 
Instance details

Defined in Data.Machine.Moore

Methods

auto :: Moore a b -> Process a b #

Automaton Is 
Instance details

Defined in Data.Machine.Process

Methods

auto :: Is a b -> Process a b #

Automaton ((->) :: Type -> Type -> Type) 
Instance details

Defined in Data.Machine.Process

Methods

auto :: (a -> b) -> Process a b #

class AutomatonM (x :: (Type -> Type) -> Type -> Type -> Type) where #

Methods

autoT :: forall (m :: Type -> Type) a b. Monad m => x m a b -> ProcessT m a b #

Instances

Instances details
AutomatonM Kleisli 
Instance details

Defined in Data.Machine.Process

Methods

autoT :: forall (m :: Type -> Type) a b. Monad m => Kleisli m a b -> ProcessT m a b #

finishWith :: forall (m :: Type -> Type) o r (k :: Type -> Type). Monad m => (o -> Maybe r) -> MachineT m k o -> MachineT m k (Either r o) #

Use a function to produce and mark a yielded value as the terminal value of a Machine. All yielded values for which the given function returns Nothing are yielded down the pipeline, but the first value for which the function returns a Just value will be returned by a Plan created via deconstruct.

tagDone :: forall (m :: Type -> Type) o (k :: Type -> Type). Monad m => (o -> Bool) -> MachineT m k o -> MachineT m k (Either o o) #

Use a predicate to mark a yielded value as the terminal value of this Machine. This is useful in combination with deconstruct to combine Plans.

deconstruct :: forall (m :: Type -> Type) (k :: Type -> Type) a o. Monad m => MachineT m k (Either a o) -> PlanT k o m a #

stopped :: forall (k :: Type -> Type) b. Machine k b #

This is a stopped Machine

starve :: forall (m :: Type -> Type) (k0 :: Type -> Type) b (k :: Type -> Type). Monad m => MachineT m k0 b -> MachineT m k b -> MachineT m k b #

Run a machine with no input until it stops, then behave as another machine.

pass :: k o -> Machine k o #

Given a handle, ignore all other inputs and just stream input from that handle.

pass id :: Process a a
pass L  :: Tee a b a
pass R  :: Tee a b b
pass X  :: Wye a b a
pass Y  :: Wye a b b
pass Z  :: Wye a b (Either a b)

preplan :: forall (m :: Type -> Type) (k :: Type -> Type) o. Monad m => PlanT k o m (MachineT m k o) -> MachineT m k o #

Incorporate a Plan into the resulting machine.

before :: forall (m :: Type -> Type) (k :: Type -> Type) o a. Monad m => MachineT m k o -> PlanT k o m a -> MachineT m k o #

Evaluate a machine until it stops, and then yield answers according to the supplied model.

unfoldPlan :: forall (m :: Type -> Type) s (k :: Type -> Type) o. Monad m => s -> (s -> PlanT k o m s) -> MachineT m k o #

Unfold a stateful PlanT into a MachineT.

repeatedly :: forall (m :: Type -> Type) (k :: Type -> Type) o a. Monad m => PlanT k o m a -> MachineT m k o #

Generates a model that runs a machine until it stops, then starts it up again.

repeatedly m = construct (forever m)
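To make the difference between repeatedly and construct concrete, the sketch below compiles one plan both ways. The names pairPlan, pairs and firstPair are hypothetical.

```haskell
import Data.Machine

-- One pass of the plan: read two inputs, emit them as a pair.
pairPlan :: Plan (Is a) (a, a) ()
pairPlan = do
  x <- await
  y <- await
  yield (x, y)

-- repeatedly restarts the plan each time it finishes.
pairs :: Process a (a, a)
pairs = repeatedly pairPlan

-- construct runs the plan once, then the machine stops.
firstPair :: Process a (a, a)
firstPair = construct pairPlan

allPairs, onePair :: [(Int, Int)]
allPairs = run (pairs <~ source [1 .. 5])
onePair  = run (firstPair <~ source [1 .. 5])

main :: IO ()
main = do
  print allPairs
  print onePair
```

With five inputs, allPairs is [(1,2),(3,4)] because the final await finds no partner for 5 and the machine stops, while onePair is just [(1,2)].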

construct :: forall (m :: Type -> Type) (k :: Type -> Type) o a. Monad m => PlanT k o m a -> MachineT m k o #

Compile a machine to a model.

fitM :: forall m m' (k :: Type -> Type) o. (Monad m, Monad m') => (forall a. m a -> m' a) -> MachineT m k o -> MachineT m' k o #

fit :: forall (m :: Type -> Type) k k' o. Monad m => (forall a. k a -> k' a) -> MachineT m k o -> MachineT m k' o #

Connect different kinds of machines.

fit id = id

run :: forall (k :: Type -> Type) b. MachineT Identity k b -> [b] #

Run a pure machine and extract an answer.

runT :: forall m (k :: Type -> Type) b. Monad m => MachineT m k b -> m [b] #

Stop feeding input into model and extract an answer

runT_ :: forall m (k :: Type -> Type) b. Monad m => MachineT m k b -> m () #

Stop feeding input into model, taking only the effects.

stepMachine :: forall (m :: Type -> Type) (k :: Type -> Type) o (k' :: Type -> Type) o'. Monad m => MachineT m k o -> (Step k o (MachineT m k o) -> MachineT m k' o') -> MachineT m k' o' #

Transform a Machine by looking at a single step of that machine.

encased :: forall (m :: Type -> Type) (k :: Type -> Type) o. Monad m => Step k o (MachineT m k o) -> MachineT m k o #

Pack a Step of a Machine into a Machine.

data Step (k :: Type -> Type) o r #

This is the base functor for a Machine or MachineT.

Note: A Machine is usually constructed from Plan, so it does not need to be CPS'd.

Constructors

Stop 
Yield o r 
Await (t -> r) (k t) r 

Instances

Instances details
Functor (Step k o) 
Instance details

Defined in Data.Machine.Type

Methods

fmap :: (a -> b) -> Step k o a -> Step k o b #

(<$) :: a -> Step k o b -> Step k o a #

newtype MachineT (m :: Type -> Type) (k :: Type -> Type) o #

A MachineT reads from a number of inputs and may yield results before stopping with monadic side-effects.

Constructors

MachineT 

Fields

  • runMachineT :: m (Step k o (MachineT m k o))

Instances

Monad m => Functor (MachineT m k) 
Instance details

Defined in Data.Machine.Type

Methods

fmap :: (a -> b) -> MachineT m k a -> MachineT m k b #

(<$) :: a -> MachineT m k b -> MachineT m k a #

(Monad m, Appliance k) => Applicative (MachineT m k) 
Instance details

Defined in Data.Machine.Type

Methods

pure :: a -> MachineT m k a #

(<*>) :: MachineT m k (a -> b) -> MachineT m k a -> MachineT m k b #

liftA2 :: (a -> b -> c) -> MachineT m k a -> MachineT m k b -> MachineT m k c #

(*>) :: MachineT m k a -> MachineT m k b -> MachineT m k b #

(<*) :: MachineT m k a -> MachineT m k b -> MachineT m k a #

m ~ Identity => Foldable (MachineT m k)

This permits toList to be used on a Machine.

Instance details

Defined in Data.Machine.Type

Methods

fold :: Monoid m0 => MachineT m k m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> MachineT m k a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> MachineT m k a -> m0 #

foldr :: (a -> b -> b) -> b -> MachineT m k a -> b #

foldr' :: (a -> b -> b) -> b -> MachineT m k a -> b #

foldl :: (b -> a -> b) -> b -> MachineT m k a -> b #

foldl' :: (b -> a -> b) -> b -> MachineT m k a -> b #

foldr1 :: (a -> a -> a) -> MachineT m k a -> a #

foldl1 :: (a -> a -> a) -> MachineT m k a -> a #

toList :: MachineT m k a -> [a] #

null :: MachineT m k a -> Bool #

length :: MachineT m k a -> Int #

elem :: Eq a => a -> MachineT m k a -> Bool #

maximum :: Ord a => MachineT m k a -> a #

minimum :: Ord a => MachineT m k a -> a #

sum :: Num a => MachineT m k a -> a #

product :: Num a => MachineT m k a -> a #

Monad m => Pointed (MachineT m k) 
Instance details

Defined in Data.Machine.Type

Methods

point :: a -> MachineT m k a #

Monad m => Semigroup (MachineT m k o) 
Instance details

Defined in Data.Machine.Type

Methods

(<>) :: MachineT m k o -> MachineT m k o -> MachineT m k o #

sconcat :: NonEmpty (MachineT m k o) -> MachineT m k o #

stimes :: Integral b => b -> MachineT m k o -> MachineT m k o #

Monad m => Monoid (MachineT m k o) 
Instance details

Defined in Data.Machine.Type

Methods

mempty :: MachineT m k o #

mappend :: MachineT m k o -> MachineT m k o -> MachineT m k o #

mconcat :: [MachineT m k o] -> MachineT m k o #

type Machine (k :: Type -> Type) o = forall (m :: Type -> Type). Monad m => MachineT m k o #

A Machine reads from a number of inputs and may yield results before stopping.

A Machine can be used as a MachineT m for any Monad m.

class Appliance (k :: Type -> Type) where #

An input type that supports merging requests from multiple machines.

Methods

applied :: forall (m :: Type -> Type) a b. Monad m => MachineT m k (a -> b) -> MachineT m k a -> MachineT m k b #

exhaust :: forall m a (k :: Type -> Type). Monad m => m (Maybe a) -> PlanT k a m () #

Run a monadic action repeatedly yielding its results, until it returns Nothing.
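
As a sketch of how exhaust combines with construct and runT, the following drains a list held in an IORef. The helpers pop and drain are hypothetical names introduced only for this illustration.

```haskell
import Data.IORef
import Data.Machine

-- Hypothetical helper: remove and return the head of the stored list,
-- or Nothing once the list is empty.
pop :: IORef [a] -> IO (Maybe a)
pop ref = atomicModifyIORef' ref $ \xs -> case xs of
  []       -> ([], Nothing)
  (y : ys) -> (ys, Just y)

-- exhaust keeps running the action and yielding each Just value;
-- the plan finishes at the first Nothing, so the machine stops.
drain :: [a] -> IO [a]
drain xs = do
  ref <- newIORef xs
  runT (construct (exhaust (pop ref)))
-- drain "abc" returns "abc"
```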

stop :: forall (k :: Type -> Type) o a. Plan k o a #

awaits :: k i -> Plan k o i #

Wait for a particular input.

awaits L  :: Plan (T a b) o a
awaits R  :: Plan (T a b) o b
awaits id :: Plan (Is i) o i
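
To illustrate awaits with the L and R selectors, here is a small sketch of a Tee that reads one value from each side and yields their sum. The names zipSum and sums are hypothetical; capL, source and (~>) are assumed to be the ones exported by Data.Machine.

```haskell
import Data.Machine

-- Hypothetical example: request the left input, then the right input,
-- and emit their sum. The plan stops when either side runs dry.
zipSum :: Tee Int Int Int
zipSum = repeatedly $ do
  a <- awaits L
  b <- awaits R
  yield (a + b)

-- capL ties the left input to a known source, leaving a Process
-- that is fed from the right.
sums :: [Int]
sums = run (source [10, 20, 30] ~> capL (source [1, 2, 3]) zipSum)
-- sums == [11,22,33]
```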

await :: forall (k :: Type -> Type -> Type) i o. Category k => Plan (k i) o i #

Wait for input.

await = awaits id

maybeYield :: forall o (k :: Type -> Type). Maybe o -> Plan k o () #

Like yield, except stops if there is no value to yield.

yield :: forall o (k :: Type -> Type). o -> Plan k o () #

Output a result.

runPlan :: PlanT k o Identity a -> (a -> r) -> (o -> r -> r) -> (forall z. (z -> r) -> k z -> r -> r) -> r -> r #

Deconstruct a Plan without reference to a Monad.

newtype PlanT (k :: Type -> Type) o (m :: Type -> Type) a #

You can construct a Plan (or PlanT), turning it into a Machine (or MachineT).

Constructors

PlanT 

Fields

  • runPlanT :: forall r. (a -> m r) -> (o -> m r -> m r) -> (forall z. (z -> m r) -> k z -> m r -> m r) -> m r -> m r
     

Instances

MonadReader e m => MonadReader e (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

ask :: PlanT k o m e #

local :: (e -> e) -> PlanT k o m a -> PlanT k o m a #

reader :: (e -> a) -> PlanT k o m a #

MonadState s m => MonadState s (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

get :: PlanT k o m s #

put :: s -> PlanT k o m () #

state :: (s -> (a, s)) -> PlanT k o m a #

MonadWriter w m => MonadWriter w (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

writer :: (a, w) -> PlanT k o m a #

tell :: w -> PlanT k o m () #

listen :: PlanT k o m a -> PlanT k o m (a, w) #

pass :: PlanT k o m (a, w -> w) -> PlanT k o m a #

MonadError e m => MonadError e (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

throwError :: e -> PlanT k o m a #

catchError :: PlanT k o m a -> (e -> PlanT k o m a) -> PlanT k o m a #

MonadTrans (PlanT k o) 
Instance details

Defined in Data.Machine.Plan

Methods

lift :: Monad m => m a -> PlanT k o m a #

Monad (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

(>>=) :: PlanT k o m a -> (a -> PlanT k o m b) -> PlanT k o m b #

(>>) :: PlanT k o m a -> PlanT k o m b -> PlanT k o m b #

return :: a -> PlanT k o m a #

Functor (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

fmap :: (a -> b) -> PlanT k o m a -> PlanT k o m b #

(<$) :: a -> PlanT k o m b -> PlanT k o m a #

MonadFail (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

fail :: String -> PlanT k o m a #

Applicative (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

pure :: a -> PlanT k o m a #

(<*>) :: PlanT k o m (a -> b) -> PlanT k o m a -> PlanT k o m b #

liftA2 :: (a -> b -> c) -> PlanT k o m a -> PlanT k o m b -> PlanT k o m c #

(*>) :: PlanT k o m a -> PlanT k o m b -> PlanT k o m b #

(<*) :: PlanT k o m a -> PlanT k o m b -> PlanT k o m a #

MonadIO m => MonadIO (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

liftIO :: IO a -> PlanT k o m a #

Alternative (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

empty :: PlanT k o m a #

(<|>) :: PlanT k o m a -> PlanT k o m a -> PlanT k o m a #

some :: PlanT k o m a -> PlanT k o m [a] #

many :: PlanT k o m a -> PlanT k o m [a] #

MonadPlus (PlanT k o m) 
Instance details

Defined in Data.Machine.Plan

Methods

mzero :: PlanT k o m a #

mplus :: PlanT k o m a -> PlanT k o m a -> PlanT k o m a #

type Plan (k :: Type -> Type) o a = forall (m :: Type -> Type). PlanT k o m a #

A Plan k o a is a specification for a pure Machine that reads inputs selected by k (the type of each input is fixed by the request passed to awaits), writes values of type o, and has intermediate results of type a.

A Plan k o a can be used as a PlanT k o m a for any Monad m.

It is perhaps easier to think of Plan in its un-cps'ed form, which would look like:

data Plan k o a
  = Done a
  | Yield o (Plan k o a)
  | forall z. Await (z -> Plan k o a) (k z) (Plan k o a)
  | Fail

data Is a b where #

Witnessed type equality

Constructors

Refl :: forall a. Is a a 

Instances

Automaton Is 
Instance details

Defined in Data.Machine.Process

Methods

auto :: Is a b -> Process a b #

Category Is 
Instance details

Defined in Data.Machine.Is

Methods

id :: forall (a :: k). Is a a #

(.) :: forall (b :: k) (c :: k) (a :: k). Is b c -> Is a b -> Is a c #

Eq (Is a b) 
Instance details

Defined in Data.Machine.Is

Methods

(==) :: Is a b -> Is a b -> Bool #

(/=) :: Is a b -> Is a b -> Bool #

Ord (Is a b) 
Instance details

Defined in Data.Machine.Is

Methods

compare :: Is a b -> Is a b -> Ordering #

(<) :: Is a b -> Is a b -> Bool #

(<=) :: Is a b -> Is a b -> Bool #

(>) :: Is a b -> Is a b -> Bool #

(>=) :: Is a b -> Is a b -> Bool #

max :: Is a b -> Is a b -> Is a b #

min :: Is a b -> Is a b -> Is a b #

a ~ b => Read (Is a b) 
Instance details

Defined in Data.Machine.Is

Methods

readsPrec :: Int -> ReadS (Is a b) #

readList :: ReadS [Is a b] #

readPrec :: ReadPrec (Is a b) #

readListPrec :: ReadPrec [Is a b] #

Show (Is a b) 
Instance details

Defined in Data.Machine.Is

Methods

showsPrec :: Int -> Is a b -> ShowS #

show :: Is a b -> String #

showList :: [Is a b] -> ShowS #

a ~ b => Semigroup (Is a b) 
Instance details

Defined in Data.Machine.Is

Methods

(<>) :: Is a b -> Is a b -> Is a b #

sconcat :: NonEmpty (Is a b) -> Is a b #

stimes :: Integral b0 => b0 -> Is a b -> Is a b #

a ~ b => Monoid (Is a b) 
Instance details

Defined in Data.Machine.Is

Methods

mempty :: Is a b #

mappend :: Is a b -> Is a b -> Is a b #

mconcat :: [Is a b] -> Is a b #

Concurrent connection

(>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c infixl 7 Source #

Flipped (<~<).

(<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c Source #

Build a new Machine by adding a Process to the output of an old Machine. The upstream machine is run concurrently with downstream with the aim that upstream will have a yielded value ready as soon as downstream awaits. This effectively creates a buffer between upstream and downstream, or source and sink, that can contain up to one value.

(<~<) :: Process b c -> Process a b -> Process a c
(<~<) :: Process c d -> Tee a b c -> Tee a b d
(<~<) :: Process b c -> Machine k b -> Machine k c
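
A minimal sketch of a concurrent pipeline follows. It assumes that Data.Machine.Concurrent re-exports source, mapping and runT from Data.Machine; pipelined is a hypothetical name.

```haskell
import Data.Machine.Concurrent

-- Each (>~>) link runs its upstream side concurrently with its
-- downstream side. Values still flow through the stages in order.
pipelined :: IO [Int]
pipelined = runT (source [1 .. 5] >~> mapping (* 2) >~> mapping (+ 1))
-- pipelined should return [3,5,7,9,11]
```

With stages this cheap the concurrency buys nothing; the benefit appears when each stage performs comparably expensive work, since the stages can then overlap.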

Buffered machines

bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c Source #

Mediate a MachineT and a ProcessT with a bounded capacity buffer. The source machine runs concurrently with the sink process, and is only blocked when the buffer is full.

rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c Source #

Mediate a MachineT and a ProcessT with a rolling buffer. The source machine runs concurrently with the sink process and is never blocked. If the sink process cannot keep up with upstream, yielded values will be dropped.
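
The two buffering strategies can be compared side by side. This sketch assumes the Int argument is the buffer capacity in both functions; buffered and rolling are hypothetical names.

```haskell
import Data.Machine.Concurrent

-- Bounded buffer: upstream blocks when the buffer is full,
-- so every value reaches the sink.
buffered :: IO [Int]
buffered = runT (bufferConnect 8 (source [1 .. 100]) (mapping (* 2)))

-- Rolling buffer: upstream never blocks, so a sink that falls behind
-- may miss values. How many are dropped depends on scheduling.
rolling :: IO [Int]
rolling = runT (rollingConnect 8 (source [1 .. 100]) (mapping (* 2)))
```

bufferConnect suits pipelines where every value matters; rollingConnect suits cases such as sampling a fast sensor, where only recent values are of interest.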

Concurrent processing of shared inputs

fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r Source #

Share inputs with each of a list of processes in lockstep. Any values yielded by the processes for a given input are combined into a single yield from the composite process.
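
As an illustrative sketch, two workers can be given the same inputs and have their outputs combined through the Semigroup instance of Sum. The name combined is hypothetical, and (~>), source and mapping are assumed to be re-exported from Data.Machine.

```haskell
import Data.Machine.Concurrent
import Data.Monoid (Sum (..))

-- Both workers see every input. For input x the first yields Sum x and
-- the second yields Sum (10 * x); fanout merges them with (<>).
combined :: IO [Int]
combined =
  runT
    ( source [1, 2, 3]
        ~> fanout [mapping Sum, mapping (Sum . (* 10))]
        ~> mapping getSum
    )
-- combined should return [11,22,33]
```

A commutative Semigroup is used here so that the result does not depend on the order in which the workers' outputs are combined.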

fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r Source #

Share inputs with each of a list of processes in lockstep. If none of the processes yields a value, the composite process will itself yield mempty. The idea is to provide a handle on steps only executed for their side effects. For instance, if you want to run a collection of ProcessTs that await but don't yield some number of times, you can use 'fanoutSteps . map (fmap (const ()))' followed by a taking process.

Concurrent multiple-input machines

wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c Source #

Precompose a Process onto each input of a Wye (or WyeT).

When the choice of input is free (using the Z input descriptor), the two sources will be interleaved.

tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c Source #

Compose a pair of pipes onto the front of a Tee.

scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o Source #

Produces values from whichever source MachineT yields first. This operation may also be viewed as a gather operation in that all values produced by the given machines are interleaved when fed downstream. Note that inputs are not shared. The composite machine will await an input when any constituent machine awaits an input. That input will be supplied to the awaiting constituent and no other.

Some more specific types at which scatter may be used:

scatter :: [ProcessT m a b] -> ProcessT m a b
scatter :: [SourceT m a] -> SourceT m a

The former may be used to stream data through a collection of worker Processes; the latter may be used to intersperse values from a collection of sources.
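
A small sketch of the gather use follows. Because values are produced by whichever source yields first, the interleaving is not fixed, so the result is sorted before inspection. The name gathered is hypothetical, and the sketch assumes the composite machine runs until every constituent source has stopped.

```haskell
import Data.List (sort)
import Data.Machine.Concurrent

-- Merge two independent sources. The arrival order depends on
-- scheduling, so sort the collected values to get a stable result.
gathered :: IO [Int]
gathered = sort <$> runT (scatter [source [1, 2, 3], source [10, 20, 30]])
```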

splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d) Source #

Similar to +++: split the input between two processes, retagging and merging their outputs.

The two processes are run concurrently whenever possible.

mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r Source #

Similar to |||: split the input between two processes and merge their outputs.

Connect two processes to the downstream tails of a Machine that produces Eithers. The two downstream consumers are run concurrently when possible. When one downstream consumer stops, the other is allowed to run until it stops or the upstream source yields a value the remaining consumer cannot handle.

mergeSum sinkL sinkR produces a topology like this,

                                sinkL
                               /      \
                             a          \
                            /            \
   source -- Either a b -->                -- r -->
                            \            /
                             b          /
                              \       /
                                sinkR 

splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r Source #

Connect two processes to the downstream tails of a Machine that produces tuples. The two downstream consumers are run concurrently. When one downstream consumer stops, the entire pipeline is stopped.

splitProd sink1 sink2 produces a topology like this,

                           sink1
                          /      \
                        a          \
                       /            \
   source -- (a,b) -->               -- r -->
                       \            /
                        b          /
                          \      /
                           sink2