-- |This module can execute events at specified time.  It uses a two thread
-- system that allows the STM adding and deleting of new events without
-- requiring later IO actions.  An ability to place arbitrary event preprocessing
-- when adding each event exists, but in a coarse-grained manner.  This
-- feature can be expanded on request.
--
-- This is very like control-timeout, but was developed separately; internal
-- operation is similar, with a thread sleeping via threadDelay and EventIds
-- being based in part on expire time.  It differs in that control-event is:
--
--      * More complex
--
--      * Requires initialization
--
--      * Allows pure STM adding and removing of events (no post STM IO action)
--
--      * Allows user control over event systems (can have more than one)
--
--      * Allows events to run in event handler thread
--              (advisable if thread spark is too expensive / computation is cheap)
--
--      * No possible duplication of EventId (theoretical! no real advantage)
--
-- On the other hand, a shim could be made to provide the
-- control-timeout API with Control.Event running under the hood.
module Control.Event (
         Event
        ,EventId
        ,EventSystem
        ,noEvent
        ,initEventSystem
        ,setEventPreprocessing
        ,addEvent
        ,addEventSTM
        ,cancelEvent
        ,cancelEventSTM
        ,alwaysForkEvents
        ,neverForkEvents
        ,printMessageOnEvents
        ) where

import Control.Concurrent (forkIO, myThreadId, ThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (throwDynTo, catchDyn, block, unblock)
import Control.Monad (forever)
import Data.Dynamic
import Data.List (partition)
import Data.Map (Map, empty)
import Data.Word (Word32)
import System.Time (TimeDiff(..), ClockTime(..), diffClockTimes, getClockTime)

-- |IDs the program can use to cancel previously scheduled events.
data EventId = EvtId ClockTime Word32 deriving (Eq, Ord, Show)

-- |An 'EventId' built from the sentinel time @TOD (-1) (-1)@ and sequence
-- number 0.  The same sentinel time is what a fresh 'EventSystem' stores as
-- its alarm, so this id is presumably meant as a \"no event\" placeholder.
noEvent :: EventId
noEvent = EvtId (TOD (-1) (-1)) 0

-- |The basic event structure, exported to allow custom functions for setEventPreprocessing
data Event =
        Evt {   evtId           :: EventId,     -- Expire time plus a per-time sequence number
                evtAction       :: IO (),       -- Action to run once the event has expired
                evtUseOwnThread :: Bool         -- True: the action is run via forkIO
        }

-- |The expire time of an event, i.e. the clock component of its 'EventId'.
evtExpire :: Event -> ClockTime
evtExpire (Evt (EvtId clk _) _ _) = clk

-- |The event system can either be initialized and passed as state or a global
-- system can be declared using gEvtSys = unsafePerformIO initEventSystem
data EventSystem =
        EvtSys {  esEvents        :: TVar [Event],              -- Pending Events
                  esThread        :: TVar (Maybe ThreadId),     -- Id of thread for TimerReset exceptions
                  esAlarm         :: TVar ClockTime,            -- Time of soonest event
                  esPreProcessing :: TVar (Event -> Event)      -- Event preprocessing
        }

-- |The only way to get an event system is to initialize one, which sets internal TVars
-- and sparks two threads (one to expire events, one to look if you've added an
-- event expiring before the current alarm).
--
-- The system starts with no pending events, no registered expiring thread,
-- the sentinel alarm @TOD (-1) (-1)@ and 'id' as the event preprocessor.
initEventSystem :: IO EventSystem
initEventSystem = do
        evts <- newTVarIO ([] :: [Event])
        tid  <- newTVarIO Nothing
        alm  <- newTVarIO (TOD (-1) (-1))
        pp   <- newTVarIO (id :: (Event -> Event))
        let evtSys = EvtSys evts tid alm pp
        -- The thread ids are not needed: the expiring thread publishes its
        -- own id through esThread once it is running.
        _ <- forkIO $ forever $ trackAlarm evtSys
        _ <- forkIO $ expireEvents evtSys
        return evtSys

-- |Main thread that delays till the alarm time then executes any expired events.
-- Asynchronous 'TimerReset' exceptions might occur to indicate a new, earlier, alarm time.
expireEvents :: EventSystem -> IO ()
expireEvents sys = block $ do
        self <- myThreadId
        forever (oneRound self `catchDyn` onReset)
  where
  -- One wait-and-expire pass, run with asynchronous exceptions re-enabled.
  -- The thread id is (re)published first so trackAlarm knows whom to signal.
  oneRound self = unblock $ do
        atomically (writeTVar (esThread sys) (Just self))
        expireEvents' sys
  -- A TimerReset merely abandons the current pass; 'forever' starts the next.
  onReset TimerReset = return ()

-- |Worker function for expireEvents - the parent simply catches the exceptions.
-- Waits until an alarm is set, sleeps for the time remaining until that
-- alarm and then runs whatever has expired.
expireEvents' :: EventSystem -> IO ()
expireEvents' sys = untilAlarm >>= threadDelay >> runExpire sys
  where
  -- Microseconds from now until the current alarm.
  untilAlarm :: IO Int
  untilAlarm = do
        alarm <- atomically armedAlarm
        now   <- getClockTime
        return (timeDiffToMicroSec (alarm `diffClockTimes` now))
  -- The alarm time; retries for as long as the sentinel (no alarm) is stored.
  armedAlarm :: STM ClockTime
  armedAlarm = do
        alarm <- readTVar (esAlarm sys)
        if alarm == TOD (-1) (-1) then retry else return alarm

-- |Determines which events are expired, running all their actions.
-- In a single transaction the events expiring strictly before the current
-- time are taken out of the pending list and the alarm is moved to the first
-- remaining event (or back to the sentinel when nothing remains).
--
-- NOTE(review): the due events have already been removed from the pending
-- list when their actions start; a TimerReset arriving while 'runEvents' is
-- still going would unwind to 'expireEvents' and the not-yet-run actions
-- would presumably be dropped - confirm.
runExpire :: EventSystem -> IO ()
runExpire sys = do
        now <- getClockTime
        due <- atomically (takeDue now)
        runEvents due
  where
  takeDue :: ClockTime -> STM [Event]
  takeDue now = do
        pending <- readTVar (esEvents sys)
        let (due, later) = partition (\e -> evtExpire e < now) pending
        writeTVar (esEvents sys) later
        writeTVar (esAlarm sys) (nextAlarm later)
        return due
  nextAlarm :: [Event] -> ClockTime
  nextAlarm []    = TOD (-1) (-1)
  nextAlarm (e:_) = evtExpire e

-- |Runs all provided events (which must have expired), in list order.
-- Events flagged to use their own thread are forked; the others run
-- directly in the calling thread.
runEvents :: [Event] -> IO ()
runEvents = mapM_ fire
  where
  fire (Evt _ act True) = forkIO act >> return ()
  fire (Evt _ act _)    = act

-- |Add an *action* to be performed at *time* by *system*, returning a unique id.
addEvent :: EventSystem -> ClockTime -> IO () -> IO EventId
addEvent sys clk act = atomically $ addEventSTM sys clk act

-- |Atomically add an action to be performed at specified time and returning a unique id.
addEventSTM :: EventSystem -> ClockTime -> IO () -> STM EventId
addEventSTM sys clk act = do
        evts <- readTVar (esEvents sys)
        pp   <- readTVar (esPreProcessing sys)
        -- New events default to running in their own thread; the
        -- preprocessor may change that.
        let evt            = pp $ Evt (EvtId clk 0) act True
            (newEvts, eid) = insertEvent evts evt 0
        writeTVar (esEvents sys) newEvts
        -- Arm the alarm if none is set, or pull it forward for an earlier event.
        -- NOTE(review): because the alarm is lowered here, trackAlarm's
        -- `alm > evtExpire e` test looks like it can never succeed for an
        -- event added through this function, so an expiring thread already
        -- sleeping toward a later alarm would not be woken early - confirm
        -- the intended wake-up path before changing either side.
        alm <- readTVar (esAlarm sys)
        if clk < alm || alm == (TOD (-1) (-1))
                then writeTVar (esAlarm sys) clk
                else return ()
        return eid
  where
  -- Inserts the event into the list (kept ordered by expire time, new events
  -- after existing ones with the same time) and assigns its final id.  The
  -- Word32 argument is the sequence number to use at the insertion point.
  insertEvent :: [Event] -> Event -> Word32 -> ([Event],EventId)
  insertEvent [] n i =
        let eid = EvtId clk i in ([n { evtId = eid }], eid)
  insertEvent (e:es) n i
        | evtExpire n == evtExpire e =
                let (lst, eid) = insertEvent es n (seqAfter e) in (e:lst, eid)
        | evtExpire n <  evtExpire e =
                let eid = EvtId clk i in (n { evtId = eid } : e : es, eid)
        | otherwise =
                let (lst, eid) = insertEvent es n i in (e:lst, eid)
  -- The sequence number following that of an existing same-time event.
  -- Continuing from the stored number (rather than counting how many
  -- same-time events are present) keeps ids unique even after an earlier
  -- same-time event has been cancelled: with ids 0,1,2 pending and 1
  -- cancelled, counting would hand out 2 a second time.
  seqAfter :: Event -> Word32
  seqAfter e = case evtId e of
        EvtId _ s
          | s == maxBound -> error "Unreasonably large number of events (maxBound::Word32)"
          | otherwise     -> s + 1

-- |Cancel an event from the system, returning True on success.
cancelEvent :: EventSystem -> EventId -> IO Bool
cancelEvent sys eid = atomically (cancelEventSTM sys eid)

-- |Atomically cancel an event from the system, returning True on success.
-- False means no pending event carries the given id.  The alarm is left
-- untouched.
cancelEventSTM :: EventSystem -> EventId -> STM Bool
cancelEventSTM sys eid = do
        evts <- readTVar (esEvents sys)
        let (newEvts,ret) = deleteOrd evts eid
        writeTVar (esEvents sys) newEvts
        return ret
  where
  -- Removes the first event with the given id.  The search stops as soon as
  -- an event expiring later than the id's time is met, relying on the list
  -- being ordered by expire time.
  deleteOrd :: [Event] -> EventId -> ([Event],Bool)
  deleteOrd [] _ = ([],False)
  deleteOrd (e:es) target@(EvtId t _)
        | evtExpire e > t   = (e : es, False)
        | evtId e == target = (es, True)
        | otherwise         = let (es',ret) = deleteOrd es target in (e : es', ret)

-- |Tracks the alarm time and the earliest event.
-- If an earlier event is added, the alarm time is updated and 'TimerReset'
-- is thrown to the expireEvents thread.
--
-- A single call blocks (via 'retry') until the expiring thread has published
-- its id, at least one event is pending, and the alarm is either unset
-- (@TOD (-1) (-1)@) or later than the first pending event.
trackAlarm :: EventSystem -> IO ()
trackAlarm sys = do
        tid <- atomically $ do
                i    <- readTVar (esThread sys) >>= maybe retry return
                alm  <- readTVar (esAlarm sys)
                evts <- readTVar (esEvents sys)
                case evts of
                        []    -> retry
                        (e:_) -> if alm > evtExpire e || alm == (TOD (-1) (-1))
                                        then writeTVar (esAlarm sys) (evtExpire e)
                                        else retry
                return i
        throwDynTo tid TimerReset

-- |Returns the time difference in microseconds (potentially returning maxBound <= the real difference).
-- Only the seconds and picoseconds fields are used; the other 'TimeDiff'
-- fields are ignored.  A zero or negative difference (a time that has already
-- passed) yields 0, so the result is always a usable 'threadDelay' argument.
timeDiffToMicroSec :: TimeDiff -> Int
timeDiffToMicroSec (TimeDiff _ _ _ _ _ sec picosec)
        | realTime <= 0                             = 0
        | realTime > fromIntegral (maxBound :: Int) = maxBound
        | otherwise                                 = fromIntegral realTime
  where
  -- Computed as an Integer so the multiplication cannot overflow.
  realTime :: Integer
  realTime = (fromIntegral sec) * (10^6) + fromIntegral (picosec `div` (10^6))

-- Dynamic exception thrown at the expiring thread to make it re-read the alarm.
data TimerReset = TimerReset deriving (Eq, Ord, Show, Typeable)

-- Start extra bloat here --

-- |You can set a modifier on each event as it's added to the event system.
-- alwaysForkEvents and neverForkEvents are two such premade functions.
-- Default is 'id'.
setEventPreprocessing :: EventSystem -> (Event -> Event) -> STM ()
setEventPreprocessing sys pp = writeTVar (esPreProcessing sys) pp

-- |A premade event preprocessor to always fork a new Haskell thread for each event (default)
-- If used as an argument to 'setEventPreprocessing' it will affect all newly added events.
alwaysForkEvents :: Event -> Event
alwaysForkEvents evt = evt { evtUseOwnThread = True }

-- |A premade event preprocessor to never fork a Haskell thread for new events.
-- If used as an argument to 'setEventPreprocessing' it will affect all newly added events.
-- In testing this harms the event system scalability, but it might save a bit of CPU time
-- in some use cases.
neverForkEvents :: Event -> Event
neverForkEvents evt = evt { evtUseOwnThread = False }

-- |A premade event preprocessor that prints the provided debug message before each event.
-- The message is written with 'putStrLn' immediately before the event's
-- original action runs.
printMessageOnEvents :: String -> Event -> Event
printMessageOnEvents str evt = evt { evtAction = announced }
  where
  announced = do
        putStrLn str
        evtAction evt