Dflow-0.0.1: Processing Real-time event streams

Safe Haskell: Safe-Inferred




Real Time Stream Processors (RTSPs) are used to describe pipelines that process events in real time. An event is described as a (Time, Value) pair. When an RTSP receives an event it can respond by emitting zero or more events at any time at or after the receipt of the original event. Further incoming events may influence the stream of emitted events.

Pipelines of RTSPs can be built up using the '(.)' operator from the Category instance, or alternatively the '(>>>)' operator (which is merely dot with its arguments swapped). RTSPs can be run in parallel with their outputs merged using mappend from the Monoid instance.
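
As a rough illustration of how these combinators fit together, the sketch below models a processor as a plain function from one input value to a list of outputs. It ignores event times entirely, and Proc, runProc and pureProc are hypothetical names for this toy model, not part of the library.

```haskell
import Prelude hiding (id, (.))
import Control.Category (Category (..), (>>>))

-- Toy stand-in for an RTSP: one input value yields zero or more outputs.
-- Event times are ignored in this model.
newtype Proc b c = Proc { runProc :: b -> [c] }

instance Category Proc where
  id = Proc (\x -> [x])
  -- Sequential composition: outputs of the right-hand processor feed the left.
  Proc g . Proc f = Proc (\x -> concatMap g (f x))

instance Semigroup (Proc b c) where
  -- Parallel composition: both processors see the input, outputs are merged.
  Proc f <> Proc g = Proc (\x -> f x ++ g x)

instance Monoid (Proc b c) where
  mempty = Proc (const [])

-- Lift a pure function, in the spirit of the library's stream function.
pureProc :: (b -> c) -> Proc b c
pureProc f = Proc (\x -> [f x])

pipelineA, pipelineB :: Proc Int Int
pipelineA = pureProc (+ 1) . pureProc (* 2)   -- data flows right-to-left
pipelineB = pureProc (* 2) >>> pureProc (+ 1) -- data flows left-to-right

forked :: Proc Int Int
forked = id <> pureProc (+ 1)
```

In this model runProc pipelineA and runProc pipelineB compute the same function, and forked passes each input through unchanged alongside its successor.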

Within the RTSP implementation all notions of "delay" and "time" merely refer to the time component of events, and are used for event ordering. Only the execRTSP function, which runs in the IO monad, executes any actual real-time delay.

The main data types for the application programmer are:

Event: A value that occurs at a certain time. For instance an Event Char might represent a key press.
RTSP: The Real Time Stream Processor. A value of type RTSP x y takes in events of type x and emits events of type y. RTSPs can be strung together into pipelines using (.) (or (>>>) if you prefer your data to flow left-to-right). RTSPs are also monoids, so you can fork your data through two parallel RTSPs and then merge the results.
RTA: A monad for building stateful RTSPs. Convert an RTA into an RTSP using execRTA or accumulateRTA, depending on what you want to do with pending output events when a new input event arrives.

You can test an RTSP in "fast time" (that is, without waiting for real-time delays) by using simulateRTSP. Then you can execute the RTSP in real time using execRTSP and be confident that the real time behaviour will match the fast-time behaviour.

Simultaneous Events

The handling of logically simultaneous events in discrete event simulation is a long-standing problem. The three basic approaches are:

  1. Impose an arbitrary but deterministic order on "simultaneous" events.
  2. Collect the simultaneous events and pass them to the application, on the basis that the application programmer can then impose the appropriate semantics.
  3. Simulate all possible orderings.

This library takes the first approach. Option 2 would force each RTSP to wait for the next event to see if it was simultaneous, which is possible in a simulator but not in a real time system. In a real time system option 3 is not feasible, and would still leave the problem of which ordering to present to the outside world as the "real" one.

When two simultaneous events arrive at an RTSP, the current implementation uses the following rules:

  • Simultaneous output events retain the order of the input events that triggered them. Hence simultaneous events never "overtake".
  • In the case of (id `mappend` stream (+ 1)) the output alternates the left and right expressions, starting with the left.

However, these properties interact in ways that are complex, hard to define formally and not guaranteed to be stable. Code that depends on the ordering of simultaneous events should therefore be avoided.
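
To make the first approach concrete, here is a minimal sketch of a deterministic merge over two chronologically sorted event lists. It uses a toy event type with Int times, and mergeLeftFirst is a hypothetical helper. It does not claim to reproduce the library's exact tie-breaking rules.

```haskell
-- Toy event: (time, value), with time as an Int for simplicity.
type Ev a = (Int, a)

-- Merge two chronologically sorted lists. On a tie the left event goes first,
-- so the result is deterministic and events within each input never reorder.
mergeLeftFirst :: [Ev a] -> [Ev a] -> [Ev a]
mergeLeftFirst [] ys = ys
mergeLeftFirst xs [] = xs
mergeLeftFirst (x : xs) (y : ys)
  | fst y < fst x = y : mergeLeftFirst (x : xs) ys
  | otherwise     = x : mergeLeftFirst xs (y : ys)
```

Because the right-hand event is taken only when it is strictly earlier, simultaneous events from the left input always precede those from the right.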



data Event a

Real time events.




eventTime :: UTCTime
eventValue :: a


Functor Event 
Eq a => Eq (Event a) 
Show a => Show (Event a) 

isBefore :: Event a -> Event b -> Bool

True if the first event occurs strictly before the second. This makes Event a poset (partially ordered set). Infix priority 4 (the same as other comparison operators).
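
The sketch below shows why the ordering is only partial: two simultaneous events with different values are neither equal nor ordered. The Event definition here is a stand-in written for this example, assuming a constructor that takes the time first and then the value. The library's own type may differ in detail.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..), addUTCTime)

-- Stand-in for the library's Event type (an assumption for this example).
data Event a = Event { eventTime :: UTCTime, eventValue :: a }
  deriving (Eq)

infix 4 `isBefore`

-- Strictly earlier in time; values are not compared.
isBefore :: Event a -> Event b -> Bool
isBefore x y = eventTime x < eventTime y

t0 :: UTCTime
t0 = UTCTime (fromGregorian 2020 1 1) 0

keyA, keyB, keyC :: Event Char
keyA = Event t0 'a'
keyB = Event t0 'b'                -- simultaneous with keyA
keyC = Event (addUTCTime 1 t0) 'c' -- one second later
```

Here keyA `isBefore` keyC holds, while keyA and keyB are unequal yet neither is before the other.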

Real Time Stream Processors

data EventStream b c

A real-time event stream cannot be described without reference to unknown future inputs. Hence EventStream embodies two possible futures:

  • An Event c will be emitted at some time in the future, with a new EventStream representing the future after that event.
  • An incoming Event b will arrive before the next Event c is emitted, creating a new EventStream representing the response to that event. The old Event c may or may not be part of the new EventStream.

There are also two degenerate cases:

  • Wait: no event is scheduled to be emitted, and the EventStream just waits for an incoming event.
  • Finish: no event will ever be emitted, regardless of incoming events. This is explicitly distinguished so that complex RTSP expressions can be GC'd if they can be proven to be finished.

Event streams are like the Mirror of Galadriel, for they show things that were, things that are, and things that yet may be. But which it is that he sees, even the wisest cannot always tell.

Seeing is both good and perilous. An event stream may be modified by new events, but exceptions or inconsistent results will occur if the incoming events are not in increasing order of time.
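
One way to picture the cases described above is as a small algebraic data type. The following is a hypothetical model written for illustration only: Stream, peek, feed and echo are invented names, times are plain Ints, and the library's real representation is not shown in this documentation.

```haskell
-- Toy event: (time, value).
type Ev a = (Int, a)

-- Hypothetical model of an event stream.
data Stream b c
  = Emit (Ev c) (Stream b c) (Ev b -> Stream b c)
    -- Next scheduled output, the future after it, and the reaction
    -- to an input that arrives first.
  | Wait (Ev b -> Stream b c) -- Nothing scheduled; waiting for input.
  | Finish                    -- Will never emit again.

-- Outputs that will appear if no further input arrives.
peek :: Stream b c -> [Ev c]
peek (Emit e rest _) = e : peek rest
peek _               = []

-- Deliver an input event. This toy does not check event ordering.
feed :: Ev b -> Stream b c -> Stream b c
feed e (Emit _ _ k) = k e
feed e (Wait k)     = k e
feed _ Finish       = Finish

-- Example: echo each input 10 and 20 time units later. A new input
-- replaces whatever was still pending.
echo :: Stream Char Char
echo = Wait react
  where
    react (t, v) = Emit (t + 10, v) (Emit (t + 20, v) (Wait react) react) react
```

Feeding (0, 'x') schedules outputs at times 10 and 20. Feeding (5, 'y') before either is emitted replaces them with outputs at 15 and 25.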

emitsBefore :: EventStream b1 c1 -> EventStream b2 c2 -> Bool

True if the first argument is scheduled to emit an event before the second. This makes EventStream a poset (partially ordered set). Infix priority 4.

nullStream :: EventStream b c

An event stream that never generates anything.

esFinished :: EventStream b c -> Bool

True if the event stream is guaranteed not to emit any future events, regardless of input.

esPeek :: EventStream b c -> [Event c]

Peek at the events that will be emitted by this EventStream if no incoming event interrupts them.

esFutures :: EventStream b c -> [(Event c, EventStream b c)]

All the possible futures of the event stream.

esProcess :: Event b -> EventStream b c -> EventStream b c

Given a new input event to an existing event stream, this returns the modified event stream. When esProcess is called again on the result, the Event argument to the second call must not occur before that of the first (they can be simultaneous). More formally, if

  esOut = esProcess ev2 (esProcess ev1 esIn)

then not (ev2 `isBefore` ev1). This precondition is not checked.
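
Since the precondition is unchecked, it can be useful to validate input sequences in tests before feeding them to a stream. A minimal sketch, using a toy (time, value) pair rather than the library's Event type; chronological is a hypothetical helper name:

```haskell
-- Toy event: (time, value).
type Ev a = (Int, a)

-- True when no event is strictly earlier than its predecessor.
-- Simultaneous neighbours are allowed.
chronological :: [Ev a] -> Bool
chronological evs = and (zipWith notBefore evs (drop 1 evs))
  where
    notBefore (t1, _) (t2, _) = t1 <= t2
```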

esMerge :: EventStream b c -> EventStream b c -> EventStream b c

Merge the outputs of two event streams. Input events are delivered to both streams.

newtype RTSP b c

Real Time Stream Processor (RTSP)

An EventStream cannot exist independently of some event that caused it to start. Hence the only way to create an EventStream is through an RTSP.

  • mempty is the event sink: it never emits an event.
  • mappend runs its arguments in parallel and merges their outputs.
  • id is the null operation: events are passed through unchanged.
  • (.) is sequential composition: events emitted by the second argument are passed to the first argument.




runRTSP :: Event b -> EventStream b c


Category RTSP 
Functor (RTSP b) 
Show (RTSP b c) 
Monoid (RTSP b c) 



simulateRTSP

:: RTSP b c

The processor to execute.

-> [Event b]

The events must be finite and in chronological order. This is unchecked.

-> [Event c] 

Execute an RTSP against a list of events. Useful for testing.



execRTSP

:: RTSP b (IO ())

The output of the RTSP is a series of action events that will be executed in a separate thread sequentially at the times given. The actions may, of course, fork their own threads as necessary.

execRTSP uses atomically, so it cannot be called within unsafePerformIO.

-> IO (b -> IO ()) 

Execute an RTSP in the IO monad. The function returns immediately with an action for pushing events into the RTSP.

stream :: (b -> c) -> RTSP b c

A pure function converted into a stream processor.

accumulate :: RTSP b c -> RTSP b c

When a new input event is delivered to an RTSP it causes any future output events to be dropped in favour of the new events. accumulate instead keeps the events from previous inputs interleaved with the new ones. If you use this unnecessarily then you will get duplicated events.

If there are n output events due to be emitted before an input event then this will require O(n) time for the input.

Manipulating event times

repeatEvent :: [NominalDiffTime] -> RTSP b b

Repeat each input event after the specified delays until a new event arrives, at which point the sequence begins again with the new event value. The delays must be non-negative and in ascending order. All the delays are relative to the first event.

Be careful when using enumerations to create the argument. A list like

 [1..5] :: [NominalDiffTime]

will count up in picoseconds rather than seconds, which is probably not what is wanted. Instead use

 map fromInteger [1..5] :: [NominalDiffTime]
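
The difference can be checked directly with the time package. The sketch below assumes that the Enum instance for NominalDiffTime steps in units of the type's resolution, as described above. The names secondDelays and enumeratedPrefix are chosen for this example.

```haskell
import Data.Time.Clock (NominalDiffTime)

-- Whole-second delays built by converting integers explicitly.
secondDelays :: [NominalDiffTime]
secondDelays = map fromInteger [1 .. 5]

-- The first few elements of a direct enumeration. Only a prefix is taken
-- because the full list [1 .. 5] would be enormous.
enumeratedPrefix :: [NominalDiffTime]
enumeratedPrefix = take 3 [1 ..]
```

Under that assumption, the second element of enumeratedPrefix lies just above one second rather than at two seconds.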

delay0 :: NominalDiffTime -> RTSP b b

Delay input events by the specified time. However, given an input sequence {ev1, ev2, ev3, ...}, if ev2 arrives before ev1 has been emitted then ev1 will be lost.

delay :: NominalDiffTime -> RTSP b b

Delay input events by the specified time.

Unfortunately, because it uses accumulate, this requires O(n) time per input event when there are n events queued up.
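
The contrast between the two delay functions can be sketched over plain lists. This is a toy model with Int times and hypothetical names (delayKeep, delayDrop), not the library's implementation. It assumes that an input arriving exactly at the emission time does not cancel the pending output.

```haskell
-- Toy event: (time, value).
type Ev a = (Int, a)

-- Shift every event later by d, keeping all of them (in the spirit of delay).
delayKeep :: Int -> [Ev a] -> [Ev a]
delayKeep d evs = [(t + d, v) | (t, v) <- evs]

-- Shift events by d, but drop any event whose successor arrives before the
-- delayed copy would have been emitted (in the spirit of delay0).
delayDrop :: Int -> [Ev a] -> [Ev a]
delayDrop d evs = [(t + d, v) | ((t, v), next) <- zip evs nexts, emitted t next]
  where
    -- Arrival time of the following input, if there is one.
    nexts = map (Just . fst) (drop 1 evs) ++ [Nothing]
    emitted _ Nothing   = True
    emitted t (Just t') = t' >= t + d
```

With inputs at times 0, 2 and 10 and a delay of 5, delayKeep yields outputs at 5, 7 and 15, whereas delayDrop loses the first because the second input arrives at time 2, before time 5.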

Conditional event processing

type Cond a b = (a -> Bool, RTSP a b)

A conditional stream: events matching the predicate will be passed to the stream.

streamFilter :: Cond a b -> RTSP a b

Conditional stream execution: only certain events will be accepted.

cond :: [Cond a b] -> RTSP a b

Send each event to all the streams that accept it.

cond1 :: [Cond a b] -> RTSP a b

Send each event to the first stream that accepts it, if any.

ifThenElse :: Cond a b -> RTSP a b -> RTSP a b

Send each event to the conditional stream if it accepts it, otherwise send it to the second argument.

ifThenElse (p, rThen) rElse is equivalent to

  streamFilter (p, rThen) `mappend` streamFilter (not . p, rElse)

However, ifThenElse evaluates p only once for each input event.
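
The routing behaviour of these combinators can be sketched with pure functions standing in for stream processors. Branch, allMatches, firstMatch and eitherBranch are hypothetical names for this toy model, which ignores event times.

```haskell
-- Toy conditional: a predicate paired with a handler for accepted values.
type Branch a b = (a -> Bool, a -> b)

-- Every accepting branch handles the value (in the spirit of cond).
allMatches :: [Branch a b] -> a -> [b]
allMatches bs x = [f x | (p, f) <- bs, p x]

-- Only the first accepting branch handles the value (in the spirit of cond1).
firstMatch :: [Branch a b] -> a -> [b]
firstMatch bs x = take 1 (allMatches bs x)

-- Evaluate the predicate once and pick a side (in the spirit of ifThenElse).
eitherBranch :: Branch a b -> (a -> b) -> a -> [b]
eitherBranch (p, f) g x = [if p x then f x else g x]

branches :: [Branch Int String]
branches =
  [ (even, const "even")
  , ((> 10), const "big")
  ]
```

For the input 12 both branches accept, so allMatches returns both results while firstMatch returns only the first.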

Real Time Actions with state

data RTA s c v

Real-time Actions. This monad is used to build sequential processors that can be turned into stream processors. An RTA emits zero or more events in response to each input event, and has a state that persists from one event to the next. In particular, state changes made after a pause will be visible to the next event regardless of the relative times.


Monad (RTA s c) 
Functor (RTA s c) 

get :: RTA s c s

Get the current state.

put :: s -> RTA s c ()

Set the current state.

modify :: (s -> s) -> RTA s c ()

Apply a function to the current state.

emit :: c -> RTA s c ()

Emit a value as an event.

pause :: NominalDiffTime -> RTA s c ()

Pause before the next step. This does not actually delay processing; it merely increments the time of any emitted events.

now :: RTA s c UTCTime

Get the current time. This is the event time plus any pauses.



execRTA

:: s

The initial state. State persists between input events.

-> (b -> RTA s c Bool)

A function from the input value to an action. If the action returns True then subsequent input events will run the action again. If it returns False then the RTSP finishes and will not respond to further events.

-> RTSP b c 

Execute an RTA as part of a real time stream processor.

When a new event arrives any pending output events will be lost. However, any state changes are immediately visible to the next event, even if they occurred "after" the lost events. For instance, consider this:

   execRTA 1 $ \_ -> do
      n <- get
      pause 10
      emit n
      put (n+1)
      return True

If this receives events at t=[0,1,3,20] then it will emit [Event 13 3, Event 30 4]. The events that would have been emitted at t=[10,11] have been lost, but the state change still occurred immediately, regardless of the output schedule.
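
The arithmetic of this example can be reproduced with a small pure simulation. This is a sketch of the behaviour described above, not the library's code: simulateCounter is a hypothetical function, times are plain Ints, and an output is assumed to be lost only when the next input arrives strictly before its scheduled time.

```haskell
-- Simulate the counter example: each input at time t reads the state n,
-- schedules an output of n at t + 10, and immediately sets the state to n + 1.
-- Results are (time, value) pairs.
simulateCounter :: Int -> [Int] -> [(Int, Int)]
simulateCounter = go
  where
    go _ [] = []
    go n (t : rest) =
      let out = (t + 10, n)
          -- The pending output survives unless the next input pre-empts it.
          survives = case rest of
            []       -> True
            (t' : _) -> t' >= t + 10
      in [out | survives] ++ go (n + 1) rest
```

Running simulateCounter 1 [0,1,3,20] gives [(13,3),(30,4)], matching the two events listed above: the state advanced on every input even though the first two outputs were dropped.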

accumulateRTA :: s -> (b -> RTA s c Bool) -> RTSP b c

Like execRTA, except that output events are accumulated.