-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Processing Real-time event streams
--
-- This library provides Real Time Stream Processors (RTSPs). An RTSP
-- transforms an input event stream into an output event stream. The
-- output events occur asynchronously with input events. RTSPs can be
-- composed into pipelines or executed in parallel and their outputs
-- merged. A Real Time Action (RTA) monad is provided for creating new
-- primitive RTSPs.
@package Dflow
@version 0.0.1
-- | Real Time Stream Processors are used to describe pipelines that
-- process events in real time. An event is described as a (Time, Value)
-- pair. When an RTSP receives an event it can respond by emitting zero
-- or more events at any time at or after the receipt of the original
-- event. Further incoming events may influence the stream of emitted
-- events.
--
-- Pipelines of RTSPs can be built up using the '(.)' operator from the
-- Category instance, or alternatively the '(>>>)' operator
-- (which is merely dot with its arguments swapped). RTSPs can be run in
-- parallel with their outputs merged using mappend from the
-- Monoid instance.
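--
-- A minimal sketch of both forms of composition. The names pipeline
-- and forked are illustrative only and are not part of the library:
--
--
-- import Control.Category ((>>>))
-- import Data.Monoid (mappend)
-- import Control.RTSP
--
-- -- Sequential: add one, then double. Data flows left to right.
-- pipeline :: RTSP Int Int
-- pipeline = stream (+ 1) >>> stream (* 2)
--
-- -- Parallel: each input goes to both processors; outputs are merged.
-- forked :: RTSP Int Int
-- forked = stream (+ 1) `mappend` stream (* 2)
--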
--
-- Within the RTSP implementation all notions of "delay" and "time"
-- merely refer to the time component of events, and are used for event
-- ordering. Only the execRTSP function, which runs in the IO
-- monad, executes any actual real-time delay.
--
-- The main data types for the application programmer are:
--
--
-- - Event: A value that occurs at a certain time. For
-- instance an Event Char might represent a key
-- press.
-- - RTSP: The Real Time Stream Processor. A value of type
-- RTSP x y takes in events of type x and emits
-- events of type y. RTSPs can be strung together into pipelines
-- using (.) (or (>>>) if you prefer your data
-- to flow left-to-right). RTSPs are also monoids, so you can fork your
-- data through two parallel RTSPs and then merge the results.
-- - RTA: A monad for building stateful RTSPs. Convert an
-- RTA into an RTSP using execRTA or
-- accumulateRTA, depending on what you want to do with pending output
-- events when a new input event arrives.
--
--
-- You can test an RTSP in "fast time" (that is, without waiting for
-- real-time delays) by using simulateRTSP. Then you can execute
-- the RTSP in real time using execRTSP and be confident that the
-- real time behaviour will match the fast-time behaviour.
--
-- Simultaneous Events
--
-- The handling of logically simultaneous events in discrete event
-- simulation is a long-standing problem. The three basic approaches are:
--
--
-- - Impose an arbitrary but deterministic order on "simultaneous"
-- events.
-- - Collect the simultaneous events and pass them to the application,
-- on the basis that the application programmer can then impose the
-- appropriate semantics.
-- - Simulate all possible orderings.
--
--
-- This library takes the first approach. The second approach would
-- force each RTSP to wait for the next event to see if it was
-- simultaneous, which is possible in a simulator but not in a real time
-- system. The third approach is not feasible in a real time system, and
-- would still leave the problem of which ordering to present to the
-- outside world as the "real" one.
--
-- When two simultaneous events arrive at an RTSP, the current
-- implementation uses the following rules:
--
--
-- - Simultaneous output events retain the order of the input events
-- that triggered them. Hence simultaneous events never "overtake".
-- - In the case of (id `mappend` stream (+ 1)) the output
-- alternates the left and right expressions, starting with the
-- left.
--
--
-- However, these properties interact in ways that are complex, hard to
-- define formally and not guaranteed to be stable. Avoid writing code
-- that depends on the ordering of simultaneous events.
module Control.RTSP
-- | Real time events.
data Event a
Event :: UTCTime -> a -> Event a
eventTime :: Event a -> UTCTime
eventValue :: Event a -> a
-- | True if the first event occurs strictly before the second. This makes
-- Event a poset (partially ordered set). Infix priority 4 (the
-- same as other comparison operators).
isBefore :: Event a -> Event b -> Bool
-- | A real-time event stream cannot be described without reference to
-- unknown future inputs. Hence EventStream embodies two
-- possible futures:
--
--
-- - An Event c will be emitted at some time in the future,
-- with a new EventStream representing the future after that
-- event.
-- - An incoming Event b will arrive before the next Event
-- c is emitted, creating a new EventStream representing
-- the response to that event. The old Event c may or may not be
-- part of the new EventStream.
--
--
-- There are also two degenerate cases:
--
--
-- - Wait: no event is scheduled to be emitted, and the
-- EventStream just waits for an incoming event.
-- - Finish: no event will ever be emitted, regardless of incoming
-- events. This is explicitly distinguished so that complex RTSP
-- expressions can be GC'd if they can be proven to be finished.
--
--
-- Event streams are like the Mirror of Galadriel, for they show things
-- that were, things that are, and things that yet may be. But which it
-- is that he sees, even the wisest cannot always tell.
--
-- Seeing is both good and perilous. An event stream may be modified by
-- new events, but exceptions or inconsistent results will occur if the
-- incoming events are not in increasing order of time.
data EventStream b c
-- | True if the first argument is scheduled to emit an event before the
-- second. This makes EventStream a poset (partially ordered
-- set). Infix priority 4.
emitsBefore :: EventStream b1 c1 -> EventStream b2 c2 -> Bool
-- | An event stream that never generates anything.
nullStream :: EventStream b c
-- | True if the event stream is guaranteed not to emit any future events,
-- regardless of input.
esFinished :: EventStream b c -> Bool
-- | Peek at the events that will be emitted by this EventStream if no
-- incoming event interrupts them.
esPeek :: EventStream b c -> [Event c]
-- | All the possible futures of the event stream.
esFutures :: EventStream b c -> [(Event c, EventStream b c)]
-- | Given a new input event to an existing event stream, this returns the
-- modified event stream. When esProcess is called again on the result,
-- the Event argument to the second call must not occur before that of
-- the first (they can be simultaneous). More formally, if
--
--
-- esOut = esProcess ev2 (esProcess ev1 esIn)
--
--
-- then not (ev2 `isBefore` ev1). This precondition is not
-- checked.
esProcess :: Event b -> EventStream b c -> EventStream b c
-- | Merge the outputs of two event streams. Input events are delivered to
-- both streams.
esMerge :: EventStream b c -> EventStream b c -> EventStream b c
-- | Real Time Stream Processor (RTSP)
--
-- An EventStream cannot exist independently of some event that caused it
-- to start. Hence the only way to create an EventStream is through an
-- RTSP.
--
--
-- - mempty is the event sink: it never emits an event.
-- - mappend runs its arguments in parallel and merges their
-- outputs.
-- - id is the null operation: events are passed through
-- unchanged.
-- - (.) is sequential composition: events emitted by the second
-- argument are passed to the first argument.
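--
--
-- As a sketch of how an EventStream might be obtained from an RTSP and
-- then updated with a later event (t0, es and pending are illustrative
-- names, not part of the library):
--
--
-- import Data.Time.Calendar (fromGregorian)
-- import Data.Time.Clock (UTCTime (..), addUTCTime)
-- import Control.RTSP
--
-- -- An arbitrary reference time.
-- t0 :: UTCTime
-- t0 = UTCTime (fromGregorian 2000 1 1) 0
--
-- -- Start a stream from a first event, then feed it one 5 seconds later.
-- es :: EventStream Int Int
-- es = esProcess (Event (addUTCTime 5 t0) 2)
--                (runRTSP (stream (+ 1)) (Event t0 1))
--
-- -- Outputs still scheduled, provided no further input interrupts them.
-- pending :: [Event Int]
-- pending = esPeek es
--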
--
newtype RTSP b c
RTSP :: (Event b -> EventStream b c) -> RTSP b c
runRTSP :: RTSP b c -> Event b -> EventStream b c
-- | Execute an RTSP against a list of events. Useful for testing.
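--
-- A sketch of a fast-time test. The names t0, inputs and outputs are
-- illustrative, and the start time is arbitrary:
--
--
-- import Data.Time.Calendar (fromGregorian)
-- import Data.Time.Clock (UTCTime (..), addUTCTime)
-- import Control.RTSP
--
-- -- An arbitrary start time for the simulation.
-- t0 :: UTCTime
-- t0 = UTCTime (fromGregorian 2000 1 1) 0
--
-- -- Three input events, one second apart.
-- inputs :: [Event Int]
-- inputs =
--    [ Event (addUTCTime dt t0) v
--    | (dt, v) <- zip [0, 1, 2] [10, 20, 30] ]
--
-- -- Run in fast time; no real delay takes place.
-- outputs :: [Event Int]
-- outputs = simulateRTSP (stream (* 2)) inputs
--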
simulateRTSP :: RTSP b c -> [Event b] -> [Event c]
-- | Execute an RTSP in the IO monad. The function returns immediately with
-- an action for pushing events into the RTSP.
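--
-- A sketch of typical use, assuming each output event carries the IO
-- action to be run (here print). How long the program must stay alive
-- for pending outputs to run is not specified in this documentation:
--
--
-- import Control.RTSP
--
-- main :: IO ()
-- main = do
--    -- Obtain the action that pushes input values into the processor.
--    push <- execRTSP (stream print)
--    mapM_ push [1, 2, 3 :: Int]
--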
execRTSP :: RTSP b (IO ()) -> IO (b -> IO ())
-- | A pure function converted into a stream processor
stream :: (b -> c) -> RTSP b c
-- | When a new input event is delivered to an RTSP it causes any future
-- output events to be dropped in favour of the new events.
-- accumulate instead keeps the events from previous inputs
-- interleaved with the new ones. If you use this unnecessarily then you
-- will get duplicated events.
--
-- If there are n output events due to be emitted before an
-- input event then this will require O(n) time for the input.
accumulate :: RTSP b c -> RTSP b c
-- | Repeat each input event after the specified delays until a new event
-- arrives, at which point the sequence begins again with the new event
-- value. The delays must not be negative and the list must be in
-- ascending order. All the delays are relative to the first event.
--
-- Be careful when using enumerations to create the argument. A
-- list like
--
--
-- [1..5] :: [NominalDiffTime]
--
--
-- will count up in picoseconds rather than seconds, which is probably
-- not what is wanted. Instead use
--
--
-- map fromInteger [1..5] :: [NominalDiffTime]
--
repeatEvent :: [NominalDiffTime] -> RTSP b b
-- | Delay input events by the specified time, but given an event stream
-- {ev1, ev2, ev3...}, if ev2 arrives before ev1 has been
-- emitted then ev1 will be lost.
delay0 :: NominalDiffTime -> RTSP b b
-- | Delay input events by the specified time.
--
-- Unfortunately this requires O(n) time when there are n events
-- queued up due to the use of accumulate.
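--
-- A sketch contrasting the two delay functions. The names lossy and
-- lossless are illustrative only:
--
--
-- import Control.RTSP
--
-- -- A pending event is lost if another input arrives before it is emitted.
-- lossy :: RTSP b b
-- lossy = delay0 2
--
-- -- Pending events are kept, at O(n) cost per input when n are queued.
-- lossless :: RTSP b b
-- lossless = delay 2
--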
delay :: NominalDiffTime -> RTSP b b
-- | A conditional stream: events matching the predicate will be passed to
-- the stream.
type Cond a b = (a -> Bool, RTSP a b)
-- | Conditional stream execution: only certain events will be accepted.
streamFilter :: Cond a b -> RTSP a b
-- | Send each event to all the streams that accept it.
cond :: [Cond a b] -> RTSP a b
-- | Send each event to the first stream that accepts it, if any.
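--
-- A sketch of a three-way dispatch. The name classify is illustrative
-- only; the final predicate accepts anything the earlier ones rejected:
--
--
-- import Control.RTSP
--
-- classify :: RTSP Int String
-- classify = cond1
--    [ ((< 0),      stream (const "negative"))
--    , ((== 0),     stream (const "zero"))
--    , (const True, stream (const "positive"))
--    ]
--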
cond1 :: [Cond a b] -> RTSP a b
-- | Send each event to the conditional stream if it accepts it, otherwise
-- send it to the second argument.
--
-- ifThenElse (p, rThen) rElse is equivalent to
--
--
-- streamFilter (p, rThen) `mappend` streamFilter (not . p, rElse)
--
--
-- However ifThenElse only evaluates p once for each
-- input event.
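--
-- A sketch of a two-way branch. The name parity is illustrative only:
--
--
-- import Control.RTSP
--
-- -- Even inputs go to the first processor, all others to the second.
-- parity :: RTSP Int String
-- parity = ifThenElse (even, stream (const "even"))
--                     (stream (const "odd"))
--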
ifThenElse :: Cond a b -> RTSP a b -> RTSP a b
-- | Real-time Actions. This monad is used to build sequential processors
-- that can be turned into stream processors. An RTA emits zero or more
-- events in response to each input event, and has a state that persists
-- from one event to the next. In particular, state changes made after a
-- pause will be visible to the next event regardless of the
-- relative times.
data RTA s c v
-- | Get the current state.
get :: RTA s c s
-- | Set the current state.
put :: s -> RTA s c ()
-- | Apply a function to the current state.
modify :: (s -> s) -> RTA s c ()
-- | Emit a value as an event.
emit :: c -> RTA s c ()
-- | Pause before the next step. This does not actually delay processing;
-- it merely increments the time of any emitted events.
pause :: NominalDiffTime -> RTA s c ()
-- | Get the current time. This is the event time plus any pauses.
now :: RTA s c UTCTime
-- | Execute an RTA as part of a real time stream processor.
--
-- When a new event arrives any pending output events will be lost.
-- However any state changes are immediately visible to the next event,
-- even if they occurred "after" the lost events. For instance, consider
-- this:
--
--
-- execRTA 1 $ \_ -> do
--    n <- get
--    pause 10
--    emit n
--    put (n+1)
--    return True
--
--
-- If this receives events at t=[0,1,3,20] then it will emit
-- [Event 13 3, Event 30 4]. The events that would have been
-- emitted at t=[10,11] have been lost, but the state change
-- still occurred immediately, regardless of the output schedule.
execRTA :: s -> (b -> RTA s c Bool) -> RTSP b c
-- | Like execRTA, except that output events are accumulated.
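--
-- A sketch of a stateful processor that emits a running total. The name
-- runningTotal is illustrative only. The meaning of the Bool result is
-- not described in this documentation, so the sketch simply returns True
-- as the execRTA example does:
--
--
-- import Control.RTSP
--
-- runningTotal :: RTSP Int Int
-- runningTotal = accumulateRTA 0 $ \x -> do
--    modify (+ x)     -- add the input value to the state
--    total <- get
--    emit total       -- emitted at the time of the input event
--    return True      -- assumption: mirrors the execRTA example
--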
accumulateRTA :: s -> (b -> RTA s c Bool) -> RTSP b c
instance Show a => Show (Event a)
instance Eq a => Eq (Event a)
instance Show c => Show (EventStream b c)
instance Monad (RTA s c)
instance Functor (RTA s c)
instance Category RTSP
instance Category EventStream
instance Functor (RTSP b)
instance Functor (EventStream b)
instance Monoid (EventStream b c)
instance Monoid (RTSP b c)
instance Show (RTSP b c)
instance Functor Event