machinecell-3.1.0: Arrow based stream transducers

Safe Haskell: Safe
Language: Haskell2010

Control.Arrow.Machine.Utils

AFRP-like utilities

hold :: ArrowApply a => b -> ProcessA a (Event b) b

dHold :: ArrowApply a => b -> ProcessA a (Event b) b

accum :: ArrowApply a => b -> ProcessA a (Event (b -> b)) b

dAccum :: ArrowApply a => b -> ProcessA a (Event (b -> b)) b

edge :: (ArrowApply a, Eq b) => ProcessA a b (Event b)
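
The signatures above do not say how the plain and d-prefixed variants differ. The sketch below is a pure, list-based model, not machinecell code. It assumes the Yampa convention, where the d-prefixed form delays the effect of an event by one step. The names holdModel, dHoldModel, accumModel and dAccumModel are hypothetical, and each list element stands for one time step, with Nothing meaning "no event".

```haskell
import Data.Maybe (fromMaybe)

-- Immediate hold: the output reflects an event at the step it occurs.
holdModel :: b -> [Maybe b] -> [b]
holdModel initial = drop 1 . scanl step initial
  where
    step current ev = fromMaybe current ev

-- Delayed hold: the output changes one step after the event
-- (assumed Yampa-style semantics).
dHoldModel :: b -> [Maybe b] -> [b]
dHoldModel initial evs = zipWith const (scanl step initial evs) evs
  where
    step current ev = fromMaybe current ev

-- Immediate accumulation: each event carries a function that is
-- applied to the current value.
accumModel :: b -> [Maybe (b -> b)] -> [b]
accumModel initial = drop 1 . scanl step initial
  where
    step current = maybe current ($ current)

-- Delayed accumulation: the updated value is visible one step later.
dAccumModel :: b -> [Maybe (b -> b)] -> [b]
dAccumModel initial evs = zipWith const (scanl step initial evs) evs
  where
    step current = maybe current ($ current)
```

In this model, holdModel 0 [Nothing, Just 1, Nothing, Just 2] gives [0, 1, 1, 2], while dHoldModel on the same input gives [0, 0, 1, 1].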

Switches

Switches inspired by the Yampa library. The signatures are almost the same, but the collection is required to be Traversable, not just a Functor. This is because of side effects.

switch :: ArrowApply a => ProcessA a b (c, Event t) -> (t -> ProcessA a b c) -> ProcessA a b c

dSwitch :: ArrowApply a => ProcessA a b (c, Event t) -> (t -> ProcessA a b c) -> ProcessA a b c

rSwitch :: ArrowApply a => ProcessA a b c -> ProcessA a (b, Event (ProcessA a b c)) c

drSwitch :: ArrowApply a => ProcessA a b c -> ProcessA a (b, Event (ProcessA a b c)) c

kSwitch :: ArrowApply a => ProcessA a b c -> ProcessA a (b, c) (Event t) -> (ProcessA a b c -> t -> ProcessA a b c) -> ProcessA a b c

dkSwitch :: ArrowApply a => ProcessA a b c -> ProcessA a (b, c) (Event t) -> (ProcessA a b c -> t -> ProcessA a b c) -> ProcessA a b c

pSwitch :: (ArrowApply a, Traversable col) => (forall sf. b -> col sf -> col (ext, sf)) -> col (ProcessA a ext c) -> ProcessA a (b, col c) (Event mng) -> (col (ProcessA a ext c) -> mng -> ProcessA a b (col c)) -> ProcessA a b (col c)

pSwitchB :: (ArrowApply a, Traversable col) => col (ProcessA a b c) -> ProcessA a (b, col c) (Event mng) -> (col (ProcessA a b c) -> mng -> ProcessA a b (col c)) -> ProcessA a b (col c)

rpSwitch :: (ArrowApply a, Traversable col) => (forall sf. b -> col sf -> col (ext, sf)) -> col (ProcessA a ext c) -> ProcessA a (b, Event (col (ProcessA a ext c) -> col (ProcessA a ext c))) (col c)

rpSwitchB :: (ArrowApply a, Traversable col) => col (ProcessA a b c) -> ProcessA a (b, Event (col (ProcessA a b c) -> col (ProcessA a b c))) (col c)

Sources

In addition to the main event stream provided by run, there are two other ways to provide additional input streams: "interleaved" sources and "blocking" sources.

Interleaved sources are actually Event -> Event transformers that don't see the values of the input events. They discard the input values and emit their own values according to the timing of the input events.

Blocking sources emit their events independently of upstream. Until they exhaust their values, they block upstream transducers.

Here is a demonstration of the two kinds of sources.

a = proc x ->
  do
    y1 <- source [1, 2, 3] -< x
    y2 <- source [4, 5, 6] -< x

    gather -< [y1, y2]
-- run a (repeat ()) => [1, 4, 2, 5, 3, 6]

b = proc _ ->
  do
    y1 <- blockingSource [1, 2, 3] -< ()
    y2 <- blockingSource [4, 5, 6] -< ()

    gather -< [y1, y2]
-- run b [] => [4, 5, 6, 1, 2, 3]

In the code above, you can see that the output values of source (an interleaved source) are actually interleaved, while blockingSource blocks the other upstream source.

Both can be implemented using PlanT. The only difference is the await call, which listens for the timing of upstream events.

An example is below.

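interleavedStdin = constructT kleisli0 (forever pl)
  where
    pl =
      do
        _ <- await          -- wait for an upstream event
        -- lift (Control.Monad.Trans.Class) runs an IO action inside the plan
        eof <- lift isEOF
        if eof then stop else return ()
        lift getLine >>= yield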

blockingStdin = pure noEvent >>> constructT kleisli0 (forever pl)
  where
    pl =
      do
        -- No await here
        -- lift (Control.Monad.Trans.Class) runs an IO action inside the plan
        eof <- lift isEOF
        if eof then stop else return ()
        lift getLine >>= yield

They differ in how they terminate. When upstream stops, an interleaved source stops because its await call fails. A blocking source, however, doesn't stop until it terminates on its own.

source :: (ArrowApply a, Foldable f) => f c -> ProcessA a (Event b) (Event c)

Provides a source event stream. A dummy input event stream is needed.

  run af [...]

is equivalent to

  run (source [...] >>> af) (repeat ())
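
The equivalence above can be pictured with a deliberately simplified, pure model that is not machinecell code. It assumes a process can be approximated as a function from the list of input events to the list of output events, and that an interleaved source emits one value per dummy input event. ProcModel, sourceModel and runModel are hypothetical names used only for this sketch.

```haskell
-- A process approximated as a function from input events to output events.
type ProcModel b c = [b] -> [c]

-- An interleaved source: ignores the values of the incoming events and
-- emits one of its own values per incoming event.
sourceModel :: [c] -> ProcModel tick c
sourceModel values ticks = zipWith const values ticks

-- Running a modelled process just applies it to the input list.
runModel :: ProcModel b c -> [b] -> [c]
runModel = id
```

In this model, runModel af xs and runModel (af . sourceModel xs) (repeat ()) produce the same list for a finite xs, because the source stops once its values are exhausted. A finite tick list also cuts the source short, which mirrors an interleaved source stopping when upstream stops.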

blockingSource :: (ArrowApply a, Foldable f) => f c -> ProcessA a () (Event c)

Provides a blocking event stream.

interleave :: ArrowApply a => ProcessA a () (Event c) -> ProcessA a (Event b) (Event c)

Make a blocking source interleaved.

blocking :: ArrowApply a => ProcessA a (Event ()) (Event c) -> ProcessA a () (Event c)

Make an interleaved source blocking.

Other utility arrows

tee :: ArrowApply a => ProcessA a (Event b1, Event b2) (Event (Either b1 b2))

Merges two event streams into one. Note that gather is more general and convenient:

... <- tee -< (e1, e2)

is equivalent to

... <- gather -< [Left <$> e1, Right <$> e2]

gather :: (ArrowApply a, Foldable f) => ProcessA a (f (Event b)) (Event b)

Merges multiple event channels into one. If simultaneous events are given, the leftmost one is emitted first.
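
The left-first rule, and the way tee reduces to gather, can be illustrated with a pure, list-based model that is not machinecell code. Each list element stands for one time step, and Nothing means "no event on this channel". gatherModel and teeModel are hypothetical names used only for this sketch.

```haskell
import Data.Foldable (toList)
import Data.Maybe (catMaybes)

-- Each time step carries one optional event per channel. Events present
-- at the same step are emitted left to right.
gatherModel :: Foldable f => [f (Maybe b)] -> [b]
gatherModel = concatMap (catMaybes . toList)

-- tee expressed through gather, tagging the two channels with Left and Right.
teeModel :: [(Maybe b1, Maybe b2)] -> [Either b1 b2]
teeModel steps =
  gatherModel [[Left <$> e1, Right <$> e2] | (e1, e2) <- steps]
```

In this model, gatherModel [[Just 1, Just 4], [Just 2, Just 5], [Just 3, Just 6]] gives [1, 4, 2, 5, 3, 6], the same ordering as the interleaved source demonstration.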

fork :: (ArrowApply a, Foldable f) => ProcessA a (Event (f b)) (Event b)

Given a collection-valued event, emits its values as individual events.
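
As a rough illustration of the description above, here is a pure, list-based model that is not machinecell code. Each list element is one time step, Nothing means "no event", and forkModel is a hypothetical name.

```haskell
import Data.Foldable (toList)

-- Every collection-valued event is flattened into individual events,
-- keeping the order of the elements. Steps without an event, and events
-- carrying an empty collection, contribute nothing.
forkModel :: Foldable f => [Maybe (f b)] -> [b]
forkModel = concatMap (maybe [] toList)
```

In this model, forkModel [Just [1, 2], Nothing, Just [], Just [3]] gives [1, 2, 3].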

filter :: ArrowApply a => a b Bool -> ProcessA a (Event b) (Event b)

anytime :: ArrowApply a => a b c -> ProcessA a (Event b) (Event c)

Executes an action once per input event.

par :: (ArrowApply a, Traversable col) => (forall sf. b -> col sf -> col (ext, sf)) -> col (ProcessA a ext c) -> ProcessA a b (col c)

parB :: (ArrowApply a, Traversable col) => col (ProcessA a b c) -> ProcessA a b (col c)

Transformer

readerProc :: (ArrowApply a, ArrowApply a', ArrowAddReader r a a') => ProcessA a b c -> ProcessA a' (b, r) c

Runs the reader of the base arrow.