module Control.Event (
EventId
,EventSystem
,noEvent
,initEventSystem
,addEvent
,addEventSTM
,cancelEvent
,cancelEventSTM
,evtSystemSize
) where
import Prelude hiding (lookup)
import Control.Concurrent (forkIO, myThreadId, ThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (throwDynTo, catchDyn, block, unblock)
import Control.Monad (forever)
import Data.Dynamic
import Data.List (partition, deleteBy)
import Data.Map (Map, empty, findMin, deleteFindMin, insertLookupWithKey, adjust, size, singleton, toList, insert, updateLookupWithKey, delete, lookup, fold)
import System.Time (TimeDiff(..), ClockTime(..), diffClockTimes, getClockTime)
import GHC.Conc
type EventNumber = Int
type EventSet = (EventNumber, Map EventNumber (IO ()))
singletonSet :: (IO ()) -> EventSet
singletonSet a = (1, singleton 0 a)
data EventId = EvtId ClockTime EventNumber deriving (Eq, Ord, Show)
noEvent = EvtId (TOD (1) (1)) (1)
data EventSystem = EvtSys {
esEvents :: TVar (Map ClockTime EventSet),
esThread :: TVar (Maybe ThreadId),
esAlarm :: TVar ClockTime,
esNewAlarm :: TVar Bool,
esExpired :: TVar [[EventSet]]
}
initEventSystem :: IO EventSystem
initEventSystem = do
evts <- newTVarIO empty
tid <- newTVarIO Nothing
alm <- newTVarIO (TOD (1) (1))
new <- newTVarIO False
exp <- newTVarIO []
let evtSys = EvtSys evts tid alm new exp
forkIO $ forever $ trackAlarm evtSys
forkIO $ forever $ monitorExpiredQueue exp
forkIO $ expireEvents evtSys
return evtSys
expireEvents :: EventSystem -> IO ()
expireEvents es = do
block (do
tid <- myThreadId
forever $ catchDyn (unblock (setTID (Just tid) es >> expireEvents' es))
(\TimerReset -> return ()) )
where
setTID i es = atomically (writeTVar (esThread es) i)
expireEvents' :: EventSystem -> IO ()
expireEvents' evtSys = do
usDelay <- determineDelay
threadDelay usDelay
runExpire evtSys
where
determineDelay :: IO Int
determineDelay = do
alm <- atomically (do
evts <- readTVar (esEvents evtSys)
case findMinM evts of
Nothing -> retry
Just (c,_) -> return c )
now <- getClockTime
return $ timeDiffToMicroSec $ diffClockTimes alm now
findMinM :: Map ClockTime EventSet -> Maybe (ClockTime,EventSet)
findMinM m | size m == 0 = Nothing
| otherwise = Just $ findMin m
runExpire :: EventSystem -> IO ()
runExpire evtSys = do
now <- getClockTime
atomically (do evts <- readTVar (esEvents evtSys)
let (exp, newMap) = getEarlierKeys now evts
newAlarm = getAlarm newMap
writeTVar (esAlarm evtSys) newAlarm
writeTVar (esEvents evtSys) newMap
if (fold (\(_,m) n -> size m + n) 0 newMap) + sum (map (\(_,m) -> size m) exp) /= (fold (\(_,m) n -> size m + n) 0 evts)
then unsafeIOToSTM $ putStrLn "Expire is dropping events."
else return ()
exps <- readTVar (esExpired evtSys)
writeTVar (esExpired evtSys) (exp:exps) )
where
getEarlierKeys :: ClockTime -> Map ClockTime EventSet -> ([EventSet], Map ClockTime EventSet)
getEarlierKeys clk m =
case deleteFindMinM m of
Just ((k,es), m') ->
if k < clk
then let (exp, lastMap) = getEarlierKeys clk m'
in (es:exp, lastMap)
else ([], m)
Nothing -> ([], m)
getAlarm m | size m == 0 = TOD (1) (1)
| otherwise = fst $ findMin m
deleteFindMinM :: Map k a -> Maybe ((k, a), Map k a)
deleteFindMinM m = if size m == 0 then Nothing else Just (deleteFindMin m)
monitorExpiredQueue :: TVar [[EventSet]] -> IO ()
monitorExpiredQueue exp = do
exp <- atomically (do
e <- readTVar exp
case e of
(a:as) -> writeTVar exp [] >> return e
_ -> retry )
mapM_ (mapM_ runEvents) exp
runEvents :: EventSet -> IO ()
runEvents (_,set) = do
let actions = map snd (toList set)
sequence_ actions
addEvent :: EventSystem -> ClockTime -> IO () -> IO EventId
addEvent sys clk act = atomically (addEventSTM sys clk act)
addEventSTM :: EventSystem -> ClockTime -> IO () -> STM EventId
addEventSTM sys clk act = do
evts <- readTVar (esEvents sys)
let (old, newMap) = insertLookupWithKey (\_ _ o -> insertEvent o) clk (singletonSet act) evts
num = case old of
Nothing -> 0
Just (n,_) -> n
eid = EvtId clk num
writeTVar (esEvents sys) newMap
alm <- readTVar (esAlarm sys)
if clk < alm || alm == (TOD (1) (1))
then writeTVar (esAlarm sys) clk >> writeTVar (esNewAlarm sys) True
else return ()
if fold (\(_,m) n -> n + size m) 0 newMap /= fold (\(_,m) n -> n + size m) 1 evts
then unsafeIOToSTM $ putStrLn "Event mapping has not grown!"
else return ()
return eid
where
insertEvent :: EventSet -> EventSet
insertEvent (num,set) | num == maxBound = error "maxBound events at given time, something is broken."
| otherwise =
(num+1, insert num act set)
cancelEvent :: EventSystem -> EventId -> IO Bool
cancelEvent sys eid = atomically (cancelEventSTM sys eid)
cancelEventSTM :: EventSystem -> EventId -> STM Bool
cancelEventSTM sys eid@(EvtId clk num) = do
evts <- readTVar (esEvents sys)
let newMap :: Map ClockTime EventSet
prev :: Maybe EventSet
(prev,newMap) = updateLookupWithKey (\_ (num, old) -> Just (num,delete num old)) clk evts
ret = case prev of
Nothing -> False
Just (_,p) -> case lookup clk newMap of
Nothing -> False
Just (_,m) -> (size p /= size m)
writeTVar (esEvents sys) newMap
return ret
evtSystemSize :: EventSystem -> STM Int
evtSystemSize sys = do
evts <- readTVar (esEvents sys)
return $ fold (\(_,m) n -> n + size m) 0 evts
trackAlarm :: EventSystem -> IO ()
trackAlarm sys = do
tid <- atomically (do
newAlm <- readTVar (esNewAlarm sys)
if newAlm then writeTVar (esNewAlarm sys) False else retry
tid <- readTVar (esThread sys)
i <- case tid of
Just i -> return i
Nothing -> retry
return i )
throwDynTo tid TimerReset
timeDiffToMicroSec :: TimeDiff -> Int
timeDiffToMicroSec (TimeDiff _ _ _ _ _ sec picosec) =
if realTime > fromIntegral (maxBound :: Int)
then maxBound
else fromIntegral realTime
where
realTime :: Integer
realTime = (fromIntegral sec) * (10^6) + fromIntegral (picosec `div` (10^6))
data TimerReset = TimerReset deriving (Eq, Ord, Show, Typeable)