machinecell-4.0.1: Arrow based stream transducers

Safe HaskellSafe
LanguageHaskell2010

Control.Arrow.Machine.Utils

Contents

Synopsis

AFRP-like utilities

hold :: Monad m => b -> ProcessT m (Event b) b Source #

dHold :: Monad m => b -> ProcessT m (Event b) b Source #

accum :: Monad m => b -> ProcessT m (Event (b -> b)) b Source #

Accumulate inputs like fold.

>>> :{
let pa = proc evx ->
      do
        val <- accum 0 -< (+1) <$ evx
        returnA -< val <$ evx
  in
    run pa (replicate 10 ())
:}
[1,2,3,4,5,6,7,8,9,10]

Since 4.0.0, this function become strict for the first argument because lazy one could rarely be used.

You can make switches to make lazy one.

dAccum :: Monad m => b -> ProcessT m (Event (b -> b)) b Source #

Delayed version of accum.

>>> :{
let pa = proc evx ->
      do
        val <- dAccum 0 -< (+1) <$ evx
        returnA -< val <$ evx
  in
    run pa (replicate 10 ())
:}
[0,1,2,3,4,5,6,7,8,9]

Since 4.0.0, this function become strict for the first argument because lazy one could rarely be used.

You can make switches to make lazy one.

edge :: (Monad m, Eq b) => ProcessT m b (Event b) Source #

Detects edges of input behaviour.

>>> run (hold 0 >>> edge) [1, 1, 2, 2, 2, 3]
[0,1,2,3]
>>> run (hold 0 >>> edge) [0, 1, 1, 2, 2, 2, 3]
[0,1,2,3]

Switches

Switches inspired by Yampa library. Signature is almost same, but collection requirement is not only Functor, but Traversable. This is because of side effects.

switch :: Monad m => ProcessT m b (c, Event t) -> (t -> ProcessT m b c) -> ProcessT m b c Source #

Run the 1st transducer at the beggining. Then switch to 2nd when Event t occurs.

>>> :{
let
    before = proc x ->
      do
        trigger <- filterEvent (== 3) -< x
        returnA -< ((*10) <$> x, trigger)
    after t = proc x -> returnA -< (*100) <$> x
 in
    run (switch before after) [1..5]
:}
[10,20,300,400,500]

dSwitch :: Monad m => ProcessT m b (c, Event t) -> (t -> ProcessT m b c) -> ProcessT m b c Source #

Delayed version of switch

>>> :{
let
    before = proc x ->
      do
        trigger <- filterEvent (== 3) -< x
        returnA -< ((*10) <$> x, trigger)
    after t = proc x -> returnA -< (*100) <$> x
 in
    run (dSwitch before after) [1..5]
:}
[10,20,30,400,500]

rSwitch :: Monad m => ProcessT m b c -> ProcessT m (b, Event (ProcessT m b c)) c Source #

Recurring switch.

>>> :{
let pa = proc evtp ->
      do
        evx <- returnA -< fst <$> evtp
        evarr <- filterJust -< snd <$> evtp
        rSwitch (evMap (*10)) -< (evx, evarr)
    l = [(1, Nothing),
         (2, Just (arr $ fmap (*100))),
         (3, Nothing),
         (4, Just (arr $ fmap (*1000))),
         (5, Nothing)]
  in
    run pa l
:}
[10,200,300,4000,5000]

drSwitch :: Monad m => ProcessT m b c -> ProcessT m (b, Event (ProcessT m b c)) c Source #

Delayed version of rSwitch.

>>> :{
let pa = proc evtp ->
      do
        evx <- returnA -< fst <$> evtp
        evarr <- filterJust -< snd <$> evtp
        drSwitch (evMap (*10)) -< (evx, evarr)
    l = [(1, Nothing),
         (2, Just (arr $ fmap (*100))),
         (3, Nothing),
         (4, Just (arr $ fmap (*1000))),
         (5, Nothing)]
  in
    run pa l
:}
[10,20,300,400,5000]

kSwitch :: Monad m => ProcessT m b c -> ProcessT m (b, c) (Event t) -> (ProcessT m b c -> t -> ProcessT m b c) -> ProcessT m b c Source #

dkSwitch :: Monad m => ProcessT m b c -> ProcessT m (b, c) (Event t) -> (ProcessT m b c -> t -> ProcessT m b c) -> ProcessT m b c Source #

pSwitch :: (Monad m, Traversable col) => (forall sf. b -> col sf -> col (ext, sf)) -> col (ProcessT m ext c) -> ProcessT m (b, col c) (Event mng) -> (col (ProcessT m ext c) -> mng -> ProcessT m b (col c)) -> ProcessT m b (col c) Source #

pSwitchB :: (Monad m, Traversable col) => col (ProcessT m b c) -> ProcessT m (b, col c) (Event mng) -> (col (ProcessT m b c) -> mng -> ProcessT m b (col c)) -> ProcessT m b (col c) Source #

rpSwitch :: (Monad m, Traversable col) => (forall sf. b -> col sf -> col (ext, sf)) -> col (ProcessT m ext c) -> ProcessT m (b, Event (col (ProcessT m ext c) -> col (ProcessT m ext c))) (col c) Source #

rpSwitchB :: (Monad m, Traversable col) => col (ProcessT m b c) -> ProcessT m (b, Event (col (ProcessT m b c) -> col (ProcessT m b c))) (col c) Source #

Sources

In addition to the main event stream privided by run, there are two other ways to provide additional input streams, "interleaved" sources and "blocking" sources.

Interleaved sources are actually Event -> Event transformers that don't see the values of the input events. They discard input values and emit their values according to input event timing.

Blocking sources emit their events independent from upstream. Until they exhaust their values, they block upstream transducers.

Here is a demonstration of two kind of sources.

a = proc x ->
  do
    y1 <- source [1, 2, 3] -< x
    y2 <- source [4, 5, 6] -< x

    gather -< [y1, y2]
-- run a (repeat ()) => [1, 4, 2, 5, 3, 6]

b = proc _ ->
  do
    y1 <- blockingSource [1, 2, 3] -< ()
    y2 <- blockingSource [4, 5, 6] -< ()

    gather -< [y1, y2]
-- run b [] => [4, 5, 6, 1, 2, 3]

In above code, you'll see that output values of source (an interleaved source) are actually interelaved, while blockingSource blocks another upstream source.

And they can both implemented using PlanT. The only one deference is await call to listen upstream event timing.

An example is below.

interleavedStdin = constructT kleisli0 (forever pl)
  where
    pl =
      do
        _ <- await
        eof <- isEOF
        if isEOF then stop else return()
        getLine >>= yield

blockingStdin = pure noEvent >>> constructT kleisli0 (forever pl)
  where
    pl =
      do
        -- No await here
        eof <- isEOF
        if isEOF then stop else return()
        getLine >>= yield

They are different in the end behavior. When upstream stops, an interleaved source stops because await call fails. But a blocking source doesn't stop until its own termination.

source :: (Monad m, Foldable f) => f a -> ProcessT m (Event i) (Event a) Source #

Provides a source event stream. A dummy input event stream is needed.

  run af [...]

is equivalent to

  run (source [...] >>> af) (repeat ())

blockingSource :: (Monad m, Foldable f) => f a -> ProcessT m ZeroEvent (Event a) Source #

Provides a blocking event stream.

interleave :: Monad m => ProcessT m ZeroEvent (Event a) -> ProcessT m (Event i) (Event a) Source #

Make a blocking source interleaved.

blocking :: Monad m => ProcessT m (Event ()) (Event a) -> ProcessT m ZeroEvent (Event a) Source #

Make an interleaved source blocking.

Other utility arrows

tee :: Monad m => ProcessT m (Event b1, Event b2) (Event (Either b1 b2)) Source #

Make two event streams into one. Actually gather is more general and convenient;

... <- tee -< (e1, e2)

is equivalent to

... <- gather -< [Left <$> e1, Right <$> e2]

gather :: (Monad m, Foldable f) => ProcessT m (f (Event b)) (Event b) Source #

Make multiple event channels into one. If simultaneous events are given, lefter one is emitted earlier.

>>> :{
let pa = proc x ->
      do
        r1 <- filterEvent (\x -> x `mod` 2 == 0) -< x
        r2 <- filterEvent (\x -> x `mod` 3 == 0) -< x
        gather -< [r1, r2]
  in
    run pa [1..6]
:}
[2,3,4,6,6]

It is terminated when the last input finishes.

>>> :{
let pa = proc x ->
      do
        r1 <- filterEvent (\x -> x `mod` 3 == 0) -< x :: Event Int
        r2 <- stopped -< x
        r3 <- returnA -< r2
        fin <- gather -< [r1, r2, r3]
        val <- hold 0 -< r1
        end <- onEnd -< fin
        returnA -< val <$ end
  in
    run pa [1..5]
:}
[3]

fork :: (Monad m, Foldable f) => ProcessT m (Event (f b)) (Event b) Source #

Given an array-valued event and emit it's values as inidvidual events.

>>> run fork [[1,2,3],[],[4,5]]
[1,2,3,4,5]

fire :: Monad m => (b -> m c) -> ProcessT m (Event b) (Event c) Source #

Executes an action once per an input event is provided.

fire0 :: Monad m => m c -> ProcessT m (Event ()) (Event c) Source #

Executes an action once per an input event is provided.

anytime :: ArrowApply a => a b c -> ProcessA a (Event b) (Event c) Source #

Executes an action once per an input event is provided.

par :: (Monad m, Traversable col) => (forall sf. b -> col sf -> col (ext, sf)) -> col (ProcessT m ext c) -> ProcessT m b (col c) Source #

parB :: (Monad m, Traversable col) => col (ProcessT m b c) -> ProcessT m b (col c) Source #

oneshot :: Monad m => c -> ProcessT m b (Event c) Source #

Emit an event of given value as soon as possible.

now :: Monad m => ProcessT m b (Event ()) Source #

Emit an event as soon as possible.

 now = oneshot ()

onEnd :: (Monad m, Occasional' b) => ProcessT m b (Event ()) Source #

Emit an event at the end of the input stream. >>> :{ let pa = proc evx -> do x <- hold 0 -< evx ed <- onEnd -< evx returnA -< x <$ ed in run pa [1..10] :} [10]