machinecell-3.3.2: Arrow based stream transducers

Safe HaskellSafe
LanguageHaskell2010

Control.Arrow.Machine.Utils

Contents

Synopsis

AFRP-like utilities

hold :: ArrowApply a => b -> ProcessA a (Event b) b Source #

dHold :: ArrowApply a => b -> ProcessA a (Event b) b Source #

accum :: ArrowApply a => b -> ProcessA a (Event (b -> b)) b Source #

dAccum :: ArrowApply a => b -> ProcessA a (Event (b -> b)) b Source #

edge :: (ArrowApply a, Eq b) => ProcessA a b (Event b) Source #

Switches

Switches inspired by Yampa library. Signature is almost same, but collection requirement is not only Functor, but Traversable. This is because of side effects.

switch :: ArrowApply a => ProcessA a b (c, Event t) -> (t -> ProcessA a b c) -> ProcessA a b c Source #

dSwitch :: ArrowApply a => ProcessA a b (c, Event t) -> (t -> ProcessA a b c) -> ProcessA a b c Source #

rSwitch :: ArrowApply a => ProcessA a b c -> ProcessA a (b, Event (ProcessA a b c)) c Source #

drSwitch :: ArrowApply a => ProcessA a b c -> ProcessA a (b, Event (ProcessA a b c)) c Source #

kSwitch :: ArrowApply a => ProcessA a b c -> ProcessA a (b, c) (Event t) -> (ProcessA a b c -> t -> ProcessA a b c) -> ProcessA a b c Source #

dkSwitch :: ArrowApply a => ProcessA a b c -> ProcessA a (b, c) (Event t) -> (ProcessA a b c -> t -> ProcessA a b c) -> ProcessA a b c Source #

pSwitch :: (ArrowApply a, Traversable col) => (forall sf. b -> col sf -> col (ext, sf)) -> col (ProcessA a ext c) -> ProcessA a (b, col c) (Event mng) -> (col (ProcessA a ext c) -> mng -> ProcessA a b (col c)) -> ProcessA a b (col c) Source #

pSwitchB :: (ArrowApply a, Traversable col) => col (ProcessA a b c) -> ProcessA a (b, col c) (Event mng) -> (col (ProcessA a b c) -> mng -> ProcessA a b (col c)) -> ProcessA a b (col c) Source #

rpSwitch :: (ArrowApply a, Traversable col) => (forall sf. b -> col sf -> col (ext, sf)) -> col (ProcessA a ext c) -> ProcessA a (b, Event (col (ProcessA a ext c) -> col (ProcessA a ext c))) (col c) Source #

rpSwitchB :: (ArrowApply a, Traversable col) => col (ProcessA a b c) -> ProcessA a (b, Event (col (ProcessA a b c) -> col (ProcessA a b c))) (col c) Source #

Sources

In addition to the main event stream privided by run, there are two other ways to provide additional input streams, "interleaved" sources and "blocking" sources.

Interleaved sources are actually Event -> Event transformers that don't see the values of the input events. They discard input values and emit their values according to input event timing.

Blocking sources emit their events independent from upstream. Until they exhaust their values, they block upstream transducers.

Here is a demonstration of two kind of sources.

a = proc x ->
  do
    y1 <- source [1, 2, 3] -< x
    y2 <- source [4, 5, 6] -< x

    gather -< [y1, y2]
-- run a (repeat ()) => [1, 4, 2, 5, 3, 6]

b = proc _ ->
  do
    y1 <- blockingSource [1, 2, 3] -< ()
    y2 <- blockingSource [4, 5, 6] -< ()

    gather -< [y1, y2]
-- run b [] => [4, 5, 6, 1, 2, 3]

In above code, you'll see that output values of source (an interleaved source) are actually interelaved, while blockingSource blocks another upstream source.

And they can both implemented using PlanT. The only one deference is await call to listen upstream event timing.

An example is below.

interleavedStdin = constructT kleisli0 (forever pl)
  where
    pl =
      do
        _ <- await
        eof <- isEOF
        if isEOF then stop else return()
        getLine >>= yield

blockingStdin = pure noEvent >>> constructT kleisli0 (forever pl)
  where
    pl =
      do
        -- No await here
        eof <- isEOF
        if isEOF then stop else return()
        getLine >>= yield

They are different in the end behavior. When upstream stops, an interleaved source stops because await call fails. But a blocking source doesn't stop until its own termination.

source :: (ArrowApply a, Foldable f) => f c -> ProcessA a (Event b) (Event c) Source #

Provides a source event stream. A dummy input event stream is needed.

  run af [...]

is equivalent to

  run (source [...] >>> af) (repeat ())

blockingSource :: (ArrowApply a, Foldable f) => f c -> ProcessA a () (Event c) Source #

Provides a blocking event stream.

interleave :: ArrowApply a => ProcessA a () (Event c) -> ProcessA a (Event b) (Event c) Source #

Make a blocking source interleaved.

blocking :: ArrowApply a => ProcessA a (Event ()) (Event c) -> ProcessA a () (Event c) Source #

Make an interleaved source blocking.

Other utility arrows

tee :: ArrowApply a => ProcessA a (Event b1, Event b2) (Event (Either b1 b2)) Source #

Make two event streams into one. Actually gather is more general and convenient;

... <- tee -< (e1, e2)

is equivalent to

... <- gather -< [Left <$> e1, Right <$> e2]

gather :: (ArrowApply a, Foldable f) => ProcessA a (f (Event b)) (Event b) Source #

Make multiple event channels into one. If simultaneous events are given, lefter one is emitted earlier.

fork :: (ArrowApply a, Foldable f) => ProcessA a (Event (f b)) (Event b) Source #

Given an array-valued event and emit it's values as inidvidual events.

filter :: ArrowApply a => a b Bool -> ProcessA a (Event b) (Event b) Source #

anytime :: ArrowApply a => a b c -> ProcessA a (Event b) (Event c) Source #

Executes an action once per an input event is provided.

par :: (ArrowApply a, Traversable col) => (forall sf. b -> col sf -> col (ext, sf)) -> col (ProcessA a ext c) -> ProcessA a b (col c) Source #

parB :: (ArrowApply a, Traversable col) => col (ProcessA a b c) -> ProcessA a b (col c) Source #

oneshot :: ArrowApply a => c -> ProcessA a b (Event c) Source #

Emit an event of given value as soon as possible.

now :: ArrowApply a => ProcessA a b (Event ()) Source #

Emit an event as soon as possible.

 now = oneshot ()

onEnd :: (ArrowApply a, Occasional' b) => ProcessA a b (Event ()) Source #

Emit an event at the end of the input stream.