{- | Real Time Stream Processors are used to describe pipelines that process events in real time. An event is described as a (Time, Value) pair. When an RTSP receives an event it can respond by emitting zero or more events at any time at or after the receipt of the original event. Further incoming events may influence the stream of emitted events. Piplelines of RTSPs can be built up using the '(.)' operator from the Category instance, or alternatively the '(>>>)' operator (which is merely dot with its arguments swapped). RTSPs can be run in parallel with their outputs merged using "mappend" from the Monoid instance. Within the RTSP implementation all notions of \"delay\" and \"time\" merely refer to the time component of events, and are used for event ordering. Only the 'execRTSP' function, which runs in the IO monad, executes any actual real-time delay. The main data types for the application programmer are: ['Event'] A value that occurs at a certain time. For instance an @'Event' Char@ might represent a key press. ['RTSP'] The Real Time Stream Processor. A value of type @'RTSP' x y@ takes in events of type @x@ and emits events of type @y@. RTSPs can be strung together into pipelines using @(.)@ (or @(>>>)@ if you prefer your data to flow left-to-right). RTSPs are also monoids, so you can fork your data through two parallel RTSPs and then merge the results. ['RTA'] A monad for building stateful RTSPs. Convert an 'RTA' into an 'RTSP' using 'execRTA' or 'accumulateRTA' depending what you want to do with pending output events when a new input event arrives. You can test an RTSP in \"fast time\" (that is, without waiting for real-time delays) by using 'simulateRTSP'. Then you can execute the RTSP in real time using 'execRTSP' and be confident that the real time behaviour will match the fast-time behaviour. /Simultaneous Events/ The handling of logically simultaneous events in discrete event simulation is a long-standing problem. The three basic approaches are: 1. Impose an arbitrary but deterministic order on \"simultaneous\" events. 2. Collect the simultaneous events and pass them to the application, on the basis that the application programmer can then impose the appropriate semantics. 3. Simulate all possible orderings. This library takes the first approach. Option 2 would force each RTSP to wait for the next event to see if it was simultaneous, which is possible in a simulator but not in a real time system. In a real time system option 3 is not feasible, and would still leave the problem of which ordering to present to the outside world as the \"real\" one. When two simultaneous events arrive at an RTSP, the current implementation uses the following rules: * Simultaneous output events retain the order of the input events that triggered them. Hence simultaneous events never \"overtake\". * In the case of @(id \`mappend\` stream (+ 1))@ the output alternates the left and right expressions, starting with the left. However these properties interact in ways that are complex, hard to define formally and not guaranteed to be stable. Code that depends on the ordering of simultaneous events should therefore be avoided. -} module Control.RTSP ( -- ** Events Event (..), isBefore, -- ** Real Time Stream Processors EventStream, emitsBefore, nullStream, esFinished, esPeek, esFutures, esProcess, esMerge, RTSP (..), simulateRTSP, execRTSP, stream, accumulate, -- ** Manipulating event times repeatEvent, delay0, delay, -- ** Conditional event processing Cond, streamFilter, cond, cond1, ifThenElse, -- ** Real Time Actions with state RTA, get, put, modify, emit, pause, now, execRTA, accumulateRTA, ) where import Control.Category import Control.Concurrent import Control.Concurrent.STM import Control.Monad import Data.List import Data.Monoid import Data.Sequence import Data.Time import Prelude hiding ((.),id, repeat) infix 4 `isBefore`, `emitsBefore` -- | Real time events. data Event a = Event {eventTime :: UTCTime, eventValue :: a} deriving (Show, Eq) instance Functor Event where fmap f (Event t v) = Event t (f v) -- | True if the first event occurs strictly before the second. This makes @Event@ a poset (partially ordered set). -- Infix priority 4 (the same as other comparison operators). isBefore :: Event a -> Event b -> Bool isBefore ev1 ev2 = eventTime ev1 < eventTime ev2 {- | A real-time event stream cannot be described without reference to unknown future inputs. Hence @EventStream@ embodies two possible futures: * An @Event c@ will be emitted at some time in the future, with a new @EventStream@ representing the future after that event. * An incoming @Event b@ will arrive before the next @Event c@ is emitted, creating a new @EventStream@ representing the response to that event. The old @Event c@ may or may not be part of the new @EventStream@. There are also two degenerate cases: * Wait: no event is scheduled to be emitted, and the @EventStream@ just waits for an incoming event. * Finish: no event will ever be emitted, regardless of incoming events. This is explicitly distinguished so that complex RTSP expressions can be GC'd if they can be proven to be finished. Event streams are like the Mirror of Galadriel, for they show things that were, things that are, and things that yet may be. But which it is that he sees, even the wisest cannot always tell. Seeing is both good and perilous. An event stream may be modified by new events, but exceptions or inconsistent results will occur if the incoming events are not in increasing order of time. -} data EventStream b c = Emit (Event c) (EventStream b c) (RTSP b c) -- ^ The next event to be emitted, the following EventStream, and the function to -- handle an incoming event before then. | Wait (RTSP b c) -- ^ Degenerate case: no event scheduled to be emitted. | Finish -- ^ Semantically equivalent to @Wait eventSink@, but allows completed streams to be GC'd. deriving (Show) -- | Peek at the events that will be emitted by this EventStream if no incoming event interrupts them. esPeek :: EventStream b c -> [Event c] esPeek (Emit ev es1 _) = ev : esPeek es1 esPeek _ = [] -- | True if the first argument is scheduled to emit an event before the second. This makes @EventStream@ a poset -- (partially ordered set). Infix priority 4. emitsBefore :: EventStream b1 c1 -> EventStream b2 c2 -> Bool emitsBefore Finish _ = False emitsBefore (Wait _) _ = False emitsBefore (Emit ev1 _ _) (Emit ev2 _ _) = ev1 `isBefore` ev2 emitsBefore (Emit _ _ _) _ = True -- Only events satisfying the predicate will be passed on. esFilter :: (b -> Bool) -> EventStream b b esFilter = Wait . rtspFilter -- | Only events satisfying the predicate will be passed on. rtspFilter :: (b -> Bool) -> RTSP b b rtspFilter p = RTSP $ \ev -> if p $ eventValue ev then Emit ev (esFilter p) (rtspFilter p) else esFilter p -- | All the possible futures of the event stream. esFutures :: EventStream b c -> [(Event c, EventStream b c)] esFutures (Emit e es1 _) = (e, es1) : esFutures es1 esFutures _ = [] -- | True if the event stream is guaranteed not to emit any future events, regardless of input. esFinished :: EventStream b c -> Bool esFinished Finish = True esFinished _ = False -- | Merge the outputs of two event streams. Input events are delivered -- to both streams. esMerge :: EventStream b c -> EventStream b c -> EventStream b c esMerge Finish es = es esMerge es Finish = es esMerge (Wait k1) (Wait k2) = Wait (splitRTSP k1 k2) esMerge (Emit e es1 k1) es2@(Wait k2) = Emit e (esMerge es1 es2) (splitRTSP k1 k2) esMerge es1@(Wait k1) (Emit e es2 k2) = Emit e (esMerge es1 es2) (splitRTSP k1 k2) esMerge es1@(Emit e1 es1a k1) es2@(Emit e2 es2a k2) = if e2 `isBefore` e1 then Emit e2 (esMerge es1 es2a) (splitRTSP k1 k2) else Emit e1 (esMerge es1a es2) (splitRTSP k1 k2) -- | Given a new input event to an existing event stream, this returns the modified event stream. When @esProcess@ -- is called on the result the Event argument to the second call must not occur before the first (they can be -- simultaneous). More formally, if -- -- > esOut = esProcess (esProcess esIn ev1) ev2 -- -- then @not (ev2 `isBefore` ev1)@. This precondition is not checked. esProcess :: Event b -> EventStream b c -> EventStream b c esProcess _ Finish = Finish esProcess ev (Wait k) = runRTSP k ev esProcess eIn (Emit eOut rest k) = if eIn `isBefore` eOut then runRTSP k eIn else Emit eOut (esProcess eIn rest) k -- | An event stream that never generates anything. nullStream :: EventStream b c nullStream = Finish -- | Real Time Stream Processor (RTSP) -- -- An EventStream cannot exist independently of some event that caused it to start. Hence the only way to -- create an EventStream is through an RTSP. -- -- * "mempty" is the event sink: it never emits an event. -- -- * "mappend" runs its arguments in parallel and merges their outputs. -- -- * "id" is the null operation: events are passed through unchanged. -- -- * "(.)" is sequential composition: events emitted by the second argument are passed to the first argument. newtype RTSP b c = RTSP {runRTSP :: Event b -> EventStream b c} instance Show (RTSP b c) where show _ = "-RTSP-" instance Monoid (RTSP b c) where mempty = eventSink mappend = splitRTSP instance Monoid (EventStream b c) where mempty = nullStream mappend = esMerge instance Functor (EventStream b) where fmap f (Emit eOut rest k) = Emit (fmap f eOut) (fmap f rest) (fmap f k) fmap f (Wait k) = Wait (fmap f k) fmap _ Finish = Finish instance Functor (RTSP b) where fmap f (RTSP r) = RTSP $ \evt -> fmap f $ r evt {- The (.) operator for EventStream has to deal with several scenarios: 1: (Wait k2) . (Wait k1). This is simple because the only possible event is an input that is piped into k1. The result is the composition of the result with (Wait k2), achieived using the instance for RTSP. 2: (Wait k2) . es1@(Emit e1 es1a k1). In this case there are two timelines: a) e1 : (k2 e1) . es1a -- Event e1 is passed to k2 and the result composed with es1a. b) ev e1: (Wait k2) . (k1 ev) -- e1 never happens. Instead the input is passed to k1, which generates a new event stream to be composed with k2. The code for case b is the "k" value in the "let" clause. 3: es2@(Emit e2 es2a k2) es1@(Emit e1 es1a k1). The logic here is similar to scenario 2, except that the timing of e2 has to be taken into account as well. There are five basic timelines. Let ev = the next input event and es2b = runRTSP k2 e1 a) e2 e1 : Emit e2 (es2a . es1a) (k2 . k1) b) e1 e2 : k2 e1 . es1a -- e2 never happens because it is overridden by e1 c) ev ... : es2 . (k1 ev) -- ev overrides e1, and the new output is fed to es2. d) e1 ev e2 : esProcess (es2b . es1a) ev -- e1 overrides e2, giving es1a and es2b to process ev. e) e2 ev e1 : Emit e2 (es2a . (k1 ev)) -- e2 is emitted, then ev overrides e1. -} instance Category EventStream where -- id :: EventStream b c id = Wait id -- (.) :: EventStream c d -> EventStream b c -> EventStream b d Finish . _ = Finish (Wait _) . Finish = Finish (Emit e es1 _). Finish = let future = Emit e (es1 . Finish) (RTSP $ \_ -> future) in future (Wait k2) . (Wait k1) = Wait $ k2 . k1 es2@(Wait k2) . Emit e1 es1a k1 = let k = RTSP $ \ev -> if ev `isBefore` e1 then es2 . runRTSP k1 ev -- Timeline 2b else esProcess ev es -- Timeline 2a es = (runRTSP k2 e1) . es1a -- Future if ev never happens in case es of Emit e2 es2b _ -> Emit e2 es2b k Wait _ -> Wait k Finish -> Wait k -- if ev `isBefore` e1 then es never happens. es2@(Emit e2 es2a _) . es1@(Wait k1) = Emit e2 (es2a . es1) (RTSP $ \ev -> es2 . runRTSP k1 ev) es2@(Emit e2 es2a k2) . es1@(Emit e1 es1a k1) = let es = (runRTSP k2 e1) . es1a in if e1 `isBefore` e2 then -- Timelines 3b, 3c and 3d. e2 never happens. let k = RTSP $ \ev -> if ev `isBefore` e1 then es2 . runRTSP k1 ev -- Timeline 3c else esProcess ev es -- Timeline 3d in case es of Emit e3 es3 _ -> Emit e3 es3 k Wait _ -> Wait k Finish -> Wait k -- As above. else -- Timelines 3a, 3c and 3e. let k = RTSP $ \ev -> es2 . runRTSP k1 ev in Emit e2 (es2a . es1) k instance Category RTSP where -- id :: RTSP b c id = RTSP $ \ev -> Emit ev id id -- (.) :: RTSP c d -> RTSP b c -> RTSP b d r2 . r1 = RTSP $ \e0 -> (Wait r2) . runRTSP r1 e0 -- | Execute an RTSP against a list of events. Useful for testing. simulateRTSP :: RTSP b c -- ^ The processor to execute. -> [Event b] -- ^ The events must be finite and in chronological order. This is unchecked. -> [Event c] simulateRTSP r = esPeek . foldl (flip esProcess) (Wait r) -- | Execute an RTSP in the IO monad. The function returns immediately with an action for pushing events into the RTSP. execRTSP :: RTSP b (IO ()) -- ^ The output of the RTSP is a series of action events that will be executed in a separate thread sequentially at the -- times given. The actions may, of course, fork their own threads as necessary. -- -- execRTSP uses 'atomically', so it cannot be called within 'unsafePerformIO'. -> IO (b -> IO ()) execRTSP r = do eventQ <- newTChanIO let putValue v = do t <- getCurrentTime atomically $ writeTChan eventQ $ Event t v execStream (Emit ev es r1) = do {- "c1" and "c2" are threads that race to put a value in "var". "c1" waits until the next event emission time and then puts "Nothing". "c2" waits for the next input on "eventQ" and puts "Just" the event. Once this happens "mev2" can get the result and both threads are killed. The tricky bit is avoiding a race condition in "c2" where it reads the channel and is then killed by the timeout, which would result in a dropped event. "var" is never emptied, so if "c1" wins the race then "c2" can never complete before being killed, so the event remains on the queue ready for the next race. -} var <- newEmptyTMVarIO c1 <- timeout (eventTime ev) (atomically $ putTMVar var Nothing) c2 <- forkIO $ atomically $ do ev1 <- readTChan eventQ putTMVar var $ Just ev1 mev2 <- atomically $ readTMVar var killThread c1 killThread c2 case mev2 of Just ev2 -> execStream $ runRTSP r1 ev2 Nothing -> do () <- eventValue ev execStream es execStream (Wait r1) = do ev2 <- atomically $ readTChan eventQ execStream $ runRTSP r1 ev2 execStream Finish = return () _ <- forkIO $ execStream $ Wait r return putValue -- Execute the given action at the given time. Returns immediately with the ThreadID that will execute the action. timeout :: UTCTime -> IO () -> IO ThreadId timeout t action = forkIO $ do t0 <- getCurrentTime longDelay (round ((t `diffUTCTime` t0) * 1000000)) action where -- threadDelay takes an Int, which may be as small as 2^29 (a bit over 5 minutes). longDelay :: Integer -> IO () longDelay dt = let (n,dt1) = if dt > 0 then dt `divMod` cycleT else (0,0) in do replicateM_ (fromIntegral n) $ threadDelay (fromIntegral cycleT) threadDelay (fromIntegral dt1) cycleT = 500000000 -- 500 seconds in uSec. Small enough to fit into an Int. -- | An RTSP that never emits events regardless of its inputs. eventSink :: RTSP b c eventSink = RTSP $ \_ -> nullStream -- | A pure function converted into a stream processor stream :: (b -> c) -> RTSP b c stream f = fmap f id -- | Deliver an event to two stream processors and merge the resulting event -- streams. splitRTSP :: RTSP b c -> RTSP b c -> RTSP b c splitRTSP (RTSP r1) (RTSP r2) = RTSP $ \evt -> esMerge (r1 evt) (r2 evt) where -- | Convert an list of events into an event stream. Events coming into this -- stream are ignored. The list must be in chronological order. streamFromList :: [Event c]-> EventStream b c streamFromList [] = Finish streamFromList (e:es) = Emit e (streamFromList es) (RTSP $ \_ -> streamFromList (e:es)) -- | When a new input event is delivered to an RTSP it causes any future output events to be dropped in favour of the new -- events. @accumulate@ instead keeps the events from previous inputs interleaved with the new ones. If you use -- this unnecessarily then you will get duplicated events. -- -- If there are @n@ output events due to be emitted before an input event then this will require O(n) time for the input. accumulate :: RTSP b c -> RTSP b c accumulate r = rAccum [] r where rAccum evs@(ev1:evs1) r1 = RTSP $ \ev2 -> if ev2 `isBefore` ev1 -- hpc says this is always true. Can this be proved? then sAccum evs $ runRTSP r1 ev2 else Emit ev1 (sAccum evs1 $ runRTSP r ev2) (rAccum evs r1) rAccum [] r1 = RTSP $ \ev -> sAccum [] $ runRTSP r1 ev sAccum evs1@(ev1:evs1a) es2@(Emit ev2 es2a r2a) = let future = if ev2 `isBefore` ev1 then Emit ev2 (sAccum evs1 es2a) (rAccum (esPeek future) r2a) else Emit ev1 (sAccum evs1a es2) (rAccum (esPeek future) r2a) in future sAccum [] es2@(Emit ev2 es2a r2) = Emit ev2 (sAccum [] es2a) (rAccum (esPeek es2) r2) sAccum evs1@(ev1:evs1a) k2@(Wait r2) = Emit ev1 (sAccum evs1a k2) (rAccum evs1 r2) sAccum [] (Wait r2) = Wait $ accumulate r2 sAccum evs Finish = streamFromList evs -- | Repeat each input event after the specified delays until a new event arrives, at which point the sequence begins again -- with the new event value. The list of delays must not be negative and must be in ascending order. All the delays are -- relative to the first event. -- -- Be careful when using list comprehensions to create the argument. A list like -- -- > [1..5] :: NominalDiffTime -- -- will count up in picoseconds rather than seconds, which is probably not what is wanted. Instead use -- -- > map fromInteger [1..5] :: NominalDiffTime repeatEvent :: [NominalDiffTime] -> RTSP b b repeatEvent dts1 = RTSP $ \(Event t0 v) -> let rStream dt0 (dt:dts2) | dt0 <= dt = Emit (Event (dt `addUTCTime` t0) v) (rStream dt dts2) (repeatEvent dts1) | otherwise = error "Control.Applicative.RTSP.streamRepeat: negative time increment." rStream _ [] = Wait (repeatEvent dts1) in rStream 0 dts1 -- | Delay input events by the specified time, but given an event stream @{ev1, ev2, ev3...}@, if ev2 arrives before -- ev1 has been emitted then ev1 will be lost. delay0 :: NominalDiffTime -> RTSP b b delay0 dt = repeatEvent [dt] -- | Delay input events by the specified time. -- -- Unfortunately this requires O(n) time when there are @n@ events queued up due to the use of "accumulate". delay :: NominalDiffTime -> RTSP b b delay = accumulate . delay0 -- | A conditional stream: events matching the predicate will be passed to the stream. type Cond a b = (a -> Bool, RTSP a b) -- | Conditional stream execution: only certain events will be accepted. streamFilter :: Cond a b -> RTSP a b streamFilter (p, r1) = rtspFilter p >>> r1 -- | Send each event to all the streams that accept it. cond :: [Cond a b] -> RTSP a b cond = mconcat . map streamFilter -- | Send each event to the first stream that accepts it, if any. cond1 :: [Cond a b] -> RTSP a b cond1 = foldr ifThenElse eventSink -- | Send each event to the conditional stream if it accepts it, otherwise send it to the second argument. -- -- @ifThenElse (p, rThen) rElse@ is equivalent to -- -- > streamFilter (p, rThen) `mappend` streamFilter (not . p, rElse) -- -- However @ifThenElse@ only evaluates @p@ once for each input event. ifThenElse :: Cond a b -> RTSP a b -> RTSP a b ifThenElse (p,rThen) rElse = ifRTSP (Wait rThen) (Wait rElse) where ifRTSP es1 es2 = RTSP $ \ev -> if p $ eventValue ev then ifStream (esProcess ev es1) es2 else ifStream es1 (esProcess ev es2) ifStream Finish Finish = Finish ifStream Finish es@(Wait _) = Wait $ ifRTSP Finish es ifStream Finish es@(Emit e es1 _) = Emit e (ifStream Finish es1) (ifRTSP Finish es) ifStream es@(Wait _) Finish = Wait $ ifRTSP es Finish ifStream es@(Emit e es1 _) Finish = Emit e (ifStream es1 Finish) (ifRTSP es Finish) ifStream es1@(Wait _) es2@(Wait _) = Wait $ ifRTSP es1 es2 ifStream es1@(Emit e es1a _) es2@(Wait _) = Emit e (ifStream es1a es2) (ifRTSP es1 es2) ifStream es1@(Wait _) es2@(Emit e es2a _) = Emit e (ifStream es1 es2a) (ifRTSP es1 es2) ifStream es1@(Emit e1 es1a _) es2@(Emit e2 es2a _) = if e1 `isBefore` e2 then Emit e1 (ifStream es1a es2) (ifRTSP es1 es2) else Emit e2 (ifStream es1 es2a) (ifRTSP es1 es2) -- | Real-time Actions. This monad is used to build sequential processors that can be turned into stream processors. -- An RTA emits zero or more events in response to each input event, and has a state that persists from one event to the next. -- In particular, state changes made after a "pause" will be visible to the next event regardless of the relative times. newtype RTA s c v = RTA {unRTA :: s -> Seq (Event c) -> UTCTime -> (v, s, Seq (Event c), UTCTime)} {- In the RTA definition code, the following initial variable letters are used: b - The type of input events c - The type of output events f - A function of whatever type. q - Queue of output events, of type Seq (Event c) s - Used for both the type and value of the state. t - Time, of type UTCTime v - Used for both the value returned by the current action and its type. z - Non-termination flag. True if the RTSP should respond to future events. -} instance Functor (RTA s c) where fmap f rv = RTA $ \s q t -> let (v1, s1, q1, t1) = unRTA rv s q t in (f v1, s1, q1, t1) instance Monad (RTA s c) where return v = RTA $ \ s q t -> (v, s, q, t) rv >>= f = RTA $ \s q t -> let (v1, s1, q1, t1) = unRTA rv s q t in unRTA (f v1) s1 q1 t1 -- | Get the current time. This is the event time plus any pauses. now :: RTA s c UTCTime now = RTA $ \s q t -> (t, s, q, t) -- | Get the current state. get :: RTA s c s get = RTA $ \s q t -> (s, s, q, t) -- | Put the current state. put :: s -> RTA s c () put v = RTA $ \_ q t -> ((), v, q, t) -- | Apply a function to the current state. modify :: (s -> s) -> RTA s c () modify f = fmap f get >>= put -- | Emit a value as an event. emit :: c -> RTA s c () emit v = RTA $ \s q t -> ((), s, q |> Event t v, t) -- | Pause before the next step. This does not actually delay processing; it merely increments the time of any emitted events. pause :: NominalDiffTime -> RTA s c () pause dt | dt >= 0 = RTA $ \s q t -> ((), s, q, addUTCTime dt t) | otherwise = error $ "pause: negative interval of " ++ show dt -- | Execute an RTA as part of a real time stream processor. -- -- When a new event arrives any pending output events will be lost. However any state changes are immediately visible to the -- next event, even if they occured \"after\" the lost events. For instance, consider this: -- -- > execRTA 1 $ \_ -> do -- > n <- get -- > pause 10 -- > emit n -- > put (n+1) -- > return True -- -- If this receives events at @t=[0,1,3,20]@ then it will emit @[Event 13 3, Event 30 4]@. The events that would have been emitted -- at @t=[10,11]@ have been lost, but the state change still occured immediately, regardless of the output schedule. execRTA :: s -- ^ The initial state. State persists between input events. -> (b -> RTA s c Bool) -- ^ A function from the input value to an action. If the action returns @True@ then subsequent input events -- will run the action again. If it returns @False@ then the RTSP finishes and will not respond to further events. -> RTSP b c execRTA s f = RTSP $ \ev -> let t = eventTime ev v = eventValue ev (z, s1, q, _) = unRTA (f v) s empty t queueStream q1 = case viewl q1 of EmptyL -> if z then Wait $ execRTA s1 f else nullStream c :< q2 -> Emit c (queueStream q2) (if z then execRTA s1 f else eventSink) in queueStream q -- | Like "execRTA", except that output events are accumulated. accumulateRTA :: s -> (b -> RTA s c Bool) -> RTSP b c accumulateRTA s f = accumulate $ execRTA s f