machinecell-2.0.1: Arrow based stream transducers

Safe Haskell: None
Language: Haskell2010

Control.Arrow.Machine

Quick introduction

Like other iteratee or pipe libraries, machinecell abstracts general iteration processes.

Here is a simple example that iterates over a list.

>>> run (evMap (+1)) [1, 2, 3]
[2, 3, 4]

In the statement above, "evMap (+1)" has the type "ProcessA (->) (Event Int) (Event Int)", which denotes "a stream transducer that takes a series of Int as input, gives a series of Int as output, and runs on the base arrow (->)".

ProcessA is the transducer type of the machinecell library.

Side effects

In general, Arrow types other than (->) may have side effects. For example, any monadic side effect can be performed by wrapping the monad in Kleisli.

ProcessA can run such effects as follows.

>>> runKleisli (run_ $ anytime (Kleisli print)) [1, 2, 3]
1
2
3

Here, anytime makes a transducer that executes a side effect for each input. run_ is almost the same as run, but discards the transducer's output.

That is useful when the side effects are the main concern.
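The idea of carrying monadic effects through an arrow can be tried with base alone. The following is a minimal sketch that uses only Control.Arrow and Data.IORef, not machinecell itself; the names record, pipeline, and feedAll are hypothetical helpers introduced for illustration. It composes a pure step with an effectful Kleisli step and runs the effect once per input, which is roughly the role anytime plays above.

```haskell
import Control.Arrow (Kleisli (..), arr, (>>>))
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Hypothetical helper: an effectful arrow that records each value it sees
-- in the given IORef and then passes the value on unchanged.
record :: IORef [Int] -> Kleisli IO Int Int
record ref = Kleisli $ \x -> modifyIORef ref (x :) >> pure x

-- A pure step (lifted with arr) composed with the effectful step.
pipeline :: IORef [Int] -> Kleisli IO Int Int
pipeline ref = arr (+ 1) >>> record ref

-- Run the pipeline once per element and return what was recorded, in order.
feedAll :: [Int] -> IO [Int]
feedAll xs = do
  ref <- newIORef []
  mapM_ (runKleisli (pipeline ref)) xs
  reverse <$> readIORef ref
```

The pure step needs no special treatment: arr lifts it into the same Kleisli arrow, so pure and effectful stages compose with the same operator.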

ProcessA as pipes

"ProcessA a (Event b) (Event c)" transducers are actually one-directional composable pipes.

They can be constructed from Plan monads. In a Plan monad context, await and yield can be used to get and emit values, and actions of the base monad can be lifted into the context.

The resulting processes are then composed as a Category using the `(>>>)` operator.

source :: ProcessA (Kleisli IO) (Event ()) (Event String)  
source = repeatedlyT kleisli0 $
  do
    _ <- await
    x <- lift getLine
    yield x

pipe :: ArrowApply a => ProcessA a (Event String) (Event String)
pipe = construct $
  do
    s1 <- await
    s2 <- await
    yield (s1 ++ s2)

sink :: ProcessA (Kleisli IO) (Event String) (Event Void)
sink = repeatedlyT kleisli0 $
  do
    x <- await
    lift $ putStrLn x

>>> runKleisli (run_ $ source >>> pipe >>> sink) (repeat ())

The above code reads two lines from stdin, writes the concatenated line to stdout, and finishes.

Unlike in other pipe libraries, even a source must call await.

The source awaits dummy input, namely "(repeat ())", and discards the input values. Even though the input is an infinite list, this program stops when the "pipe" transducer stops.

More details on finalizing

The finalizing behavior of transducers obeys the following scenario.

  1. Signals of type Event can carry end signs.
  2. Most transducers stop when they get an end sign. (Exceptions can be made with onEnd or catchP.)
  3. If the run function detects an end sign in the output of a running transducer, it stops feeding input values and feeds end signs instead.
  4. Iteration continues until no more events can occur.

So "await `catchP` some_cleanup" can handle a stop of either upstream or downstream.

On the other hand, a plan never gets an end sign without calling await. That's why even sources must call await.
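The scenario above can be pictured with a toy model. The sketch below is not machinecell's implementation; Signal, Toy, takeToy, and runToy are hypothetical names introduced only for illustration, and step 4 is simplified to a single extra end sign. It shows why a run over an infinite input can still terminate: once the transducer emits an end sign, the runner stops feeding values.

```haskell
-- A signal is either a value or the end sign.
data Signal a = Value a | End

-- Toy transducer: consumes one signal, emits some signals, and returns
-- the transducer to use for the next input.
newtype Toy a b = Toy { step :: Signal a -> ([Signal b], Toy a b) }

-- Passes through at most n values, then emits an end sign.
takeToy :: Int -> Toy a a
takeToy n = Toy go
  where
    go End = ([End], takeToy 0)
    go (Value x)
      | n <= 0    = ([End], takeToy 0)
      | n == 1    = ([Value x, End], takeToy 0)
      | otherwise = ([Value x], takeToy (n - 1))

-- Toy runner: feeds inputs one at a time. When the output contains an end
-- sign, or the input is exhausted, it feeds one end sign and stops.
runToy :: Toy a b -> [a] -> [b]
runToy t []       = values (fst (step t End))
runToy t (x : xs) =
  let (outs, t') = step t (Value x)
  in if any isEnd outs
       then values outs ++ values (fst (step t' End))
       else values outs ++ runToy t' xs

isEnd :: Signal b -> Bool
isEnd End = True
isEnd _   = False

-- Keep only the carried values, dropping end signs.
values :: [Signal b] -> [b]
values outs = [v | Value v <- outs]
```

In this model, runToy (takeToy 2) applied to an infinite list returns after two values, because the end sign produced downstream stops the feeding of further input.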

Arrow composition

One of the most attractive features of machinecell is arrow composition.

In addition to Category, ProcessA has an Arrow instance, which allows parallel composition.

If a type has an Arrow instance, it can be written in GHC's extended proc-do notation as follows.

f :: ProcessA (Kleisli IO) (Event Int) (Event ())
f = proc x ->
  do
    -- Process odd integers.
    odds <- filter $ arr odd -< x
    anytime $ Kleisli (putStrLn . ("Odd: " ++)) -< show <$> odds

    -- Process even integers.
    evens <- filter $ arr even -< x
    anytime $ Kleisli (putStrLn . ("Even: " ++)) -< show <$> evens

>>> P.runKleisli (run f) [1..10]
Odd: 1
Even: 2
Odd: 3
Even: 4
...

The result implies that the two statements that take x as input, together with their downstreams, are executed in parallel.
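The parallel reading of proc-do notation is not specific to ProcessA. As a small sketch over the plain function arrow (->), using only base and GHC's Arrows extension, two statements that read the same input correspond to a `(&&&)` composition. The names viaProc and viaCombinators are hypothetical.

```haskell
{-# LANGUAGE Arrows #-}
import Control.Arrow (arr, returnA, (&&&))

-- Two statements that both read x, written in proc-do notation.
viaProc :: Int -> (Int, Int)
viaProc = proc x -> do
  a <- arr (+ 1) -< x
  b <- arr (* 2) -< x
  returnA -< (a, b)

-- The same computation written directly with the parallel combinator.
viaCombinators :: Int -> (Int, Int)
viaCombinators = arr (+ 1) &&& arr (* 2)
```

For pure functions the two definitions agree on every input. With an effectful base arrow, the same structure determines how the effects of the two branches interleave.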

Behaviours

The transducers we have seen so far all have input and output types wrapped in Event. We have not had to care about the wrappers, because they cancel each other out.

But several built-in transducers provide non-event values, as shown below.

hold :: ArrowApply a => b -> ProcessA a (Event b) b
accum :: ArrowApply a => b -> ProcessA a (Event (b->b)) b

hold keeps the last input until a new value is provided.

accum updates its output by applying each input function to it.
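One way to build intuition for these two signatures is a list-based model in which every list element is one tick, Just v is an event carrying v, and Nothing means no event at that tick. This is only a sketch with hypothetical names (holdModel, accumModel), written with base alone; it is not how the library implements hold or accum.

```haskell
import Data.Maybe (fromMaybe)

-- Model of hold: keep the last event value, starting from i.
holdModel :: b -> [Maybe b] -> [b]
holdModel i = drop 1 . scanl fromMaybe i

-- Model of accum: apply each incoming function to the current value.
accumModel :: b -> [Maybe (b -> b)] -> [b]
accumModel i = drop 1 . scanl (\acc e -> maybe acc ($ acc) e) i
```

In both models the output has a value at every tick, even at ticks without an input event, which is the difference between a behaviour-like output and an Event output.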

According to arrowized FRP (functional reactive programming), values that appear naked in arrow notation are behaviours, that is, continuous time-varying values, whereas event values are discrete.

Note that all values that can be input, output, or used in effects must be discrete.

For continuous values to interact with the real world in any way, they must be encoded as discrete values.

That is done by functor calculations over existing events.

An example is below.

f :: ArrowApply a => ProcessA a (Event Int) (Event Int)
f = proc x ->
   do
     y <- accum 0 -< (+) <$> x
     returnA -< y <$ x

>>> run f [1, 2, 3]
[1, 3, 6]

The `(<$)` operator discards the values on its right-hand side and uses only the container structure, e.g. 1 <$ Just "a" => Just 1, 1 <$ Nothing => Nothing, 1 <$ [True, False, undefined] => [1, 1, 1].

In this case, the values of y are output according to the timing of x.
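The sampling step can be sketched in a list-based model, where each list element is one tick and Nothing means no event at that tick. The names EventModel, accumModel, and fModel are hypothetical and use base only; this is an illustration of the `y <$ x` idea, not the library's evaluation strategy.

```haskell
import Data.Maybe (catMaybes)

-- One entry per tick: Just v is an event carrying v, Nothing is no event.
type EventModel a = [Maybe a]

-- Model of accum: apply each incoming function to the current value.
accumModel :: b -> EventModel (b -> b) -> [b]
accumModel i = drop 1 . scanl (\acc e -> maybe acc ($ acc) e) i

-- Model of the arrow f above: accumulate a running sum as a behaviour y,
-- then sample y at exactly the ticks where x has an event.
fModel :: EventModel Int -> [Int]
fModel x = catMaybes (zipWith (<$) y x)
  where
    y = accumModel 0 (map (fmap (+)) x)
```

Ticks without an event in x produce no output, even though y has a value there.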

Note

Purity of `ProcessA (->)`

Since the a in `ProcessA a b c` represents the base monad (ArrowApply), `ProcessA (->)` is expected to be pure.

In other words, the following arrow gives the same result for arbitrary f.

proc x ->
  do
    _ <- fit arr f -< x
    g -< x

This desugars to `f &&& g >>> arr snd`. If the Event constructor were exported, the proposition could be falsified. When f is "arr (replicate k) >>> fork" for some integer k and g is "arr (const $ Event ())", g yields () k times. That is because the result of the arrow "f &&& g" is nothing but "(Event x, Event ())", and it is yielded k times because "Event x" must be yielded k times.

That is why the Event constructor is hidden. With the primitives exported by this module, it works almost correctly: the number of events is conserved by inserting an appropriate number of NoEvents. But there is still a loophole.

Under the current implementation, the arrow below behaves like "arr (const $ Event x)".

proc x -> hold noEvent -< ev <$ ev

I have an idea to correct this, such that the above arrow always gives NoEvent. But as a result, Event would no longer be a functor in the sense of the Haskell type class.

For now, as long as you never make a value of a nested event type like "ev <$ ev", the problem is avoided.
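The shape `f &&& g >>> arr snd` can be explored with base arrows. The sketch below does not reproduce the event-count issue of ProcessA; it only illustrates the general point that whether f is observable depends on the base arrow. Over (->) the discarded branch leaves no trace, while over a Kleisli arrow its effect survives. The logging monad is the pair monad from base, and pureCase, Logged, noisyF, quietG, and effectfulCase are hypothetical names.

```haskell
import Control.Arrow (Kleisli (..), arr, (&&&), (>>>))

-- Over the pure base arrow (->), the first component is simply dropped,
-- so this behaves exactly like show.
pureCase :: Int -> String
pureCase = (+ 1) &&& show >>> arr snd

-- A logging monad from base: ((,) [String]) is a Monad because
-- [String] is a Monoid.
type Logged = (,) [String]

-- An arrow that logs a message as its effect.
noisyF :: Kleisli Logged Int Int
noisyF = Kleisli $ \x -> (["f ran"], x + 1)

-- An arrow with no effect.
quietG :: Kleisli Logged Int String
quietG = Kleisli $ \x -> ([], show x)

-- The result of noisyF is discarded, but its log entry is kept.
effectfulCase :: Int -> Logged String
effectfulCase = runKleisli (noisyF &&& quietG >>> arr snd)
```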

Looping

Although ProcessA is an instance of ArrowLoop, sending values upstream is somewhat difficult.

In the example below, the result is [0, 1, 1, 1], not [0, 1, 2, 3].

f = proc x ->
  do
    rec
        b <- dHold 0 -< y
        y <- fork -< (\xx -> [xx, xx+1, xx+2, xx+3]) <$> x
    returnA -< b <$ y

dHold i = proc x -> drSwitch (pure i) -< ((), pure <$> x)

>>> run f [1]
[0, 1, 1, 1]

This is because of machinecell's execution strategy, which is much like Prolog's backtracking strategy. When backtracking reaches fork, three values are found, and backtracking goes back and forth three times between fork and returnA, but it does not reach dHold until all outputs are done.

In general, Event values should not be referred to upstream.

Rather, they should be encoded as behaviours, sent upstream in a rec statement, and delayed by cycleDelay.
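Why a delay is needed in a feedback loop can be seen in a lazy-list model. The sketch below uses base only and is not written against machinecell; runningSum and delayed are hypothetical names, and the one-element shift merely stands in for the role that cycleDelay is described as playing above.

```haskell
-- Feedback with a one-step delay: each output is the current input plus
-- the previous output.
runningSum :: [Int] -> [Int]
runningSum xs = out
  where
    out     = zipWith (+) xs delayed
    delayed = 0 : out   -- the fed-back stream, shifted by one step
```

Without the shift, that is with out defined as zipWith (+) xs out, the first element would depend on itself and evaluation would never produce a value. The delay gives the loop a starting point.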

Another way to send values to upstream is encloseState.

Unsafe primitives

In the code below, edge does not fire.

encloseState False (sta >>> peekState) >>> edge

where

sta = constructT (ary0 $ statefully unArrowMonad) (put True >> await >> put False)

That is because, while "put True" is executing, the backtracking is going up and never hits edge until "put False" is executed.

The same occurs when "proc b -> if b then (now -< ()) else (returnA -< noEvent)" is used instead of edge.

Even worse, it again breaks the purity of ProcessA. await gets NoEvent if some "arr (replicate k) >>> fork" is inserted somewhere upstream. Then edge may fire because the execution of "put False" is delayed.

This means that encloseState, peekState, edge, and the ArrowChoice instance for ProcessA should never exist simultaneously.

The same goes for their underlying primitives unsafeSteady, unsafeExhaust, and fitEx.

But I hope some of them can be rescued. So for now, this library contains them all.

Modules

Control.Arrow.Machine is well suited to qualified import, because it exports no operators.

Alternatively, you can import the modules below individually, with only Control.Arrow.Machine.Utils qualified or with its identifiers listed explicitly.

The Control.Arrow.Machine.Misc.* modules are not included by default. They are all designed to be imported qualified.