-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Processing Real-time event streams -- -- This library provides Real Time Stream Processors (RTSPs). An RTSP -- transforms an input event stream into an output event stream. The -- output events occur asynchronously with input events. RTSPs can be -- composed into pipelines or executed in parallel and their outputs -- merged. A Real Time Action (RTA) monad is provided for creating new -- primitive RTSPs. @package Dflow @version 0.0.1 -- | Real Time Stream Processors are used to describe pipelines that -- process events in real time. An event is described as a (Time, Value) -- pair. When an RTSP receives an event it can respond by emitting zero -- or more events at any time at or after the receipt of the original -- event. Further incoming events may influence the stream of emitted -- events. -- -- Piplelines of RTSPs can be built up using the '(.)' operator from the -- Category instance, or alternatively the '(>>>)' operator -- (which is merely dot with its arguments swapped). RTSPs can be run in -- parallel with their outputs merged using mappend from the -- Monoid instance. -- -- Within the RTSP implementation all notions of "delay" and "time" -- merely refer to the time component of events, and are used for event -- ordering. Only the execRTSP function, which runs in the IO -- monad, executes any actual real-time delay. -- -- The main data types for the application programmer are: -- -- -- -- You can test an RTSP in "fast time" (that is, without waiting for -- real-time delays) by using simulateRTSP. Then you can execute -- the RTSP in real time using execRTSP and be confident that the -- real time behaviour will match the fast-time behaviour. -- -- Simultaneous Events -- -- The handling of logically simultaneous events in discrete event -- simulation is a long-standing problem. The three basic approaches are: -- --
    --
  1. Impose an arbitrary but deterministic order on "simultaneous" -- events.
  2. --
  3. Collect the simultaneous events and pass them to the application, -- on the basis that the application programmer can then impose the -- appropriate semantics.
  4. --
  5. Simulate all possible orderings.
  6. --
-- -- This library takes the first approach. Option 2 would force each RTSP -- to wait for the next event to see if it was simultaneous, which is -- possible in a simulator but not in a real time system. In a real time -- system option 3 is not feasible, and would still leave the problem of -- which ordering to present to the outside world as the "real" one. -- -- When two simultaneous events arrive at an RTSP, the current -- implementation uses the following rules: -- -- -- -- However these properties interact in ways that are complex, hard to -- define formally and not guaranteed to be stable. Code that depends on -- the ordering of simultaneous events should therefore be avoided. module Control.RTSP -- | Real time events. data Event a Event :: UTCTime -> a -> Event a eventTime :: Event a -> UTCTime eventValue :: Event a -> a -- | True if the first event occurs strictly before the second. This makes -- Event a poset (partially ordered set). Infix priority 4 (the -- same as other comparison operators). isBefore :: Event a -> Event b -> Bool -- | A real-time event stream cannot be described without reference to -- unknown future inputs. Hence EventStream embodies two -- possible futures: -- -- -- -- There are also two degenerate cases: -- -- -- -- Event streams are like the Mirror of Galadriel, for they show things -- that were, things that are, and things that yet may be. But which it -- is that he sees, even the wisest cannot always tell. -- -- Seeing is both good and perilous. An event stream may be modified by -- new events, but exceptions or inconsistent results will occur if the -- incoming events are not in increasing order of time. data EventStream b c -- | True if the first argument is scheduled to emit an event before the -- second. This makes EventStream a poset (partially ordered -- set). Infix priority 4. emitsBefore :: EventStream b1 c1 -> EventStream b2 c2 -> Bool -- | An event stream that never generates anything. nullStream :: EventStream b c -- | True if the event stream is guaranteed not to emit any future events, -- regardless of input. esFinished :: EventStream b c -> Bool -- | Peek at the events that will be emitted by this EventStream if no -- incoming event interrupts them. esPeek :: EventStream b c -> [Event c] -- | All the possible futures of the event stream. esFutures :: EventStream b c -> [(Event c, EventStream b c)] -- | Given a new input event to an existing event stream, this returns the -- modified event stream. When esProcess is called on the result -- the Event argument to the second call must not occur before the first -- (they can be simultaneous). More formally, if -- --
--   esOut = esProcess (esProcess esIn ev1) ev2
--   
-- -- then not (ev2 isBefore ev1). This precondition is not -- checked. esProcess :: Event b -> EventStream b c -> EventStream b c -- | Merge the outputs of two event streams. Input events are delivered to -- both streams. esMerge :: EventStream b c -> EventStream b c -> EventStream b c -- | Real Time Stream Processor (RTSP) -- -- An EventStream cannot exist independently of some event that caused it -- to start. Hence the only way to create an EventStream is through an -- RTSP. -- -- newtype RTSP b c RTSP :: (Event b -> EventStream b c) -> RTSP b c runRTSP :: RTSP b c -> Event b -> EventStream b c -- | Execute an RTSP against a list of events. Useful for testing. simulateRTSP :: RTSP b c -> [Event b] -> [Event c] -- | Execute an RTSP in the IO monad. The function returns immediately with -- an action for pushing events into the RTSP. execRTSP :: RTSP b (IO ()) -> IO (b -> IO ()) -- | A pure function converted into a stream processor stream :: (b -> c) -> RTSP b c -- | When a new input event is delivered to an RTSP it causes any future -- output events to be dropped in favour of the new events. -- accumulate instead keeps the events from previous inputs -- interleaved with the new ones. If you use this unnecessarily then you -- will get duplicated events. -- -- If there are n output events due to be emitted before an -- input event then this will require O(n) time for the input. accumulate :: RTSP b c -> RTSP b c -- | Repeat each input event after the specified delays until a new event -- arrives, at which point the sequence begins again with the new event -- value. The list of delays must not be negative and must be in -- ascending order. All the delays are relative to the first event. -- -- Be careful when using list comprehensions to create the argument. A -- list like -- --
--   [1..5] :: NominalDiffTime
--   
-- -- will count up in picoseconds rather than seconds, which is probably -- not what is wanted. Instead use -- --
--   map fromInteger [1..5] :: NominalDiffTime
--   
repeatEvent :: [NominalDiffTime] -> RTSP b b -- | Delay input events by the specified time, but given an event stream -- {ev1, ev2, ev3...}, if ev2 arrives before ev1 has been -- emitted then ev1 will be lost. delay0 :: NominalDiffTime -> RTSP b b -- | Delay input events by the specified time. -- -- Unfortunately this requires O(n) time when there are n events -- queued up due to the use of accumulate. delay :: NominalDiffTime -> RTSP b b -- | A conditional stream: events matching the predicate will be passed to -- the stream. type Cond a b = (a -> Bool, RTSP a b) -- | Conditional stream execution: only certain events will be accepted. streamFilter :: Cond a b -> RTSP a b -- | Send each event to all the streams that accept it. cond :: [Cond a b] -> RTSP a b -- | Send each event to the first stream that accepts it, if any. cond1 :: [Cond a b] -> RTSP a b -- | Send each event to the conditional stream if it accepts it, otherwise -- send it to the second argument. -- -- ifThenElse (p, rThen) rElse is equivalent to -- --
--   streamFilter (p, rThen) `mappend` streamFilter (not . p, rElse)
--   
-- -- However ifThenElse only evaluates p once for each -- input event. ifThenElse :: Cond a b -> RTSP a b -> RTSP a b -- | Real-time Actions. This monad is used to build sequential processors -- that can be turned into stream processors. An RTA emits zero or more -- events in response to each input event, and has a state that persists -- from one event to the next. In particular, state changes made after a -- pause will be visible to the next event regardless of the -- relative times. data RTA s c v -- | Get the current state. get :: RTA s c s -- | Put the current state. put :: s -> RTA s c () -- | Apply a function to the current state. modify :: (s -> s) -> RTA s c () -- | Emit a value as an event. emit :: c -> RTA s c () -- | Pause before the next step. This does not actually delay processing; -- it merely increments the time of any emitted events. pause :: NominalDiffTime -> RTA s c () -- | Get the current time. This is the event time plus any pauses. now :: RTA s c UTCTime -- | Execute an RTA as part of a real time stream processor. -- -- When a new event arrives any pending output events will be lost. -- However any state changes are immediately visible to the next event, -- even if they occured "after" the lost events. For instance, consider -- this: -- --
--   execRTA 1 $ \_ -> do
--      n <- get
--      pause 10
--      emit n
--      put (n+1)
--      return True
--   
-- -- If this receives events at t=[0,1,3,20] then it will emit -- [Event 13 3, Event 30 4]. The events that would have been -- emitted at t=[10,11] have been lost, but the state change -- still occured immediately, regardless of the output schedule. execRTA :: s -> (b -> RTA s c Bool) -> RTSP b c -- | Like execRTA, except that output events are accumulated. accumulateRTA :: s -> (b -> RTA s c Bool) -> RTSP b c instance Show a => Show (Event a) instance Eq a => Eq (Event a) instance Show c => Show (EventStream b c) instance Monad (RTA s c) instance Functor (RTA s c) instance Category RTSP instance Category EventStream instance Functor (RTSP b) instance Functor (EventStream b) instance Monoid (EventStream b c) instance Monoid (RTSP b c) instance Show (RTSP b c) instance Functor Event