{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module OM.Legion.Runtime (
forkLegionary,
Runtime,
StartupMode(..),
MonadConstraints,
applyFast,
applyConsistent,
readState,
call,
cast,
broadcall,
broadcast,
eject,
getSelf,
getClusterName,
getStats,
Stats(..),
) where
import Control.Arrow ((&&&))
import Control.Concurrent (Chan, newChan, readChan, threadDelay,
writeChan)
import Control.Exception.Safe (MonadCatch, tryAny)
import Control.Monad (unless, void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Logger.CallStack (LoggingT(runLoggingT),
MonadLoggerIO(askLoggerIO), LogStr, MonadLogger, logDebug, logError,
logInfo)
import Control.Monad.State (MonadState(get, put), StateT, evalStateT,
gets, modify')
import Control.Monad.Trans.Class (lift)
import Data.Aeson (ToJSON)
import Data.Binary (Binary)
import Data.ByteString.Lazy (ByteString)
import Data.CRDT.EventFold (Event(Output, State),
UpdateResult(urEventFold, urOutputs), Diff, EventFold, EventId,
infimumId, projParticipants)
import Data.CRDT.EventFold.Monad (MonadUpdateEF(diffMerge, disassociate,
event, fullMerge, participate), EventFoldT, runEventFoldT)
import Data.Conduit ((.|), awaitForever, runConduit, yield)
import Data.Default.Class (Default)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Map (Map)
import Data.Set ((\\), Set)
import Data.Time (DiffTime, diffTimeToPicoseconds, picosecondsToDiffTime)
import Data.UUID (UUID)
import Data.UUID.V1 (nextUUID)
import GHC.Generics (Generic)
import Network.Socket (PortNumber)
import OM.Fork (Actor(Msg, actorChan), Race, Responder, race)
import OM.Legion.Conduit (chanToSink)
import OM.Legion.Connection (JoinResponse(JoinOk),
RuntimeState(RuntimeState, rsBroadcalls, rsCalls, rsClusterState,
rsConnections, rsJoins, rsNextId, rsNotify, rsSelf, rsWaiting),
EventConstraints, disconnect, peerMessagePort, sendPeer)
import OM.Legion.MsgChan (MessageId(M), Peer(unPeer), PeerMessage(PMCall,
PMCallResponse, PMCast, PMFullMerge, PMMerge, PMOutputs), ClusterName)
import OM.Logging (withPrefix)
import OM.Show (showj, showt)
import OM.Socket (AddressDescription(AddressDescription), connectServer,
openIngress, openServer)
import OM.Time (MonadTimeSpec(getTime), addTime, diffTimeSpec)
import System.Clock (TimeSpec)
import System.Random.Shuffle (shuffleM)
import qualified Data.Binary as Binary
import qualified Data.CRDT.EventFold as EF
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified OM.Fork as Fork
{-# ANN module ("HLint: ignore Redundant <$>" :: String) #-}
{-# ANN module ("HLint: ignore Use underscore" :: String) #-}
type MonadConstraints m =
( MonadCatch m
, MonadFail m
, MonadLoggerIO m
, MonadTimeSpec m
, MonadUnliftIO m
, Race
)
forkLegionary
:: ( EventConstraints e
, MonadConstraints m
)
=> (ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> (Peer -> EventFold ClusterName Peer e -> IO ())
-> Int
-> StartupMode e
-> m (Runtime e)
forkLegionary :: forall e (m :: * -> *).
(EventConstraints e, MonadConstraints m) =>
(ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> (Peer -> EventFold ClusterName Peer e -> IO ())
-> Int
-> StartupMode e
-> m (Runtime e)
forkLegionary
ByteString -> IO ByteString
handleUserCall
ByteString -> IO ()
handleUserCast
Peer -> EventFold ClusterName Peer e -> IO ()
notify
Int
resendInterval
StartupMode e
startupMode
= do
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Starting up with the following Mode: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> StartupMode e -> Text
forall a b. (Show a, IsString b) => a -> b
showt StartupMode e
startupMode
RuntimeState e
rts <- (Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState Peer -> EventFold ClusterName Peer e -> IO ()
notify StartupMode e
startupMode
RChan e
runtimeChan <- Chan (RuntimeMessage e) -> RChan e
forall e. Chan (RuntimeMessage e) -> RChan e
RChan (Chan (RuntimeMessage e) -> RChan e)
-> m (Chan (RuntimeMessage e)) -> m (RChan e)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (Chan (RuntimeMessage e)) -> m (Chan (RuntimeMessage e))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Chan (RuntimeMessage e))
forall a. IO (Chan a)
newChan
Loc -> Text -> LogLevel -> LogStr -> IO ()
logging <- LogStr
-> (Loc -> Text -> LogLevel -> LogStr -> IO ())
-> Loc
-> Text
-> LogLevel
-> LogStr
-> IO ()
withPrefix (Peer -> LogStr
logPrefix (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf RuntimeState e
rts)) ((Loc -> Text -> LogLevel -> LogStr -> IO ())
-> Loc -> Text -> LogLevel -> LogStr -> IO ())
-> m (Loc -> Text -> LogLevel -> LogStr -> IO ())
-> m (Loc -> Text -> LogLevel -> LogStr -> IO ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m (Loc -> Text -> LogLevel -> LogStr -> IO ())
forall (m :: * -> *).
MonadLoggerIO m =>
m (Loc -> Text -> LogLevel -> LogStr -> IO ())
askLoggerIO
IORef (Map Peer TimeSpec)
rStats <- IO (IORef (Map Peer TimeSpec)) -> m (IORef (Map Peer TimeSpec))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Map Peer TimeSpec)) -> m (IORef (Map Peer TimeSpec)))
-> IO (IORef (Map Peer TimeSpec)) -> m (IORef (Map Peer TimeSpec))
forall a b. (a -> b) -> a -> b
$ Map Peer TimeSpec -> IO (IORef (Map Peer TimeSpec))
forall a. a -> IO (IORef a)
newIORef Map Peer TimeSpec
forall a. Monoid a => a
mempty
(LoggingT m ()
-> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> m ()
forall (m :: * -> *) a.
LoggingT m a -> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> m a
`runLoggingT` Loc -> Text -> LogLevel -> LogStr -> IO ()
logging) (LoggingT m () -> m ()) -> LoggingT m () -> m ()
forall a b. (a -> b) -> a -> b
$
(ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> Int
-> RuntimeState e
-> RChan e
-> IORef (Map Peer TimeSpec)
-> LoggingT m ()
forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
Eq (Output e), Eq e, Event Peer e, MonadCatch m, MonadFail m,
MonadLoggerIO m, MonadTimeSpec m, MonadUnliftIO m, Race,
Show (Output e), Show (State e), Show e, ToJSON (Output e),
ToJSON (State e), ToJSON e) =>
(ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> Int
-> RuntimeState e
-> RChan e
-> IORef (Map Peer TimeSpec)
-> m ()
executeRuntime
ByteString -> IO ByteString
handleUserCall
ByteString -> IO ()
handleUserCast
Int
resendInterval
RuntimeState e
rts
RChan e
runtimeChan
IORef (Map Peer TimeSpec)
rStats
let
clusterId :: ClusterName
clusterId :: ClusterName
clusterId = EventFold ClusterName Peer e -> ClusterName
forall o p e. EventFold o p e -> o
EF.origin (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState RuntimeState e
rts)
Runtime e -> m (Runtime e)
forall (m :: * -> *) a. Monad m => a -> m a
return
Runtime :: forall e.
RChan e
-> Peer -> ClusterName -> IORef (Map Peer TimeSpec) -> Runtime e
Runtime
{ rChan :: RChan e
rChan = RChan e
runtimeChan
, rSelf :: Peer
rSelf = RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf RuntimeState e
rts
, rClusterId :: ClusterName
rClusterId = ClusterName
clusterId
, IORef (Map Peer TimeSpec)
rStats :: IORef (Map Peer TimeSpec)
rStats :: IORef (Map Peer TimeSpec)
rStats
}
where
logPrefix :: Peer -> LogStr
logPrefix :: Peer -> LogStr
logPrefix Peer
self_ = LogStr
"[" LogStr -> LogStr -> LogStr
forall a. Semigroup a => a -> a -> a
<> Peer -> LogStr
forall a b. (Show a, IsString b) => a -> b
showt Peer
self_ LogStr -> LogStr -> LogStr
forall a. Semigroup a => a -> a -> a
<> LogStr
"] "
data Runtime e = Runtime
{ forall e. Runtime e -> RChan e
rChan :: RChan e
, forall e. Runtime e -> Peer
rSelf :: Peer
, forall e. Runtime e -> ClusterName
rClusterId :: ClusterName
, forall e. Runtime e -> IORef (Map Peer TimeSpec)
rStats :: IORef (Map Peer TimeSpec)
}
instance Actor (Runtime e) where
type Msg (Runtime e) = RuntimeMessage e
actorChan :: Runtime e -> Msg (Runtime e) -> IO ()
actorChan = RChan e -> RuntimeMessage e -> IO ()
forall a. Actor a => a -> Msg a -> IO ()
actorChan (RChan e -> RuntimeMessage e -> IO ())
-> (Runtime e -> RChan e) -> Runtime e -> RuntimeMessage e -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Runtime e -> RChan e
forall e. Runtime e -> RChan e
rChan
newtype Stats = Stats
{ Stats -> Map Peer DiffTime
timeWithoutProgress :: Map Peer DiffTime
}
deriving stock ((forall x. Stats -> Rep Stats x)
-> (forall x. Rep Stats x -> Stats) -> Generic Stats
forall x. Rep Stats x -> Stats
forall x. Stats -> Rep Stats x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Stats x -> Stats
$cfrom :: forall x. Stats -> Rep Stats x
Generic, Int -> Stats -> ShowS
[Stats] -> ShowS
Stats -> String
(Int -> Stats -> ShowS)
-> (Stats -> String) -> ([Stats] -> ShowS) -> Show Stats
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Stats] -> ShowS
$cshowList :: [Stats] -> ShowS
show :: Stats -> String
$cshow :: Stats -> String
showsPrec :: Int -> Stats -> ShowS
$cshowsPrec :: Int -> Stats -> ShowS
Show, Stats -> Stats -> Bool
(Stats -> Stats -> Bool) -> (Stats -> Stats -> Bool) -> Eq Stats
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Stats -> Stats -> Bool
$c/= :: Stats -> Stats -> Bool
== :: Stats -> Stats -> Bool
$c== :: Stats -> Stats -> Bool
Eq)
deriving anyclass ([Stats] -> Encoding
[Stats] -> Value
Stats -> Encoding
Stats -> Value
(Stats -> Value)
-> (Stats -> Encoding)
-> ([Stats] -> Value)
-> ([Stats] -> Encoding)
-> ToJSON Stats
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> ToJSON a
toEncodingList :: [Stats] -> Encoding
$ctoEncodingList :: [Stats] -> Encoding
toJSONList :: [Stats] -> Value
$ctoJSONList :: [Stats] -> Value
toEncoding :: Stats -> Encoding
$ctoEncoding :: Stats -> Encoding
toJSON :: Stats -> Value
$ctoJSON :: Stats -> Value
ToJSON)
instance Binary Stats where
get :: Get Stats
get =
Map Peer DiffTime -> Stats
Stats (Map Peer DiffTime -> Stats)
-> Get (Map Peer DiffTime) -> Get Stats
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ((Integer -> DiffTime) -> Map Peer Integer -> Map Peer DiffTime
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Integer -> DiffTime
picosecondsToDiffTime (Map Peer Integer -> Map Peer DiffTime)
-> Get (Map Peer Integer) -> Get (Map Peer DiffTime)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get (Map Peer Integer)
forall t. Binary t => Get t
Binary.get)
put :: Stats -> Put
put (Stats Map Peer DiffTime
timeWithoutProgress) =
Map Peer Integer -> Put
forall t. Binary t => t -> Put
Binary.put (DiffTime -> Integer
diffTimeToPicoseconds (DiffTime -> Integer) -> Map Peer DiffTime -> Map Peer Integer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map Peer DiffTime
timeWithoutProgress)
newtype RChan e = RChan {
forall e. RChan e -> Chan (RuntimeMessage e)
unRChan :: Chan (RuntimeMessage e)
}
instance Actor (RChan e) where
type Msg (RChan e) = RuntimeMessage e
actorChan :: RChan e -> Msg (RChan e) -> IO ()
actorChan = Chan (RuntimeMessage e) -> RuntimeMessage e -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (Chan (RuntimeMessage e) -> RuntimeMessage e -> IO ())
-> (RChan e -> Chan (RuntimeMessage e))
-> RChan e
-> RuntimeMessage e
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RChan e -> Chan (RuntimeMessage e)
forall e. RChan e -> Chan (RuntimeMessage e)
unRChan
applyFast :: (MonadIO m)
=> Runtime e
-> e
-> m (Output e)
applyFast :: forall (m :: * -> *) e. MonadIO m => Runtime e -> e -> m (Output e)
applyFast Runtime e
runtime e
e = Runtime e
-> (Responder (Output e) -> Msg (Runtime e)) -> m (Output e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (e -> Responder (Output e) -> RuntimeMessage e
forall e. e -> Responder (Output e) -> RuntimeMessage e
ApplyFast e
e)
applyConsistent :: (MonadIO m)
=> Runtime e
-> e
-> m (Output e)
applyConsistent :: forall (m :: * -> *) e. MonadIO m => Runtime e -> e -> m (Output e)
applyConsistent Runtime e
runtime e
e = Runtime e
-> (Responder (Output e) -> Msg (Runtime e)) -> m (Output e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (e -> Responder (Output e) -> RuntimeMessage e
forall e. e -> Responder (Output e) -> RuntimeMessage e
ApplyConsistent e
e)
readState :: (MonadIO m)
=> Runtime e
-> m (EventFold ClusterName Peer e)
readState :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> m (EventFold ClusterName Peer e)
readState Runtime e
runtime = Runtime e
-> (Responder (EventFold ClusterName Peer e) -> Msg (Runtime e))
-> m (EventFold ClusterName Peer e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime Responder (EventFold ClusterName Peer e) -> Msg (Runtime e)
forall e.
Responder (EventFold ClusterName Peer e) -> RuntimeMessage e
ReadState
call :: (MonadIO m) => Runtime e -> Peer -> ByteString -> m ByteString
call :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> Peer -> ByteString -> m ByteString
call Runtime e
runtime Peer
target ByteString
msg = Runtime e
-> (Responder ByteString -> Msg (Runtime e)) -> m ByteString
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (Peer -> ByteString -> Responder ByteString -> RuntimeMessage e
forall e.
Peer -> ByteString -> Responder ByteString -> RuntimeMessage e
Call Peer
target ByteString
msg)
sendCallResponse :: (MonadIO m)
=> RChan e
-> Peer
-> MessageId
-> ByteString
-> m ()
sendCallResponse :: forall (m :: * -> *) e.
MonadIO m =>
RChan e -> Peer -> MessageId -> ByteString -> m ()
sendCallResponse RChan e
runtimeChan Peer
target MessageId
mid ByteString
msg =
RChan e -> Msg (RChan e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast RChan e
runtimeChan (Peer -> MessageId -> ByteString -> RuntimeMessage e
forall e. Peer -> MessageId -> ByteString -> RuntimeMessage e
SendCallResponse Peer
target MessageId
mid ByteString
msg)
cast :: (MonadIO m) => Runtime e -> Peer -> ByteString -> m ()
cast :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> Peer -> ByteString -> m ()
cast Runtime e
runtime Peer
target ByteString
message = Runtime e -> Msg (Runtime e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast Runtime e
runtime (Peer -> ByteString -> RuntimeMessage e
forall e. Peer -> ByteString -> RuntimeMessage e
Cast Peer
target ByteString
message)
broadcall :: (MonadIO m)
=> Runtime e
-> DiffTime
-> ByteString
-> m (Map Peer (Maybe ByteString))
broadcall :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e
-> DiffTime -> ByteString -> m (Map Peer (Maybe ByteString))
broadcall Runtime e
runtime DiffTime
timeout ByteString
msg = Runtime e
-> (Responder (Map Peer (Maybe ByteString)) -> Msg (Runtime e))
-> m (Map Peer (Maybe ByteString))
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (DiffTime
-> ByteString
-> Responder (Map Peer (Maybe ByteString))
-> RuntimeMessage e
forall e.
DiffTime
-> ByteString
-> Responder (Map Peer (Maybe ByteString))
-> RuntimeMessage e
Broadcall DiffTime
timeout ByteString
msg)
broadcast :: (MonadIO m) => Runtime e -> ByteString -> m ()
broadcast :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> ByteString -> m ()
broadcast Runtime e
runtime ByteString
msg = Runtime e -> Msg (Runtime e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast Runtime e
runtime (ByteString -> RuntimeMessage e
forall e. ByteString -> RuntimeMessage e
Broadcast ByteString
msg)
eject :: (MonadIO m) => Runtime e -> Peer -> m ()
eject :: forall (m :: * -> *) e. MonadIO m => Runtime e -> Peer -> m ()
eject Runtime e
runtime Peer
peer = Runtime e -> (Responder () -> Msg (Runtime e)) -> m ()
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (Peer -> Responder () -> RuntimeMessage e
forall e. Peer -> Responder () -> RuntimeMessage e
Eject Peer
peer)
getSelf :: Runtime e -> Peer
getSelf :: forall e. Runtime e -> Peer
getSelf = Runtime e -> Peer
forall e. Runtime e -> Peer
rSelf
data RuntimeMessage e
= ApplyFast e (Responder (Output e))
| ApplyConsistent e (Responder (Output e))
| Eject Peer (Responder ())
| Merge (Diff ClusterName Peer e)
| FullMerge (EventFold ClusterName Peer e)
| Outputs (Map (EventId Peer) (Output e))
| Join JoinRequest (Responder (JoinResponse e))
| ReadState (Responder (EventFold ClusterName Peer e))
| Call Peer ByteString (Responder ByteString)
| Cast Peer ByteString
| Broadcall
DiffTime
ByteString
(Responder (Map Peer (Maybe ByteString)))
| Broadcast ByteString
| SendCallResponse Peer MessageId ByteString
| HandleCallResponse Peer MessageId ByteString
| Resend (Responder ())
| GetStats (Responder (EventFold ClusterName Peer e))
deriving stock instance
( Show e
, Show (Output e)
, Show (State e)
)
=>
Show (RuntimeMessage e)
executeRuntime
:: ( Binary (Output e)
, Binary (State e)
, Binary e
, Default (State e)
, Eq (Output e)
, Eq e
, Event Peer e
, MonadCatch m
, MonadFail m
, MonadLoggerIO m
, MonadTimeSpec m
, MonadUnliftIO m
, Race
, Show (Output e)
, Show (State e)
, Show e
, ToJSON (Output e)
, ToJSON (State e)
, ToJSON e
)
=> (ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> Int
-> RuntimeState e
-> RChan e
-> IORef (Map Peer TimeSpec)
-> m ()
executeRuntime :: forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
Eq (Output e), Eq e, Event Peer e, MonadCatch m, MonadFail m,
MonadLoggerIO m, MonadTimeSpec m, MonadUnliftIO m, Race,
Show (Output e), Show (State e), Show e, ToJSON (Output e),
ToJSON (State e), ToJSON e) =>
(ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> Int
-> RuntimeState e
-> RChan e
-> IORef (Map Peer TimeSpec)
-> m ()
executeRuntime
ByteString -> IO ByteString
handleUserCall
ByteString -> IO ()
handleUserCast
Int
resendInterval
RuntimeState e
rts
RChan e
runtimeChan
IORef (Map Peer TimeSpec)
peerStats
= do
ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion peer listener" m ()
forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener
ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion join listener" m ()
forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runJoinListener
ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion periodic resend" m ()
forall (m :: * -> *). MonadIO m => m ()
runPeriodicResent
ProcessName -> m Any -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion message handler" (m Any -> m ()) -> m Any -> m ()
forall a b. (a -> b) -> a -> b
$
(StateT (RuntimeState e) m Any -> RuntimeState e -> m Any
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m a
`evalStateT` RuntimeState e
rts)
(
let
handleMessages :: StateT (RuntimeState e) m Any
handleMessages = do
RuntimeMessage e
msg <- IO (RuntimeMessage e)
-> StateT (RuntimeState e) m (RuntimeMessage e)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (RuntimeMessage e)
-> StateT (RuntimeState e) m (RuntimeMessage e))
-> IO (RuntimeMessage e)
-> StateT (RuntimeState e) m (RuntimeMessage e)
forall a b. (a -> b) -> a -> b
$ Chan (RuntimeMessage e) -> IO (RuntimeMessage e)
forall a. Chan a -> IO a
readChan (RChan e -> Chan (RuntimeMessage e)
forall e. RChan e -> Chan (RuntimeMessage e)
unRChan RChan e
runtimeChan)
RuntimeState {rsClusterState :: forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState = EventFold ClusterName Peer e
cluster1} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Handling: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> RuntimeMessage e -> Text
forall a b. (Show a, IsString b) => a -> b
showt RuntimeMessage e
msg
RuntimeMessage e -> StateT (RuntimeState e) m ()
forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
Eq (Output e), Eq e, Event Peer e, MonadCatch m, MonadLoggerIO m,
MonadTimeSpec m, Show (Output e), Show (State e), Show e,
ToJSON (Output e), ToJSON (State e), ToJSON e) =>
RuntimeMessage e -> StateT (RuntimeState e) m ()
handleRuntimeMessage RuntimeMessage e
msg
RuntimeState {rsClusterState :: forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState = EventFold ClusterName Peer e
cluster2} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
Bool
-> StateT (RuntimeState e) m () -> StateT (RuntimeState e) m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (EventFold ClusterName Peer e
cluster1 EventFold ClusterName Peer e
-> EventFold ClusterName Peer e -> Bool
forall a. Eq a => a -> a -> Bool
/= EventFold ClusterName Peer e
cluster2) (StateT (RuntimeState e) m () -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"New Cluster State: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> EventFold ClusterName Peer e -> Text
forall a b. (ToJSON a, IsString b) => a -> b
showj EventFold ClusterName Peer e
cluster2
StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(MonadIO m, MonadTimeSpec m) =>
StateT (RuntimeState e) m ()
handleBroadcallTimeouts
StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
MonadLoggerIO m =>
StateT (RuntimeState e) m ()
handleOutstandingJoins
StateT (RuntimeState e) m Any
handleMessages
in do
IO () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> StateT (RuntimeState e) m ())
-> IO () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ RuntimeState e -> EventFold ClusterName Peer e -> IO ()
forall e. RuntimeState e -> EventFold ClusterName Peer e -> IO ()
rsNotify RuntimeState e
rts (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState RuntimeState e
rts)
StateT (RuntimeState e) m Any
handleMessages
)
where
runPeerListener :: (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener :: forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener =
let
addy :: AddressDescription
addy :: AddressDescription
addy =
Text -> AddressDescription
AddressDescription
(
Peer -> Text
unPeer (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf RuntimeState e
rts)
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
peerMessagePort
)
in
ConduitT () Void m () -> m ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (
AddressDescription -> ConduitT () (Peer, PeerMessage e) m ()
forall i (m :: * -> *).
(Binary i, MonadIO m, MonadFail m) =>
AddressDescription -> ConduitT () i m ()
openIngress AddressDescription
addy
ConduitT () (Peer, PeerMessage e) m ()
-> ConduitT (Peer, PeerMessage e) Void m ()
-> ConduitT () Void m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ((Peer, PeerMessage e)
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) i o r.
Monad m =>
(i -> ConduitT i o m r) -> ConduitT i o m ()
awaitForever (\ (Peer
msgSource, PeerMessage e
msg) -> do
IO () -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> IO () -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall a b. (a -> b) -> a -> b
$ do
TimeSpec
now <- IO TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
IORef (Map Peer TimeSpec)
-> (Map Peer TimeSpec -> (Map Peer TimeSpec, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef'
IORef (Map Peer TimeSpec)
peerStats
(\Map Peer TimeSpec
peerTimes -> (Peer -> TimeSpec -> Map Peer TimeSpec -> Map Peer TimeSpec
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Peer
msgSource TimeSpec
now Map Peer TimeSpec
peerTimes, ()))
Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Handling: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (Peer, PeerMessage e) -> Text
forall a b. (Show a, IsString b) => a -> b
showt (Peer
msgSource :: Peer, PeerMessage e
msg)
case PeerMessage e
msg of
PMFullMerge EventFold ClusterName Peer e
ps -> RuntimeMessage e
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (EventFold ClusterName Peer e -> RuntimeMessage e
forall e. EventFold ClusterName Peer e -> RuntimeMessage e
FullMerge EventFold ClusterName Peer e
ps)
PMOutputs Map (EventId Peer) (Output e)
outputs -> RuntimeMessage e
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (Map (EventId Peer) (Output e) -> RuntimeMessage e
forall e. Map (EventId Peer) (Output e) -> RuntimeMessage e
Outputs Map (EventId Peer) (Output e)
outputs)
PMMerge Diff ClusterName Peer e
ps -> RuntimeMessage e
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (Diff ClusterName Peer e -> RuntimeMessage e
forall e. Diff ClusterName Peer e -> RuntimeMessage e
Merge Diff ClusterName Peer e
ps)
PMCall Peer
source MessageId
mid ByteString
callMsg ->
(IO (Either SomeException ByteString)
-> ConduitT
(Peer, PeerMessage e)
(RuntimeMessage e)
m
(Either SomeException ByteString)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SomeException ByteString)
-> ConduitT
(Peer, PeerMessage e)
(RuntimeMessage e)
m
(Either SomeException ByteString))
-> (IO ByteString -> IO (Either SomeException ByteString))
-> IO ByteString
-> ConduitT
(Peer, PeerMessage e)
(RuntimeMessage e)
m
(Either SomeException ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO ByteString -> IO (Either SomeException ByteString)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny) (ByteString -> IO ByteString
handleUserCall ByteString
callMsg) ConduitT
(Peer, PeerMessage e)
(RuntimeMessage e)
m
(Either SomeException ByteString)
-> (Either SomeException ByteString
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
err ->
Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError
(Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"User call handling failed with: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SomeException -> Text
forall a b. (Show a, IsString b) => a -> b
showt SomeException
err
Right ByteString
v -> RChan e
-> Peer
-> MessageId
-> ByteString
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) e.
MonadIO m =>
RChan e -> Peer -> MessageId -> ByteString -> m ()
sendCallResponse RChan e
runtimeChan Peer
source MessageId
mid ByteString
v
PMCast ByteString
castMsg -> IO () -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ByteString -> IO ()
handleUserCast ByteString
castMsg)
PMCallResponse Peer
source MessageId
mid ByteString
responseMsg ->
RuntimeMessage e
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (Peer -> MessageId -> ByteString -> RuntimeMessage e
forall e. Peer -> MessageId -> ByteString -> RuntimeMessage e
HandleCallResponse Peer
source MessageId
mid ByteString
responseMsg)
)
ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
-> ConduitT (RuntimeMessage e) Void m ()
-> ConduitT (Peer, PeerMessage e) Void m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| Chan (RuntimeMessage e) -> ConduitT (RuntimeMessage e) Void m ()
forall (m :: * -> *) a void.
MonadIO m =>
Chan a -> ConduitT a void m ()
chanToSink (RChan e -> Chan (RuntimeMessage e)
forall e. RChan e -> Chan (RuntimeMessage e)
unRChan RChan e
runtimeChan)
)
runJoinListener :: (MonadLoggerIO m, MonadFail m) => m ()
runJoinListener :: forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runJoinListener =
let
addy :: AddressDescription
addy :: AddressDescription
addy =
Text -> AddressDescription
AddressDescription
(
Peer -> Text
unPeer (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf RuntimeState e
rts)
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
joinMessagePort
)
in
ConduitT () Void m () -> m ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (
() -> ConduitT () Void m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
ConduitT () Void m ()
-> ConduitT Void Void m () -> ConduitT () Void m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| AddressDescription
-> Maybe (IO ServerParams)
-> ConduitT Void (JoinRequest, JoinResponse e -> m Responded) m ()
forall request response (m :: * -> *).
(Binary request, Binary response, MonadFail m, MonadLoggerIO m,
Show request, Show response) =>
AddressDescription
-> Maybe (IO ServerParams)
-> ConduitT Void (request, response -> m Responded) m ()
openServer AddressDescription
addy Maybe (IO ServerParams)
forall a. Maybe a
Nothing
ConduitT Void (JoinRequest, JoinResponse e -> m Responded) m ()
-> ConduitT (JoinRequest, JoinResponse e -> m Responded) Void m ()
-> ConduitT Void Void m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ((JoinRequest, JoinResponse e -> m Responded)
-> ConduitT
(JoinRequest, JoinResponse e -> m Responded) Void m Responded)
-> ConduitT (JoinRequest, JoinResponse e -> m Responded) Void m ()
forall (m :: * -> *) i o r.
Monad m =>
(i -> ConduitT i o m r) -> ConduitT i o m ()
awaitForever (\(JoinRequest
req, JoinResponse e -> m Responded
respond_) -> m Responded
-> ConduitT
(JoinRequest, JoinResponse e -> m Responded) Void m Responded
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m Responded
-> ConduitT
(JoinRequest, JoinResponse e -> m Responded) Void m Responded)
-> m Responded
-> ConduitT
(JoinRequest, JoinResponse e -> m Responded) Void m Responded
forall a b. (a -> b) -> a -> b
$
RChan e
-> (Responder (JoinResponse e) -> Msg (RChan e))
-> m (JoinResponse e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call RChan e
runtimeChan (JoinRequest -> Responder (JoinResponse e) -> RuntimeMessage e
forall e.
JoinRequest -> Responder (JoinResponse e) -> RuntimeMessage e
Join JoinRequest
req) m (JoinResponse e)
-> (JoinResponse e -> m Responded) -> m Responded
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= JoinResponse e -> m Responded
respond_
)
)
runPeriodicResent :: (MonadIO m) => m ()
runPeriodicResent :: forall (m :: * -> *). MonadIO m => m ()
runPeriodicResent =
let
periodicResend :: (MonadIO m) => m ()
periodicResend :: forall (m :: * -> *). MonadIO m => m ()
periodicResend = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
resendInterval
RChan e -> (Responder () -> Msg (RChan e)) -> m ()
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call RChan e
runtimeChan Responder () -> Msg (RChan e)
forall e. Responder () -> RuntimeMessage e
Resend
m ()
forall (m :: * -> *). MonadIO m => m ()
periodicResend
in
m ()
forall (m :: * -> *). MonadIO m => m ()
periodicResend
handleOutstandingJoins :: (MonadLoggerIO m) => StateT (RuntimeState e) m ()
handleOutstandingJoins :: forall (m :: * -> *) e.
MonadLoggerIO m =>
StateT (RuntimeState e) m ()
handleOutstandingJoins = do
state :: RuntimeState e
state@RuntimeState {Map (EventId Peer) (Responder (JoinResponse e))
rsJoins :: Map (EventId Peer) (Responder (JoinResponse e))
rsJoins :: forall e.
RuntimeState e -> Map (EventId Peer) (Responder (JoinResponse e))
rsJoins, EventFold ClusterName Peer e
rsClusterState :: EventFold ClusterName Peer e
rsClusterState :: forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
let
(Map (EventId Peer) (Responder (JoinResponse e))
consistent, Map (EventId Peer) (Responder (JoinResponse e))
pending) =
(EventId Peer -> Responder (JoinResponse e) -> Bool)
-> Map (EventId Peer) (Responder (JoinResponse e))
-> (Map (EventId Peer) (Responder (JoinResponse e)),
Map (EventId Peer) (Responder (JoinResponse e)))
forall k a. (k -> a -> Bool) -> Map k a -> (Map k a, Map k a)
Map.partitionWithKey
(\EventId Peer
k Responder (JoinResponse e)
_ -> EventId Peer
k EventId Peer -> EventId Peer -> Bool
forall a. Ord a => a -> a -> Bool
<= EventFold ClusterName Peer e -> EventId Peer
forall o p e. EventFold o p e -> EventId p
infimumId EventFold ClusterName Peer e
rsClusterState)
Map (EventId Peer) (Responder (JoinResponse e))
rsJoins
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {rsJoins :: Map (EventId Peer) (Responder (JoinResponse e))
rsJoins = Map (EventId Peer) (Responder (JoinResponse e))
pending}
[StateT (RuntimeState e) m ()] -> StateT (RuntimeState e) m ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ [
do
Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Completing join (" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> EventId Peer -> Text
forall a b. (Show a, IsString b) => a -> b
showt EventId Peer
sid Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
")."
Responder (JoinResponse e)
-> JoinResponse e -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (JoinResponse e)
responder (EventFold ClusterName Peer e -> JoinResponse e
forall e. EventFold ClusterName Peer e -> JoinResponse e
JoinOk EventFold ClusterName Peer e
rsClusterState)
| (EventId Peer
sid, Responder (JoinResponse e)
responder) <- Map (EventId Peer) (Responder (JoinResponse e))
-> [(EventId Peer, Responder (JoinResponse e))]
forall k a. Map k a -> [(k, a)]
Map.toList Map (EventId Peer) (Responder (JoinResponse e))
consistent
]
handleBroadcallTimeouts
:: ( MonadIO m
, MonadTimeSpec m
)
=> StateT (RuntimeState e) m ()
handleBroadcallTimeouts :: forall (m :: * -> *) e.
(MonadIO m, MonadTimeSpec m) =>
StateT (RuntimeState e) m ()
handleBroadcallTimeouts = do
Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
broadcalls <- (RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec))
-> StateT
(RuntimeState e)
m
(Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec))
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall e.
RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls
TimeSpec
now <- StateT (RuntimeState e) m TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
[StateT (RuntimeState e) m ()] -> StateT (RuntimeState e) m ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ [
do
Responder (Map Peer (Maybe ByteString))
-> Map Peer (Maybe ByteString) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Map Peer (Maybe ByteString))
responder Map Peer (Maybe ByteString)
responses
(RuntimeState e -> RuntimeState e) -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' (\RuntimeState e
rs -> RuntimeState e
rs {
rsBroadcalls :: Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls = MessageId
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete MessageId
messageId (RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall e.
RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls RuntimeState e
rs)
})
| (MessageId
messageId, (Map Peer (Maybe ByteString)
responses, Responder (Map Peer (Maybe ByteString))
responder, TimeSpec
expiry)) <- Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> [(MessageId,
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec))]
forall k a. Map k a -> [(k, a)]
Map.toList Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
broadcalls
, TimeSpec
now TimeSpec -> TimeSpec -> Bool
forall a. Ord a => a -> a -> Bool
>= TimeSpec
expiry
]
handleRuntimeMessage
:: ( Binary (Output e)
, Binary (State e)
, Binary e
, Default (State e)
, Eq (Output e)
, Eq e
, Event Peer e
, MonadCatch m
, MonadLoggerIO m
, MonadTimeSpec m
, Show (Output e)
, Show (State e)
, Show e
, ToJSON (Output e)
, ToJSON (State e)
, ToJSON e
)
=> RuntimeMessage e
-> StateT (RuntimeState e) m ()
handleRuntimeMessage :: forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
Eq (Output e), Eq e, Event Peer e, MonadCatch m, MonadLoggerIO m,
MonadTimeSpec m, Show (Output e), Show (State e), Show e,
ToJSON (Output e), ToJSON (State e), ToJSON e) =>
RuntimeMessage e -> StateT (RuntimeState e) m ()
handleRuntimeMessage (Outputs Map (EventId Peer) (Output e)
outputs) =
{-# SCC "Outputs" #-}
Map (EventId Peer) (Output e) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(MonadLoggerIO m, MonadState (RuntimeState e) m,
Show (Output e)) =>
Map (EventId Peer) (Output e) -> m ()
respondToWaiting Map (EventId Peer) (Output e)
outputs
handleRuntimeMessage (GetStats Responder (EventFold ClusterName Peer e)
responder) =
{-# SCC "GetStats" #-}
Responder (EventFold ClusterName Peer e)
-> EventFold ClusterName Peer e -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (EventFold ClusterName Peer e)
responder (EventFold ClusterName Peer e -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (EventFold ClusterName Peer e)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (RuntimeState e -> EventFold ClusterName Peer e)
-> StateT (RuntimeState e) m (EventFold ClusterName Peer e)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState
handleRuntimeMessage (ApplyFast e
e Responder (Output e)
responder) =
{-# SCC "ApplyFast" #-}
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
(Output e, EventId Peer) -> Output e
forall a b. (a, b) -> a
fst ((Output e, EventId Peer) -> Output e)
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Output e, EventId Peer)
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (Output e)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> e
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Output e, EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
e -> m (Output e, EventId p)
event e
e EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (Output e)
-> (Output e
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Responder (Output e)
-> Output e
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Output e)
responder
handleRuntimeMessage (ApplyConsistent e
e Responder (Output e)
responder) =
{-# SCC "ApplyConsistent" #-}
(
do
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ do
(Output e
_v, EventId Peer
sid) <- e
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Output e, EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
e -> m (Output e, EventId p)
event e
e
StateT (RuntimeState e) m ()
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (EventId Peer
-> Responder (Output e) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
Monad m =>
EventId Peer
-> Responder (Output e) -> StateT (RuntimeState e) m ()
waitOn EventId Peer
sid Responder (Output e)
responder)
RuntimeState e
rs <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Waiting: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Map (EventId Peer) (Responder (Output e)) -> Text
forall a b. (Show a, IsString b) => a -> b
showt (RuntimeState e -> Map (EventId Peer) (Responder (Output e))
forall e.
RuntimeState e -> Map (EventId Peer) (Responder (Output e))
rsWaiting RuntimeState e
rs)
)
handleRuntimeMessage (Eject Peer
peer Responder ()
responder) =
{-# SCC "Eject" #-}
do
Peer
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
Peer -> EventFoldT ClusterName Peer e m a -> m a
updateClusterAs Peer
peer (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
disassociate Peer
peer
StateT (RuntimeState e) m ()
forall e (m :: * -> *).
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
m ()
propagate
IO () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> StateT (RuntimeState e) m ())
-> IO () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
500_000
Responder () -> () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ()
responder ()
handleRuntimeMessage (Merge Diff ClusterName Peer e
other) =
{-# SCC "Merge" #-}
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
Diff ClusterName Peer e
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Either (MergeError ClusterName Peer e) ())
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
Diff o p e -> m (Either (MergeError o p e) ())
diffMerge Diff ClusterName Peer e
other EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Either (MergeError ClusterName Peer e) ())
-> (Either (MergeError ClusterName Peer e) ()
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left MergeError ClusterName Peer e
err -> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError (Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Text
"Bad cluster merge: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MergeError ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt MergeError ClusterName Peer e
err
Right () -> () -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
handleRuntimeMessage (FullMerge EventFold ClusterName Peer e
other) =
{-# SCC "FullMerge" #-}
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
EventFold ClusterName Peer e
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Either (MergeError ClusterName Peer e) ())
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
EventFold o p e -> m (Either (MergeError o p e) ())
fullMerge EventFold ClusterName Peer e
other EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Either (MergeError ClusterName Peer e) ())
-> (Either (MergeError ClusterName Peer e) ()
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left MergeError ClusterName Peer e
err -> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError (Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Text
"Bad cluster merge: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MergeError ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt MergeError ClusterName Peer e
err
Right () -> () -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
handleRuntimeMessage (Join (JoinRequest Peer
peer) Responder (JoinResponse e)
responder) =
{-# SCC "Join" #-}
do
Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Handling join from peer: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Peer -> Text
forall a b. (Show a, IsString b) => a -> b
showt Peer
peer
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (do
EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
disassociate Peer
peer
EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
participate Peer
peer
)
RuntimeState {EventFold ClusterName Peer e
rsClusterState :: EventFold ClusterName Peer e
rsClusterState :: forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Join immediately with: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> EventFold ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt EventFold ClusterName Peer e
rsClusterState
Responder (JoinResponse e)
-> JoinResponse e -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (JoinResponse e)
responder (EventFold ClusterName Peer e -> JoinResponse e
forall e. EventFold ClusterName Peer e -> JoinResponse e
JoinOk EventFold ClusterName Peer e
rsClusterState)
handleRuntimeMessage (ReadState Responder (EventFold ClusterName Peer e)
responder) =
{-# SCC "ReadState" #-}
Responder (EventFold ClusterName Peer e)
-> EventFold ClusterName Peer e -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (EventFold ClusterName Peer e)
responder (EventFold ClusterName Peer e -> StateT (RuntimeState e) m ())
-> (RuntimeState e -> EventFold ClusterName Peer e)
-> RuntimeState e
-> StateT (RuntimeState e) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState (RuntimeState e -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (RuntimeState e)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
handleRuntimeMessage (Call Peer
target ByteString
msg Responder ByteString
responder) =
{-# SCC "Call" #-}
do
MessageId
mid <- StateT (RuntimeState e) m MessageId
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m MessageId
newMessageId
Peer
source <- (RuntimeState e -> Peer) -> StateT (RuntimeState e) m Peer
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf
MessageId -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
Monad m =>
MessageId -> StateT (RuntimeState e) m ()
setCallResponder MessageId
mid
PeerMessage e -> Peer -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (Peer -> MessageId -> ByteString -> PeerMessage e
forall e. Peer -> MessageId -> ByteString -> PeerMessage e
PMCall Peer
source MessageId
mid ByteString
msg) Peer
target
where
setCallResponder :: (Monad m)
=> MessageId
-> StateT (RuntimeState e) m ()
setCallResponder :: forall (m :: * -> *) e.
Monad m =>
MessageId -> StateT (RuntimeState e) m ()
setCallResponder MessageId
mid = do
state :: RuntimeState e
state@RuntimeState {Map MessageId (Responder ByteString)
rsCalls :: Map MessageId (Responder ByteString)
rsCalls :: forall e. RuntimeState e -> Map MessageId (Responder ByteString)
rsCalls} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {
rsCalls :: Map MessageId (Responder ByteString)
rsCalls = MessageId
-> Responder ByteString
-> Map MessageId (Responder ByteString)
-> Map MessageId (Responder ByteString)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert MessageId
mid Responder ByteString
responder Map MessageId (Responder ByteString)
rsCalls
}
handleRuntimeMessage (Cast Peer
target ByteString
msg) =
{-# SCC "Cast" #-}
PeerMessage e -> Peer -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (ByteString -> PeerMessage e
forall e. ByteString -> PeerMessage e
PMCast ByteString
msg) Peer
target
handleRuntimeMessage (Broadcall DiffTime
timeout ByteString
msg Responder (Map Peer (Maybe ByteString))
responder) =
{-# SCC "Broadcall" #-}
do
TimeSpec
expiry <- DiffTime -> TimeSpec -> TimeSpec
addTime DiffTime
timeout (TimeSpec -> TimeSpec)
-> StateT (RuntimeState e) m TimeSpec
-> StateT (RuntimeState e) m TimeSpec
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StateT (RuntimeState e) m TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
MessageId
mid <- StateT (RuntimeState e) m MessageId
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m MessageId
newMessageId
Peer
source <- (RuntimeState e -> Peer) -> StateT (RuntimeState e) m Peer
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf
TimeSpec -> MessageId -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
Monad m =>
TimeSpec -> MessageId -> StateT (RuntimeState e) m ()
setBroadcallResponder TimeSpec
expiry MessageId
mid
(Peer -> StateT (RuntimeState e) m ())
-> Set Peer -> StateT (RuntimeState e) m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (PeerMessage e -> Peer -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (Peer -> MessageId -> ByteString -> PeerMessage e
forall e. Peer -> MessageId -> ByteString -> PeerMessage e
PMCall Peer
source MessageId
mid ByteString
msg)) (Set Peer -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (Set Peer)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< StateT (RuntimeState e) m (Set Peer)
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers
where
setBroadcallResponder :: (Monad m)
=> TimeSpec
-> MessageId
-> StateT (RuntimeState e) m ()
setBroadcallResponder :: forall (m :: * -> *) e.
Monad m =>
TimeSpec -> MessageId -> StateT (RuntimeState e) m ()
setBroadcallResponder TimeSpec
expiry MessageId
mid = do
Set Peer
peers <- StateT (RuntimeState e) m (Set Peer)
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers
state :: RuntimeState e
state@RuntimeState {Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls :: Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls :: forall e.
RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {
rsBroadcalls :: Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls =
MessageId
-> (Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert
MessageId
mid
(
[(Peer, Maybe ByteString)] -> Map Peer (Maybe ByteString)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(Peer
peer, Maybe ByteString
forall a. Maybe a
Nothing) | Peer
peer <- Set Peer -> [Peer]
forall a. Set a -> [a]
Set.toList Set Peer
peers],
Responder (Map Peer (Maybe ByteString))
responder,
TimeSpec
expiry
)
Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls
}
handleRuntimeMessage (Broadcast ByteString
msg) =
{-# SCC "Broadcast" #-}
(Peer -> StateT (RuntimeState e) m ())
-> Set Peer -> StateT (RuntimeState e) m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (PeerMessage e -> Peer -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (ByteString -> PeerMessage e
forall e. ByteString -> PeerMessage e
PMCast ByteString
msg)) (Set Peer -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (Set Peer)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< StateT (RuntimeState e) m (Set Peer)
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers
handleRuntimeMessage (SendCallResponse Peer
target MessageId
mid ByteString
msg) =
{-# SCC "SendCallResponse" #-}
do
Peer
source <- (RuntimeState e -> Peer) -> StateT (RuntimeState e) m Peer
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf
PeerMessage e -> Peer -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (Peer -> MessageId -> ByteString -> PeerMessage e
forall e. Peer -> MessageId -> ByteString -> PeerMessage e
PMCallResponse Peer
source MessageId
mid ByteString
msg) Peer
target
handleRuntimeMessage (HandleCallResponse Peer
source MessageId
mid ByteString
msg) =
{-# SCC "HandleCallResponse" #-}
do
state :: RuntimeState e
state@RuntimeState {Map MessageId (Responder ByteString)
rsCalls :: Map MessageId (Responder ByteString)
rsCalls :: forall e. RuntimeState e -> Map MessageId (Responder ByteString)
rsCalls, Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls :: Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls :: forall e.
RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
case MessageId
-> Map MessageId (Responder ByteString)
-> Maybe (Responder ByteString)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup MessageId
mid Map MessageId (Responder ByteString)
rsCalls of
Maybe (Responder ByteString)
Nothing ->
case MessageId
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> Maybe
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup MessageId
mid Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls of
Maybe
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
Nothing -> () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just (Map Peer (Maybe ByteString)
responses, Responder (Map Peer (Maybe ByteString))
responder, TimeSpec
expiry) ->
let
responses2 :: Map Peer (Maybe ByteString)
responses2 = Peer
-> Maybe ByteString
-> Map Peer (Maybe ByteString)
-> Map Peer (Maybe ByteString)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Peer
source (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
msg) Map Peer (Maybe ByteString)
responses
response :: Map Peer ByteString
response = [(Peer, ByteString)] -> Map Peer ByteString
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [
(Peer
peer, ByteString
r)
| (Peer
peer, Just ByteString
r) <- Map Peer (Maybe ByteString) -> [(Peer, Maybe ByteString)]
forall k a. Map k a -> [(k, a)]
Map.toList Map Peer (Maybe ByteString)
responses2
]
peers :: Set Peer
peers = Map Peer (Maybe ByteString) -> Set Peer
forall k a. Map k a -> Set k
Map.keysSet Map Peer (Maybe ByteString)
responses2
in
if Set Peer -> Bool
forall a. Set a -> Bool
Set.null (Set Peer
peers Set Peer -> Set Peer -> Set Peer
forall a. Ord a => Set a -> Set a -> Set a
\\ Map Peer ByteString -> Set Peer
forall k a. Map k a -> Set k
Map.keysSet Map Peer ByteString
response)
then do
Responder (Map Peer (Maybe ByteString))
-> Map Peer (Maybe ByteString) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Map Peer (Maybe ByteString))
responder (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString)
-> Map Peer ByteString -> Map Peer (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map Peer ByteString
response)
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {
rsBroadcalls :: Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls = MessageId
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete MessageId
mid Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls
}
else
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {
rsBroadcalls :: Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls =
MessageId
-> (Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert
MessageId
mid
(Map Peer (Maybe ByteString)
responses2, Responder (Map Peer (Maybe ByteString))
responder, TimeSpec
expiry)
Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls
}
Just Responder ByteString
responder -> do
Responder ByteString -> ByteString -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ByteString
responder ByteString
msg
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {rsCalls :: Map MessageId (Responder ByteString)
rsCalls = MessageId
-> Map MessageId (Responder ByteString)
-> Map MessageId (Responder ByteString)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete MessageId
mid Map MessageId (Responder ByteString)
rsCalls}
handleRuntimeMessage (Resend Responder ()
responder) =
{-# SCC "Resend" #-}
StateT (RuntimeState e) m ()
forall e (m :: * -> *).
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
m ()
propagate StateT (RuntimeState e) m ()
-> (() -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Responder () -> () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ()
responder
getPeers :: (Monad m) => StateT (RuntimeState e) m (Set Peer)
getPeers :: forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers = (RuntimeState e -> Set Peer)
-> StateT (RuntimeState e) m (Set Peer)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (EventFold ClusterName Peer e -> Set Peer
forall p o e. Ord p => EventFold o p e -> Set p
projParticipants (EventFold ClusterName Peer e -> Set Peer)
-> (RuntimeState e -> EventFold ClusterName Peer e)
-> RuntimeState e
-> Set Peer
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState)
newMessageId :: (Monad m) => StateT (RuntimeState e) m MessageId
newMessageId :: forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m MessageId
newMessageId = do
state :: RuntimeState e
state@RuntimeState {MessageId
rsNextId :: MessageId
rsNextId :: forall e. RuntimeState e -> MessageId
rsNextId} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {rsNextId :: MessageId
rsNextId = MessageId -> MessageId
nextMessageId MessageId
rsNextId}
MessageId -> StateT (RuntimeState e) m MessageId
forall (m :: * -> *) a. Monad m => a -> m a
return MessageId
rsNextId
updateCluster
:: ( EventConstraints e
, MonadCatch m
, MonadLoggerIO m
, MonadState (RuntimeState e) m
)
=> EventFoldT ClusterName Peer e m a
-> m a
updateCluster :: forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster EventFoldT ClusterName Peer e m a
action = do
RuntimeState {Peer
rsSelf :: Peer
rsSelf :: forall e. RuntimeState e -> Peer
rsSelf} <- m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
Peer -> EventFoldT ClusterName Peer e m a -> m a
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
Peer -> EventFoldT ClusterName Peer e m a -> m a
updateClusterAs Peer
rsSelf EventFoldT ClusterName Peer e m a
action
updateClusterAs
:: forall e m a.
( EventConstraints e
, MonadCatch m
, MonadLoggerIO m
, MonadState (RuntimeState e) m
)
=> Peer
-> EventFoldT ClusterName Peer e m a
-> m a
updateClusterAs :: forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
Peer -> EventFoldT ClusterName Peer e m a -> m a
updateClusterAs Peer
asPeer EventFoldT ClusterName Peer e m a
action = do
(EventFold ClusterName Peer e
oldCluster, EventFold ClusterName Peer e -> IO ()
notify)
<- (RuntimeState e
-> (EventFold ClusterName Peer e,
EventFold ClusterName Peer e -> IO ()))
-> m (EventFold ClusterName Peer e,
EventFold ClusterName Peer e -> IO ())
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState (RuntimeState e -> EventFold ClusterName Peer e)
-> (RuntimeState e -> EventFold ClusterName Peer e -> IO ())
-> RuntimeState e
-> (EventFold ClusterName Peer e,
EventFold ClusterName Peer e -> IO ())
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& RuntimeState e -> EventFold ClusterName Peer e -> IO ()
forall e. RuntimeState e -> EventFold ClusterName Peer e -> IO ()
rsNotify)
(a
v, UpdateResult ClusterName Peer e
ur) <- Peer
-> EventFold ClusterName Peer e
-> EventFoldT ClusterName Peer e m a
-> m (a, UpdateResult ClusterName Peer e)
forall p o e (m :: * -> *) a.
Ord p =>
p
-> EventFold o p e
-> EventFoldT o p e m a
-> m (a, UpdateResult o p e)
runEventFoldT Peer
asPeer EventFold ClusterName Peer e
oldCluster EventFoldT ClusterName Peer e m a
action
do
let
newCluster :: EventFold ClusterName Peer e
newCluster :: EventFold ClusterName Peer e
newCluster = UpdateResult ClusterName Peer e -> EventFold ClusterName Peer e
forall o p e. UpdateResult o p e -> EventFold o p e
urEventFold UpdateResult ClusterName Peer e
ur
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (EventFold ClusterName Peer e
oldCluster EventFold ClusterName Peer e
-> EventFold ClusterName Peer e -> Bool
forall a. Eq a => a -> a -> Bool
/= EventFold ClusterName Peer e
newCluster) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (EventFold ClusterName Peer e -> IO ()
notify EventFold ClusterName Peer e
newCluster)
(RuntimeState e -> RuntimeState e) -> m ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify'
(
let
doModify :: RuntimeState e -> RuntimeState e
doModify RuntimeState e
state =
EventFold ClusterName Peer e
newCluster EventFold ClusterName Peer e -> RuntimeState e -> RuntimeState e
`seq`
RuntimeState e
state
{ rsClusterState :: EventFold ClusterName Peer e
rsClusterState = EventFold ClusterName Peer e
newCluster
}
in
RuntimeState e -> RuntimeState e
doModify
)
do
Peer
self <- (RuntimeState e -> Peer) -> m Peer
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf
let
outputs :: Map (EventId Peer) (Output e)
outputs :: Map (EventId Peer) (Output e)
outputs = UpdateResult ClusterName Peer e -> Map (EventId Peer) (Output e)
forall o p e. UpdateResult o p e -> Map (EventId p) (Output e)
urOutputs UpdateResult ClusterName Peer e
ur
byRemotePeer :: Map Peer (Map (EventId Peer) (Output e))
byRemotePeer :: Map Peer (Map (EventId Peer) (Output e))
byRemotePeer =
(Map (EventId Peer) (Output e)
-> Map (EventId Peer) (Output e) -> Map (EventId Peer) (Output e))
-> [Map Peer (Map (EventId Peer) (Output e))]
-> Map Peer (Map (EventId Peer) (Output e))
forall (f :: * -> *) k a.
(Foldable f, Ord k) =>
(a -> a -> a) -> f (Map k a) -> Map k a
Map.unionsWith
Map (EventId Peer) (Output e)
-> Map (EventId Peer) (Output e) -> Map (EventId Peer) (Output e)
forall a. Semigroup a => a -> a -> a
(<>)
[ Peer
-> Map (EventId Peer) (Output e)
-> Map Peer (Map (EventId Peer) (Output e))
forall k a. k -> a -> Map k a
Map.singleton Peer
peer (EventId Peer -> Output e -> Map (EventId Peer) (Output e)
forall k a. k -> a -> Map k a
Map.singleton EventId Peer
eid Output e
o)
| (EventId Peer
eid, Output e
o) <- Map (EventId Peer) (Output e) -> [(EventId Peer, Output e)]
forall k a. Map k a -> [(k, a)]
Map.toList Map (EventId Peer) (Output e)
outputs
, Just Peer
peer <- [EventId Peer -> Maybe Peer
forall p. EventId p -> Maybe p
EF.source EventId Peer
eid]
]
Map (EventId Peer) (Output e) -> m ()
forall (m :: * -> *) e.
(MonadLoggerIO m, MonadState (RuntimeState e) m,
Show (Output e)) =>
Map (EventId Peer) (Output e) -> m ()
respondToWaiting Map (EventId Peer) (Output e)
outputs
[m ()] -> m ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
[ do
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Sending remote outputs: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Map Peer (Map (EventId Peer) (Output e)) -> Text
forall a b. (Show a, IsString b) => a -> b
showt Map Peer (Map (EventId Peer) (Output e))
byRemotePeer
PeerMessage e -> Peer -> m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (Map (EventId Peer) (Output e) -> PeerMessage e
forall e. Map (EventId Peer) (Output e) -> PeerMessage e
PMOutputs Map (EventId Peer) (Output e)
outputsForPeer) Peer
peer
| (Peer
peer, Map (EventId Peer) (Output e)
outputsForPeer) <- Map Peer (Map (EventId Peer) (Output e))
-> [(Peer, Map (EventId Peer) (Output e))]
forall k a. Map k a -> [(k, a)]
Map.toList Map Peer (Map (EventId Peer) (Output e))
byRemotePeer
, Peer
peer Peer -> Peer -> Bool
forall a. Eq a => a -> a -> Bool
/= Peer
self
]
a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
v
waitOn :: (Monad m)
=> EventId Peer
-> Responder (Output e)
-> StateT (RuntimeState e) m ()
waitOn :: forall (m :: * -> *) e.
Monad m =>
EventId Peer
-> Responder (Output e) -> StateT (RuntimeState e) m ()
waitOn EventId Peer
sid Responder (Output e)
responder =
(RuntimeState e -> RuntimeState e) -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' (\state :: RuntimeState e
state@RuntimeState {Map (EventId Peer) (Responder (Output e))
rsWaiting :: Map (EventId Peer) (Responder (Output e))
rsWaiting :: forall e.
RuntimeState e -> Map (EventId Peer) (Responder (Output e))
rsWaiting} -> RuntimeState e
state {
rsWaiting :: Map (EventId Peer) (Responder (Output e))
rsWaiting = EventId Peer
-> Responder (Output e)
-> Map (EventId Peer) (Responder (Output e))
-> Map (EventId Peer) (Responder (Output e))
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert EventId Peer
sid Responder (Output e)
responder Map (EventId Peer) (Responder (Output e))
rsWaiting
})
propagate
:: ( EventConstraints e
, MonadCatch m
, MonadLoggerIO m
, MonadState (RuntimeState e) m
)
=> m ()
propagate :: forall e (m :: * -> *).
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
m ()
propagate = do
(Peer
self, EventFold ClusterName Peer e
cluster) <- (RuntimeState e -> (Peer, EventFold ClusterName Peer e))
-> m (Peer, EventFold ClusterName Peer e)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf (RuntimeState e -> Peer)
-> (RuntimeState e -> EventFold ClusterName Peer e)
-> RuntimeState e
-> (Peer, EventFold ClusterName Peer e)
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState)
let
targets :: Set Peer
targets = Peer -> Set Peer -> Set Peer
forall a. Ord a => a -> Set a -> Set a
Set.delete Peer
self (Set Peer -> Set Peer) -> Set Peer -> Set Peer
forall a b. (a -> b) -> a -> b
$
EventFold ClusterName Peer e -> Set Peer
forall p o e. Ord p => EventFold o p e -> Set p
EF.allParticipants EventFold ClusterName Peer e
cluster
IO [Peer] -> m [Peer]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO ([Peer] -> IO [Peer]
forall (m :: * -> *) a. MonadRandom m => [a] -> m [a]
shuffleM (Set Peer -> [Peer]
forall a. Set a -> [a]
Set.toList Set Peer
targets)) m [Peer] -> ([Peer] -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
[] -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Peer
target:[Peer]
_ ->
PeerMessage e -> Peer -> m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (EventFold ClusterName Peer e -> PeerMessage e
forall e. EventFold ClusterName Peer e -> PeerMessage e
PMFullMerge EventFold ClusterName Peer e
cluster) Peer
target
m ()
forall e (m :: * -> *).
(MonadState (RuntimeState e) m, MonadLogger m) =>
m ()
disconnectObsolete
where
disconnectObsolete
:: ( MonadState (RuntimeState e) m
, MonadLogger m
)
=> m ()
disconnectObsolete :: forall e (m :: * -> *).
(MonadState (RuntimeState e) m, MonadLogger m) =>
m ()
disconnectObsolete = do
(EventFold ClusterName Peer e
cluster, Map Peer (Connection e)
conns) <- (RuntimeState e
-> (EventFold ClusterName Peer e, Map Peer (Connection e)))
-> m (EventFold ClusterName Peer e, Map Peer (Connection e))
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState (RuntimeState e -> EventFold ClusterName Peer e)
-> (RuntimeState e -> Map Peer (Connection e))
-> RuntimeState e
-> (EventFold ClusterName Peer e, Map Peer (Connection e))
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& RuntimeState e -> Map Peer (Connection e)
forall e. RuntimeState e -> Map Peer (Connection e)
rsConnections)
let obsolete :: Set Peer
obsolete = Map Peer (Connection e) -> Set Peer
forall k a. Map k a -> Set k
Map.keysSet Map Peer (Connection e)
conns Set Peer -> Set Peer -> Set Peer
forall a. Ord a => Set a -> Set a -> Set a
\\ EventFold ClusterName Peer e -> Set Peer
forall p o e. Ord p => EventFold o p e -> Set p
EF.allParticipants EventFold ClusterName Peer e
cluster
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Set Peer -> Bool
forall a. Set a -> Bool
Set.null Set Peer
obsolete) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Disconnecting obsolete: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Set Peer -> Text
forall a b. (Show a, IsString b) => a -> b
showt Set Peer
obsolete
(Peer -> m ()) -> Set Peer -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Peer -> m ()
forall (m :: * -> *) e.
(MonadLogger m, MonadState (RuntimeState e) m) =>
Peer -> m ()
disconnect Set Peer
obsolete
respondToWaiting
:: forall m e.
( MonadLoggerIO m
, MonadState (RuntimeState e) m
, Show (Output e)
)
=> Map (EventId Peer) (Output e)
-> m ()
respondToWaiting :: forall (m :: * -> *) e.
(MonadLoggerIO m, MonadState (RuntimeState e) m,
Show (Output e)) =>
Map (EventId Peer) (Output e) -> m ()
respondToWaiting Map (EventId Peer) (Output e)
available = do
RuntimeState e
rs <- m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug
(Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Responding to: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (Map (EventId Peer) (Output e), Set (EventId Peer)) -> Text
forall a b. (Show a, IsString b) => a -> b
showt (Map (EventId Peer) (Output e)
available, Map (EventId Peer) (Responder (Output e)) -> Set (EventId Peer)
forall k a. Map k a -> Set k
Map.keysSet (RuntimeState e -> Map (EventId Peer) (Responder (Output e))
forall e.
RuntimeState e -> Map (EventId Peer) (Responder (Output e))
rsWaiting RuntimeState e
rs))
((EventId Peer, Output e) -> m ())
-> [(EventId Peer, Output e)] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (EventId Peer, Output e) -> m ()
respondToOne (Map (EventId Peer) (Output e) -> [(EventId Peer, Output e)]
forall k a. Map k a -> [(k, a)]
Map.toList Map (EventId Peer) (Output e)
available)
where
respondToOne
:: (EventId Peer, Output e)
-> m ()
respondToOne :: (EventId Peer, Output e) -> m ()
respondToOne (EventId Peer
sid, Output e
output) = do
state :: RuntimeState e
state@RuntimeState {Map (EventId Peer) (Responder (Output e))
rsWaiting :: Map (EventId Peer) (Responder (Output e))
rsWaiting :: forall e.
RuntimeState e -> Map (EventId Peer) (Responder (Output e))
rsWaiting} <- m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
case EventId Peer
-> Map (EventId Peer) (Responder (Output e))
-> Maybe (Responder (Output e))
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup EventId Peer
sid Map (EventId Peer) (Responder (Output e))
rsWaiting of
Maybe (Responder (Output e))
Nothing -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Responder (Output e)
responder -> do
Responder (Output e) -> Output e -> m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Output e)
responder Output e
output
RuntimeState e -> m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {rsWaiting :: Map (EventId Peer) (Responder (Output e))
rsWaiting = EventId Peer
-> Map (EventId Peer) (Responder (Output e))
-> Map (EventId Peer) (Responder (Output e))
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete EventId Peer
sid Map (EventId Peer) (Responder (Output e))
rsWaiting}
data StartupMode e
= NewCluster
Peer
ClusterName
| JoinCluster
Peer
ClusterName
Peer
| Recover
Peer
(EventFold ClusterName Peer e)
deriving stock instance
( Show e
, Show (Output e)
, Show (State e)
)
=>
Show (StartupMode e)
makeRuntimeState :: (EventConstraints e, MonadLoggerIO m)
=> (Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e
-> m (RuntimeState e)
makeRuntimeState :: forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState
Peer -> EventFold ClusterName Peer e -> IO ()
notify
(NewCluster Peer
self ClusterName
clusterId)
= do
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo Text
"Starting a new cluster."
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState
Peer -> EventFold ClusterName Peer e -> IO ()
notify
(Peer -> EventFold ClusterName Peer e -> StartupMode e
forall e. Peer -> EventFold ClusterName Peer e -> StartupMode e
Recover Peer
self (ClusterName -> Peer -> EventFold ClusterName Peer e
forall o p e.
(Default (State e), Event p e, Ord p) =>
o -> p -> EventFold o p e
EF.new ClusterName
clusterId Peer
self))
makeRuntimeState
Peer -> EventFold ClusterName Peer e -> IO ()
notify
(JoinCluster Peer
self ClusterName
_clusterName Peer
targetPeer)
= do
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Trying to join an existing cluster on " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> AddressDescription -> Text
forall a b. (Show a, IsString b) => a -> b
showt AddressDescription
addr
JoinOk EventFold ClusterName Peer e
cluster <-
JoinRequest -> m (JoinResponse e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
JoinRequest -> m (JoinResponse e)
requestJoin
(JoinRequest -> m (JoinResponse e))
-> (Peer -> JoinRequest) -> Peer -> m (JoinResponse e)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Peer -> JoinRequest
JoinRequest
(Peer -> m (JoinResponse e)) -> Peer -> m (JoinResponse e)
forall a b. (a -> b) -> a -> b
$ Peer
self
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Join response with cluster: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> EventFold ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt EventFold ClusterName Peer e
cluster
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState Peer -> EventFold ClusterName Peer e -> IO ()
notify (Peer -> EventFold ClusterName Peer e -> StartupMode e
forall e. Peer -> EventFold ClusterName Peer e -> StartupMode e
Recover Peer
self EventFold ClusterName Peer e
cluster)
where
requestJoin :: (EventConstraints e, MonadLoggerIO m)
=> JoinRequest
-> m (JoinResponse e)
requestJoin :: forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
JoinRequest -> m (JoinResponse e)
requestJoin JoinRequest
joinMsg = ((JoinRequest -> m (JoinResponse e))
-> JoinRequest -> m (JoinResponse e)
forall a b. (a -> b) -> a -> b
$ JoinRequest
joinMsg) ((JoinRequest -> m (JoinResponse e)) -> m (JoinResponse e))
-> m (JoinRequest -> m (JoinResponse e)) -> m (JoinResponse e)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AddressDescription
-> Maybe ClientParams -> m (JoinRequest -> m (JoinResponse e))
forall request response (m :: * -> *) (n :: * -> *).
(Binary request, Binary response, MonadIO m, MonadLoggerIO n,
Show response) =>
AddressDescription
-> Maybe ClientParams -> n (request -> m response)
connectServer AddressDescription
addr Maybe ClientParams
forall a. Maybe a
Nothing
addr :: AddressDescription
addr :: AddressDescription
addr =
Text -> AddressDescription
AddressDescription
(
Peer -> Text
unPeer Peer
targetPeer
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
joinMessagePort
)
makeRuntimeState
Peer -> EventFold ClusterName Peer e -> IO ()
notify
(Recover Peer
self EventFold ClusterName Peer e
clusterState)
= do
MessageId
rsNextId <- m MessageId
forall (m :: * -> *). MonadIO m => m MessageId
newSequence
RuntimeState e -> m (RuntimeState e)
forall (m :: * -> *) a. Monad m => a -> m a
return
RuntimeState :: forall e.
Peer
-> EventFold ClusterName Peer e
-> Map Peer (Connection e)
-> Map (EventId Peer) (Responder (Output e))
-> Map MessageId (Responder ByteString)
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> MessageId
-> (EventFold ClusterName Peer e -> IO ())
-> Map (EventId Peer) (Responder (JoinResponse e))
-> RuntimeState e
RuntimeState
{ rsSelf :: Peer
rsSelf = Peer
self
, rsClusterState :: EventFold ClusterName Peer e
rsClusterState = EventFold ClusterName Peer e
clusterState
, rsConnections :: Map Peer (Connection e)
rsConnections = Map Peer (Connection e)
forall a. Monoid a => a
mempty
, rsWaiting :: Map (EventId Peer) (Responder (Output e))
rsWaiting = Map (EventId Peer) (Responder (Output e))
forall a. Monoid a => a
mempty
, rsCalls :: Map MessageId (Responder ByteString)
rsCalls = Map MessageId (Responder ByteString)
forall a. Monoid a => a
mempty
, rsBroadcalls :: Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls = Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall a. Monoid a => a
mempty
, MessageId
rsNextId :: MessageId
rsNextId :: MessageId
rsNextId
, rsNotify :: EventFold ClusterName Peer e -> IO ()
rsNotify = Peer -> EventFold ClusterName Peer e -> IO ()
notify Peer
self
, rsJoins :: Map (EventId Peer) (Responder (JoinResponse e))
rsJoins = Map (EventId Peer) (Responder (JoinResponse e))
forall a. Monoid a => a
mempty
}
newtype JoinRequest = JoinRequest Peer
deriving stock ((forall x. JoinRequest -> Rep JoinRequest x)
-> (forall x. Rep JoinRequest x -> JoinRequest)
-> Generic JoinRequest
forall x. Rep JoinRequest x -> JoinRequest
forall x. JoinRequest -> Rep JoinRequest x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep JoinRequest x -> JoinRequest
$cfrom :: forall x. JoinRequest -> Rep JoinRequest x
Generic, Int -> JoinRequest -> ShowS
[JoinRequest] -> ShowS
JoinRequest -> String
(Int -> JoinRequest -> ShowS)
-> (JoinRequest -> String)
-> ([JoinRequest] -> ShowS)
-> Show JoinRequest
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [JoinRequest] -> ShowS
$cshowList :: [JoinRequest] -> ShowS
show :: JoinRequest -> String
$cshow :: JoinRequest -> String
showsPrec :: Int -> JoinRequest -> ShowS
$cshowsPrec :: Int -> JoinRequest -> ShowS
Show)
instance Binary JoinRequest
newSequence :: (MonadIO m) => m MessageId
newSequence :: forall (m :: * -> *). MonadIO m => m MessageId
newSequence = do
UUID
sid <- m UUID
forall (m :: * -> *). MonadIO m => m UUID
getUUID
MessageId -> m MessageId
forall (f :: * -> *) a. Applicative f => a -> f a
pure (UUID -> Word64 -> MessageId
M UUID
sid Word64
0)
where
getUUID :: (MonadIO m) => m UUID
getUUID :: forall (m :: * -> *). MonadIO m => m UUID
getUUID = IO (Maybe UUID) -> m (Maybe UUID)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Maybe UUID)
nextUUID m (Maybe UUID) -> (Maybe UUID -> m UUID) -> m UUID
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= m UUID -> (UUID -> m UUID) -> Maybe UUID -> m UUID
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (m ()
wait m () -> m UUID -> m UUID
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m UUID
forall (m :: * -> *). MonadIO m => m UUID
getUUID) UUID -> m UUID
forall (m :: * -> *) a. Monad m => a -> m a
return
where
wait :: m ()
wait = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
threadDelay Int
oneMillisecond)
oneMillisecond :: Int
oneMillisecond = Int
1000
nextMessageId :: MessageId -> MessageId
nextMessageId :: MessageId -> MessageId
nextMessageId (M UUID
sequenceId Word64
ord) = UUID -> Word64 -> MessageId
M UUID
sequenceId (Word64 -> Word64
forall a. Enum a => a -> a
succ Word64
ord)
getClusterName :: Runtime e -> ClusterName
getClusterName :: forall e. Runtime e -> ClusterName
getClusterName = Runtime e -> ClusterName
forall e. Runtime e -> ClusterName
rClusterId
joinMessagePort :: PortNumber
joinMessagePort :: PortNumber
joinMessagePort = PortNumber
5289
respond :: (MonadIO m) => Responder a -> a -> m ()
respond :: forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder a
responder = m Responded -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Responded -> m ()) -> (a -> m Responded) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Responder a -> a -> m Responded
forall (m :: * -> *) a.
MonadIO m =>
Responder a -> a -> m Responded
Fork.respond Responder a
responder
getStats :: (MonadIO m) => Runtime e -> m Stats
getStats :: forall (m :: * -> *) e. MonadIO m => Runtime e -> m Stats
getStats Runtime e
runtime = do
TimeSpec
now <- IO TimeSpec -> m TimeSpec
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
Map Peer TimeSpec
stats <- IO (Map Peer TimeSpec) -> m (Map Peer TimeSpec)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Map Peer TimeSpec) -> m (Map Peer TimeSpec))
-> IO (Map Peer TimeSpec) -> m (Map Peer TimeSpec)
forall a b. (a -> b) -> a -> b
$ IORef (Map Peer TimeSpec) -> IO (Map Peer TimeSpec)
forall a. IORef a -> IO a
readIORef (Runtime e -> IORef (Map Peer TimeSpec)
forall e. Runtime e -> IORef (Map Peer TimeSpec)
rStats Runtime e
runtime)
Stats -> m Stats
forall (f :: * -> *) a. Applicative f => a -> f a
pure
Stats :: Map Peer DiffTime -> Stats
Stats
{ timeWithoutProgress :: Map Peer DiffTime
timeWithoutProgress = TimeSpec -> TimeSpec -> DiffTime
diffTimeSpec TimeSpec
now (TimeSpec -> DiffTime) -> Map Peer TimeSpec -> Map Peer DiffTime
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map Peer TimeSpec
stats
}