module Simulation.Aivika.Distributed.Optimistic.Internal.KeepAliveManager
(KeepAliveManager,
KeepAliveParams(..),
newKeepAliveManager,
addKeepAliveReceiver,
trySendKeepAlive,
trySendKeepAliveUTC) where
import qualified Data.Set as S
import Data.Maybe
import Data.IORef
import Data.Time.Clock
import Control.Monad
import Control.Monad.Trans
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
-- | Configuration for a 'KeepAliveManager'.
data KeepAliveParams =
  KeepAliveParams
    { keepAliveLoggingPriority :: Priority
      -- ^ Logging threshold: a message is emitted only when this value is
      -- less than or equal to the priority the message is logged with.
    , keepAliveInterval :: Int
      -- ^ Interval between keep-alive messages. It is compared against an
      -- elapsed time expressed in microseconds.
    }
-- | Mutable state used to decide when keep-alive messages are sent and
-- to which processes.
data KeepAliveManager =
  KeepAliveManager
    { keepAliveParams :: KeepAliveParams
      -- ^ The parameters the manager was created with.
    , keepAliveTimestamp :: IORef UTCTime
      -- ^ Reference time for the interval check: set to the creation time
      -- initially and overwritten each time a keep-alive is sent.
    , keepAliveReceivers :: IORef (S.Set DP.ProcessId)
      -- ^ The processes that receive the keep-alive message.
    }
-- | Create a keep-alive manager with the given parameters.
--
-- The timestamp reference starts at the current time and the receiver
-- set starts empty.
newKeepAliveManager :: KeepAliveParams -> IO KeepAliveManager
newKeepAliveManager ps =
  build <$> (newIORef =<< getCurrentTime) <*> newIORef S.empty
  where
    -- Assemble the record once both references have been allocated.
    build timestampRef receiversRef =
      KeepAliveManager
        { keepAliveParams = ps
        , keepAliveTimestamp = timestampRef
        , keepAliveReceivers = receiversRef
        }
-- | Register a process as a receiver of keep-alive messages.
--
-- Adding a process that is already registered leaves the set unchanged.
addKeepAliveReceiver :: KeepAliveManager -> DP.ProcessId -> IO ()
addKeepAliveReceiver manager pid =
  -- Strict modification: the lazy 'modifyIORef' would stack up unevaluated
  -- 'S.insert' thunks inside the reference until the set is next read.
  modifyIORef' (keepAliveReceivers manager) (S.insert pid)
-- | Try to send a keep-alive message, using the current wall-clock time.
--
-- Nothing is done, and the clock is not consulted, when there are no
-- registered receivers.
trySendKeepAlive :: KeepAliveManager -> DP.Process ()
trySendKeepAlive manager =
  do receivers <- liftIO $ readIORef (keepAliveReceivers manager)
     unless (S.null receivers) $
       liftIO getCurrentTime >>= trySendKeepAliveUTC manager
-- | Try to send a keep-alive message, treating the supplied time as "now".
--
-- The message is sent to every registered receiver only when there is at
-- least one receiver and 'shouldSendKeepAlive' reports that the interval
-- has elapsed. On sending, the stored timestamp is replaced by the
-- supplied time.
trySendKeepAliveUTC :: KeepAliveManager -> UTCTime -> DP.Process ()
trySendKeepAliveUTC manager utc =
  do -- Take a single snapshot of the receivers so that the emptiness check
     -- and the send loop below work on the same set.
     pids <- liftIO $ readIORef (keepAliveReceivers manager)
     unless (S.null pids) $
       do due <- liftIO $ shouldSendKeepAlive manager utc
          when due $
            do logKeepAliveManager manager INFO
                 "Sending a keep-alive message"
               -- Record the send time before delivering the messages.
               liftIO $ writeIORef (keepAliveTimestamp manager) utc
               forM_ pids $ \pid ->
                 DP.send pid KeepAliveMessage
-- | Test whether a keep-alive message is due at the given time.
--
-- The result is 'True' when the time elapsed since the stored timestamp,
-- converted to microseconds, is strictly greater than 'keepAliveInterval'.
shouldSendKeepAlive :: KeepAliveManager -> UTCTime -> IO Bool
shouldSendKeepAlive manager utc =
  intervalExceeded <$> readIORef (keepAliveTimestamp manager)
  where
    threshold = keepAliveInterval (keepAliveParams manager)
    intervalExceeded utc0 =
      let elapsed = fromRational (toRational (diffUTCTime utc utc0))
      in secondsToMicroseconds elapsed > threshold
-- | Convert a duration in seconds to a whole number of microseconds.
--
-- The scaled value is rounded with 'round' to an 'Integer' and then
-- narrowed to 'Int'.
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds x = fromInteger micros
  where
    micros :: Integer
    micros = round (x * 1000000)
-- | Log a message at the given priority through 'DP.say'.
--
-- The message is emitted only when the manager's configured logging
-- priority is less than or equal to the given priority; otherwise this is
-- a no-op. The priority tag produced by 'embracePriority' is prepended.
logKeepAliveManager :: KeepAliveManager -> Priority -> String -> DP.Process ()
logKeepAliveManager manager p message
  | threshold <= p = DP.say (embracePriority p ++ " " ++ message)
  | otherwise = return ()
  where
    threshold = keepAliveLoggingPriority (keepAliveParams manager)