module Simulation.Aivika.Distributed.Optimistic.Internal.TransientMessageQueue
(TransientMessageQueue,
newTransientMessageQueue,
transientMessageQueueSize,
transientMessageQueueTime,
enqueueTransientMessage,
processAcknowledgmentMessage,
acknowledgmentMessageTime,
resetAcknowledgmentMessageTime,
deliverAcknowledgmentMessage,
deliverAcknowledgmentMessages) where
import qualified Data.Set as S
import Data.List
import Data.IORef
import Control.Monad
import Control.Monad.Trans
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
data TransientMessageQueue =
TransientMessageQueue { queuePrototypeMessages :: IORef (S.Set TransientMessageQueueItem),
queueMarkedMessageTime :: IORef Double
}
data TransientMessageQueueItem =
TransientMessageQueueItem { itemSequenceNo :: Int,
itemSendTime :: Double,
itemReceiveTime :: Double,
itemSenderId :: DP.ProcessId,
itemReceiverId :: DP.ProcessId,
itemAntiToggle :: Bool
} deriving (Eq, Show)
instance Ord TransientMessageQueueItem where
x <= y
| (itemReceiveTime x < itemReceiveTime y) = True
| (itemReceiveTime x > itemReceiveTime y) = False
| (itemSendTime x < itemSendTime y) = True
| (itemSendTime x > itemSendTime y) = False
| (itemSequenceNo x < itemSequenceNo y) = True
| (itemSequenceNo x > itemSequenceNo y) = False
| (itemReceiverId x < itemReceiverId y) = True
| (itemReceiverId x > itemReceiverId y) = False
| (itemSenderId x < itemSenderId y) = True
| (itemSenderId x > itemSenderId y) = False
| (itemAntiToggle x < itemAntiToggle y) = True
| (itemAntiToggle x > itemAntiToggle y) = False
| otherwise = True
transientMessageQueueItem :: Message -> TransientMessageQueueItem
transientMessageQueueItem m =
TransientMessageQueueItem { itemSequenceNo = messageSequenceNo m,
itemSendTime = messageSendTime m,
itemReceiveTime = messageReceiveTime m,
itemSenderId = messageSenderId m,
itemReceiverId = messageReceiverId m,
itemAntiToggle = messageAntiToggle m }
acknowledgmentMessageQueueItem :: AcknowledgmentMessage -> TransientMessageQueueItem
acknowledgmentMessageQueueItem m =
TransientMessageQueueItem { itemSequenceNo = acknowledgmentSequenceNo m,
itemSendTime = acknowledgmentSendTime m,
itemReceiveTime = acknowledgmentReceiveTime m,
itemSenderId = acknowledgmentSenderId m,
itemReceiverId = acknowledgmentReceiverId m,
itemAntiToggle = acknowledgmentAntiToggle m }
newTransientMessageQueue :: DIO TransientMessageQueue
newTransientMessageQueue =
do ms <- liftIOUnsafe $ newIORef S.empty
r <- liftIOUnsafe $ newIORef (1 / 0)
return TransientMessageQueue { queuePrototypeMessages = ms,
queueMarkedMessageTime = r }
transientMessageQueueSize :: TransientMessageQueue -> IO Int
transientMessageQueueSize q =
fmap S.size $ readIORef (queuePrototypeMessages q)
transientMessageQueueTime :: TransientMessageQueue -> IO Double
transientMessageQueueTime q =
do s <- readIORef (queuePrototypeMessages q)
if S.null s
then return (1 / 0)
else let m = S.findMin s
in return (itemReceiveTime m)
enqueueTransientMessage :: TransientMessageQueue -> Message -> IO ()
enqueueTransientMessage q m =
modifyIORef (queuePrototypeMessages q) $
S.insert (transientMessageQueueItem m)
enqueueAcknowledgmentMessage :: TransientMessageQueue -> AcknowledgmentMessage -> IO ()
enqueueAcknowledgmentMessage q m =
modifyIORef' (queueMarkedMessageTime q) $
min (acknowledgmentReceiveTime m)
processAcknowledgmentMessage :: TransientMessageQueue -> AcknowledgmentMessage -> IO ()
processAcknowledgmentMessage q m =
do modifyIORef (queuePrototypeMessages q) $
S.delete (acknowledgmentMessageQueueItem m)
when (acknowledgmentMarked m) $
enqueueAcknowledgmentMessage q m
acknowledgmentMessageTime :: TransientMessageQueue -> IO Double
acknowledgmentMessageTime q =
readIORef (queueMarkedMessageTime q)
resetAcknowledgmentMessageTime :: TransientMessageQueue -> IO ()
resetAcknowledgmentMessageTime q =
writeIORef (queueMarkedMessageTime q) (1 / 0)
deliverAcknowledgmentMessage :: AcknowledgmentMessage -> DIO ()
deliverAcknowledgmentMessage x =
liftDistributedUnsafe $
DP.send (acknowledgmentSenderId x) (AcknowledgmentQueueMessage x)
deliverAcknowledgmentMessages :: [AcknowledgmentMessage] -> DIO ()
deliverAcknowledgmentMessages xs =
let ys = groupBy (\a b -> acknowledgmentSenderId a == acknowledgmentSenderId b) xs
dlv [] = return ()
dlv zs@(z : _) =
liftDistributedUnsafe $
DP.send (acknowledgmentSenderId z) (AcknowledgmentQueueMessageBulk zs)
in forM_ ys dlv