module Simulation.Aivika.Distributed.Optimistic.Internal.KeepAliveManager
(KeepAliveManager,
KeepAliveParams(..),
newKeepAliveManager,
addKeepAliveReceiver,
remonitorKeepAliveReceiver,
existsKeepAliveReceiver,
trySendKeepAlive,
trySendKeepAliveUTC) where
import qualified Data.Map as M
import Data.Maybe
import Data.IORef
import Data.Time.Clock
import Control.Monad
import Control.Monad.Trans
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
data KeepAliveParams =
KeepAliveParams { keepAliveLoggingPriority :: Priority,
keepAliveInterval :: Int
}
data KeepAliveManager =
KeepAliveManager { keepAliveParams :: KeepAliveParams,
keepAliveTimestamp :: IORef UTCTime,
keepAliveReceivers :: IORef (M.Map DP.ProcessId KeepAliveReceiver)
}
data KeepAliveReceiver =
KeepAliveReceiver { keepAliveReceiverProcess :: DP.ProcessId,
keepAliveReceiverMonitor :: IORef DP.MonitorRef
}
newKeepAliveManager :: KeepAliveParams -> IO KeepAliveManager
newKeepAliveManager ps =
do timestamp <- getCurrentTime >>= newIORef
receivers <- newIORef M.empty
return KeepAliveManager { keepAliveParams = ps,
keepAliveTimestamp = timestamp,
keepAliveReceivers = receivers }
addKeepAliveReceiver :: KeepAliveManager -> DP.ProcessId -> DP.Process ()
addKeepAliveReceiver manager pid =
do
logKeepAliveManager manager INFO $ "Monitoring process " ++ show pid
r <- DP.monitor pid
r2 <- liftIO $ newIORef r
let x = KeepAliveReceiver pid r2
liftIO $
modifyIORef (keepAliveReceivers manager) $
M.insert pid x
remonitorKeepAliveReceiver :: KeepAliveManager -> DP.ProcessId -> DP.Process ()
remonitorKeepAliveReceiver manager pid =
do rs <- liftIO $ readIORef (keepAliveReceivers manager)
case M.lookup pid rs of
Nothing ->
do
logKeepAliveManager manager WARNING $ "Could not find the monitored process " ++ show pid
Just x ->
do
logKeepAliveManager manager NOTICE $ "Re-monitoring process " ++ show pid
r <- liftIO $ readIORef (keepAliveReceiverMonitor x)
r' <- DP.monitor pid
DP.unmonitor r
liftIO $ writeIORef (keepAliveReceiverMonitor x) r'
existsKeepAliveReceiver :: KeepAliveManager -> DP.ProcessId -> IO Bool
existsKeepAliveReceiver manager pid =
readIORef (keepAliveReceivers manager) >>=
return . M.member pid
trySendKeepAlive :: KeepAliveManager -> DP.Process ()
trySendKeepAlive manager =
do empty <- liftIO $ fmap M.null $ readIORef (keepAliveReceivers manager)
unless empty $
do utc <- liftIO getCurrentTime
trySendKeepAliveUTC manager utc
trySendKeepAliveUTC :: KeepAliveManager -> UTCTime -> DP.Process ()
trySendKeepAliveUTC manager utc =
do empty <- liftIO $ fmap M.null $ readIORef (keepAliveReceivers manager)
unless empty $
do f <- liftIO $ shouldSendKeepAlive manager utc
when f $
do
logKeepAliveManager manager INFO $
"Sending a keep-alive message"
liftIO $ writeIORef (keepAliveTimestamp manager) utc
rs <- liftIO $ readIORef (keepAliveReceivers manager)
forM_ rs $ \r ->
let pid = keepAliveReceiverProcess r
in DP.send pid KeepAliveMessage
shouldSendKeepAlive :: KeepAliveManager -> UTCTime -> IO Bool
shouldSendKeepAlive manager utc =
do utc0 <- readIORef (keepAliveTimestamp manager)
let dt = fromRational $ toRational (diffUTCTime utc utc0)
return $
secondsToMicroseconds dt > (keepAliveInterval $ keepAliveParams manager)
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds x = fromInteger $ toInteger $ round (1000000 * x)
logKeepAliveManager :: KeepAliveManager -> Priority -> String -> DP.Process ()
logKeepAliveManager manager p message =
when (keepAliveLoggingPriority (keepAliveParams manager) <= p) $
DP.say $
embracePriority p ++ " " ++ message