{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module OM.Legion.Runtime (
forkLegionary,
Runtime,
StartupMode(..),
MonadConstraints,
applyFast,
applyConsistent,
readState,
call,
cast,
broadcall,
broadcast,
eject,
getSelf,
getClusterName,
getStats,
Stats(..),
) where
import Control.Arrow (Arrow((&&&)))
import Control.Concurrent (threadDelay)
import Control.Exception.Safe (MonadCatch, tryAny)
import Control.Monad (unless, void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Logger.CallStack (LoggingT(runLoggingT),
MonadLoggerIO(askLoggerIO), LogStr, MonadLogger, logDebug, logError,
logInfo)
import Control.Monad.State (MonadState(get, put), StateT, evalStateT,
gets, modify')
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Aeson (ToJSON)
import Data.Binary (Binary)
import Data.ByteString.Lazy (ByteString)
import Data.CRDT.EventFold (Event(Output, State),
UpdateResult(urEventFold, urOutputs), EventFold, EventId, diffSize,
events, infimumId, projParticipants)
import Data.CRDT.EventFold.Monad (MonadUpdateEF(diffMerge, disassociate,
event, fullMerge, participate), EventFoldT, runEventFoldT)
import Data.Default.Class (Default)
import Data.Function ((&))
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Map (Map)
import Data.Set ((\\), Set)
import Data.Time (DiffTime, diffTimeToPicoseconds, picosecondsToDiffTime)
import Data.UUID (UUID)
import Data.UUID.V1 (nextUUID)
import GHC.Generics (Generic)
import Network.Socket (PortNumber)
import OM.Fork (Actor(Msg, actorChan), Race, Responder, race)
import OM.Legion.Connection (JoinResponse(JoinOk),
RuntimeState(RuntimeState, rsBroadcalls, rsCalls, rsClusterState,
rsConnections, rsJoins, rsNextId, rsNotify, rsSelf, rsWaiting),
EventConstraints, disconnect, peerMessagePort, sendPeer)
import OM.Legion.MsgChan (MessageId(M), Peer(unPeer), PeerMessage(PMCall,
PMCallResponse, PMCast, PMFullMerge, PMMerge, PMOutputs), ClusterName)
import OM.Legion.RChan (JoinRequest(JoinRequest),
RuntimeMessage(ApplyConsistent, ApplyFast, Broadcall, Broadcast,
Call, Cast, Eject, FullMerge, GetStats, HandleCallResponse, Join,
Merge, Outputs, ReadState, Resend, SendCallResponse), RChan, newRChan,
readRChan)
import OM.Logging (withPrefix)
import OM.Show (showj, showt)
import OM.Socket (AddressDescription(AddressDescription), connectServer,
openIngress, openServer)
import OM.Time (MonadTimeSpec(getTime), addTime, diffTimeSpec)
import Prelude (Applicative(pure), Either(Left, Right), Enum(succ),
Eq((/=)), Functor(fmap), Maybe(Just, Nothing), Monad((>>), (>>=),
return), Monoid(mempty), Ord((<=), (>=)), Semigroup((<>)), ($), (.),
(<$>), (=<<), IO, Int, MonadFail, Show, String, fst, mapM_, maybe,
otherwise, seq, sequence_)
import System.Clock (TimeSpec)
import System.Random.Shuffle (shuffleM)
import qualified Data.Binary as Binary
import qualified Data.CRDT.EventFold as EF
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified OM.Fork as Fork
import qualified Streaming.Prelude as Stream
{-# ANN module ("HLint: ignore Redundant <$>" :: String) #-}
{-# ANN module ("HLint: ignore Use underscore" :: String) #-}
type MonadConstraints m =
( MonadCatch m
, MonadFail m
, MonadLoggerIO m
, MonadTimeSpec m
, MonadUnliftIO m
, Race
)
forkLegionary
:: ( EventConstraints e
, MonadConstraints m
)
=> (ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> (Peer -> EventFold ClusterName Peer e -> IO ())
-> Int
-> StartupMode e
-> m (Runtime e)
forkLegionary :: forall e (m :: * -> *).
(EventConstraints e, MonadConstraints m) =>
(ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> (Peer -> EventFold ClusterName Peer e -> IO ())
-> Int
-> StartupMode e
-> m (Runtime e)
forkLegionary
ByteString -> IO ByteString
handleUserCall
ByteString -> IO ()
handleUserCast
Peer -> EventFold ClusterName Peer e -> IO ()
notify
Int
resendInterval
StartupMode e
startupMode
= do
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Starting up with the following Mode: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> StartupMode e -> Text
forall a b. (Show a, IsString b) => a -> b
showt StartupMode e
startupMode
rts <- (Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState Peer -> EventFold ClusterName Peer e -> IO ()
notify StartupMode e
startupMode
runtimeChan <- newRChan
logging <- withPrefix (logPrefix (rsSelf rts)) <$> askLoggerIO
rStats <- liftIO $ newIORef mempty
(`runLoggingT` logging) $
executeRuntime
handleUserCall
handleUserCast
resendInterval
rts
runtimeChan
rStats
let
clusterId :: ClusterName
clusterId = EventFold ClusterName Peer e -> ClusterName
forall o p e. EventFold o p e -> o
EF.origin (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState RuntimeState e
rts)
return
Runtime
{ rChan = runtimeChan
, rSelf = rsSelf rts
, rClusterId = clusterId
, rStats
}
where
logPrefix :: Peer -> LogStr
logPrefix :: Peer -> LogStr
logPrefix Peer
self_ = LogStr
"[" LogStr -> LogStr -> LogStr
forall a. Semigroup a => a -> a -> a
<> Peer -> LogStr
forall a b. (Show a, IsString b) => a -> b
showt Peer
self_ LogStr -> LogStr -> LogStr
forall a. Semigroup a => a -> a -> a
<> LogStr
"] "
data Runtime e = Runtime
{ forall e. Runtime e -> RChan e
rChan :: RChan e
, forall e. Runtime e -> Peer
rSelf :: Peer
, forall e. Runtime e -> ClusterName
rClusterId :: ClusterName
, forall e. Runtime e -> IORef (Map Peer TimeSpec)
rStats :: IORef (Map Peer TimeSpec)
}
instance Actor (Runtime e) where
type Msg (Runtime e) = RuntimeMessage e
actorChan :: Runtime e -> Msg (Runtime e) -> IO ()
actorChan = RChan e -> Msg (RChan e) -> IO ()
RChan e -> RuntimeMessage e -> IO ()
forall a. Actor a => a -> Msg a -> IO ()
actorChan (RChan e -> RuntimeMessage e -> IO ())
-> (Runtime e -> RChan e) -> Runtime e -> RuntimeMessage e -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Runtime e -> RChan e
forall e. Runtime e -> RChan e
rChan
newtype Stats = Stats
{ Stats -> Map Peer DiffTime
timeWithoutProgress :: Map Peer DiffTime
}
deriving stock ((forall x. Stats -> Rep Stats x)
-> (forall x. Rep Stats x -> Stats) -> Generic Stats
forall x. Rep Stats x -> Stats
forall x. Stats -> Rep Stats x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Stats -> Rep Stats x
from :: forall x. Stats -> Rep Stats x
$cto :: forall x. Rep Stats x -> Stats
to :: forall x. Rep Stats x -> Stats
Generic, Int -> Stats -> ShowS
[Stats] -> ShowS
Stats -> String
(Int -> Stats -> ShowS)
-> (Stats -> String) -> ([Stats] -> ShowS) -> Show Stats
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Stats -> ShowS
showsPrec :: Int -> Stats -> ShowS
$cshow :: Stats -> String
show :: Stats -> String
$cshowList :: [Stats] -> ShowS
showList :: [Stats] -> ShowS
Show, Stats -> Stats -> Bool
(Stats -> Stats -> Bool) -> (Stats -> Stats -> Bool) -> Eq Stats
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Stats -> Stats -> Bool
== :: Stats -> Stats -> Bool
$c/= :: Stats -> Stats -> Bool
/= :: Stats -> Stats -> Bool
Eq)
deriving anyclass ([Stats] -> Value
[Stats] -> Encoding
Stats -> Bool
Stats -> Value
Stats -> Encoding
(Stats -> Value)
-> (Stats -> Encoding)
-> ([Stats] -> Value)
-> ([Stats] -> Encoding)
-> (Stats -> Bool)
-> ToJSON Stats
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: Stats -> Value
toJSON :: Stats -> Value
$ctoEncoding :: Stats -> Encoding
toEncoding :: Stats -> Encoding
$ctoJSONList :: [Stats] -> Value
toJSONList :: [Stats] -> Value
$ctoEncodingList :: [Stats] -> Encoding
toEncodingList :: [Stats] -> Encoding
$comitField :: Stats -> Bool
omitField :: Stats -> Bool
ToJSON)
instance Binary Stats where
get :: Get Stats
get =
Map Peer DiffTime -> Stats
Stats (Map Peer DiffTime -> Stats)
-> Get (Map Peer DiffTime) -> Get Stats
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ((Integer -> DiffTime) -> Map Peer Integer -> Map Peer DiffTime
forall a b. (a -> b) -> Map Peer a -> Map Peer b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Integer -> DiffTime
picosecondsToDiffTime (Map Peer Integer -> Map Peer DiffTime)
-> Get (Map Peer Integer) -> Get (Map Peer DiffTime)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get (Map Peer Integer)
forall t. Binary t => Get t
Binary.get)
put :: Stats -> Put
put (Stats Map Peer DiffTime
timeWithoutProgress) =
Map Peer Integer -> Put
forall t. Binary t => t -> Put
Binary.put (DiffTime -> Integer
diffTimeToPicoseconds (DiffTime -> Integer) -> Map Peer DiffTime -> Map Peer Integer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map Peer DiffTime
timeWithoutProgress)
applyFast :: (MonadIO m)
=> Runtime e
-> e
-> m (Output e)
applyFast :: forall (m :: * -> *) e. MonadIO m => Runtime e -> e -> m (Output e)
applyFast Runtime e
runtime e
e = Runtime e
-> (Responder (Output e) -> Msg (Runtime e)) -> m (Output e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (e -> Responder (Output e) -> RuntimeMessage e
forall e. e -> Responder (Output e) -> RuntimeMessage e
ApplyFast e
e)
applyConsistent :: (MonadIO m)
=> Runtime e
-> e
-> m (Output e)
applyConsistent :: forall (m :: * -> *) e. MonadIO m => Runtime e -> e -> m (Output e)
applyConsistent Runtime e
runtime e
e = Runtime e
-> (Responder (Output e) -> Msg (Runtime e)) -> m (Output e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (e -> Responder (Output e) -> RuntimeMessage e
forall e. e -> Responder (Output e) -> RuntimeMessage e
ApplyConsistent e
e)
readState :: (MonadIO m)
=> Runtime e
-> m (EventFold ClusterName Peer e)
readState :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> m (EventFold ClusterName Peer e)
readState Runtime e
runtime = Runtime e
-> (Responder (EventFold ClusterName Peer e) -> Msg (Runtime e))
-> m (EventFold ClusterName Peer e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime Responder (EventFold ClusterName Peer e) -> Msg (Runtime e)
Responder (EventFold ClusterName Peer e) -> RuntimeMessage e
forall e.
Responder (EventFold ClusterName Peer e) -> RuntimeMessage e
ReadState
call :: (MonadIO m) => Runtime e -> Peer -> ByteString -> m ByteString
call :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> Peer -> ByteString -> m ByteString
call Runtime e
runtime Peer
target ByteString
msg = Runtime e
-> (Responder ByteString -> Msg (Runtime e)) -> m ByteString
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (Peer -> ByteString -> Responder ByteString -> RuntimeMessage e
forall e.
Peer -> ByteString -> Responder ByteString -> RuntimeMessage e
Call Peer
target ByteString
msg)
sendCallResponse
:: (MonadIO m)
=> RChan e
-> Peer
-> MessageId
-> ByteString
-> m ()
sendCallResponse :: forall (m :: * -> *) e.
MonadIO m =>
RChan e -> Peer -> MessageId -> ByteString -> m ()
sendCallResponse RChan e
runtimeChan Peer
target MessageId
mid ByteString
msg =
RChan e -> Msg (RChan e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast RChan e
runtimeChan (Peer -> MessageId -> ByteString -> RuntimeMessage e
forall e. Peer -> MessageId -> ByteString -> RuntimeMessage e
SendCallResponse Peer
target MessageId
mid ByteString
msg)
cast :: (MonadIO m) => Runtime e -> Peer -> ByteString -> m ()
cast :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> Peer -> ByteString -> m ()
cast Runtime e
runtime Peer
target ByteString
message = Runtime e -> Msg (Runtime e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast Runtime e
runtime (Peer -> ByteString -> RuntimeMessage e
forall e. Peer -> ByteString -> RuntimeMessage e
Cast Peer
target ByteString
message)
broadcall :: (MonadIO m)
=> Runtime e
-> DiffTime
-> ByteString
-> m (Map Peer (Maybe ByteString))
broadcall :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e
-> DiffTime -> ByteString -> m (Map Peer (Maybe ByteString))
broadcall Runtime e
runtime DiffTime
timeout ByteString
msg = Runtime e
-> (Responder (Map Peer (Maybe ByteString)) -> Msg (Runtime e))
-> m (Map Peer (Maybe ByteString))
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (DiffTime
-> ByteString
-> Responder (Map Peer (Maybe ByteString))
-> RuntimeMessage e
forall e.
DiffTime
-> ByteString
-> Responder (Map Peer (Maybe ByteString))
-> RuntimeMessage e
Broadcall DiffTime
timeout ByteString
msg)
broadcast :: (MonadIO m) => Runtime e -> ByteString -> m ()
broadcast :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> ByteString -> m ()
broadcast Runtime e
runtime ByteString
msg = Runtime e -> Msg (Runtime e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast Runtime e
runtime (ByteString -> RuntimeMessage e
forall e. ByteString -> RuntimeMessage e
Broadcast ByteString
msg)
eject :: (MonadIO m) => Runtime e -> Peer -> m ()
eject :: forall (m :: * -> *) e. MonadIO m => Runtime e -> Peer -> m ()
eject Runtime e
runtime Peer
peer = Runtime e -> (Responder () -> Msg (Runtime e)) -> m ()
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (Peer -> Responder () -> RuntimeMessage e
forall e. Peer -> Responder () -> RuntimeMessage e
Eject Peer
peer)
getSelf :: Runtime e -> Peer
getSelf :: forall e. Runtime e -> Peer
getSelf = Runtime e -> Peer
forall e. Runtime e -> Peer
rSelf
executeRuntime
:: ( Binary (Output e)
, Binary (State e)
, Binary e
, Default (State e)
, Eq (Output e)
, Eq e
, Event Peer e
, MonadCatch m
, MonadFail m
, MonadLoggerIO m
, MonadTimeSpec m
, MonadUnliftIO m
, Race
, Show (Output e)
, Show (State e)
, Show e
, ToJSON (Output e)
, ToJSON (State e)
, ToJSON e
)
=> (ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> Int
-> RuntimeState e
-> RChan e
-> IORef (Map Peer TimeSpec)
-> m ()
executeRuntime :: forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
Eq (Output e), Eq e, Event Peer e, MonadCatch m, MonadFail m,
MonadLoggerIO m, MonadTimeSpec m, MonadUnliftIO m, Race,
Show (Output e), Show (State e), Show e, ToJSON (Output e),
ToJSON (State e), ToJSON e) =>
(ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> Int
-> RuntimeState e
-> RChan e
-> IORef (Map Peer TimeSpec)
-> m ()
executeRuntime
ByteString -> IO ByteString
handleUserCall
ByteString -> IO ()
handleUserCast
Int
resendInterval
RuntimeState e
rts
RChan e
runtimeChan
IORef (Map Peer TimeSpec)
peerStats
= do
ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion peer listener" m ()
forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener
ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion join listener" m ()
forall (m :: * -> *).
(MonadCatch m, MonadFail m, MonadLogger m, MonadUnliftIO m,
Race) =>
m ()
runJoinListener
ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion periodic resend" m ()
forall (m :: * -> *). MonadIO m => m ()
runPeriodicResent
ProcessName -> m Any -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion message handler" (m Any -> m ()) -> m Any -> m ()
forall a b. (a -> b) -> a -> b
$
(StateT (RuntimeState e) m Any -> RuntimeState e -> m Any
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m a
`evalStateT` RuntimeState e
rts)
(
let
handleMessages :: StateT (RuntimeState e) m Any
handleMessages = do
msg <- RChan e -> StateT (RuntimeState e) m (RuntimeMessage e)
forall (m :: * -> *) e.
MonadIO m =>
RChan e -> m (RuntimeMessage e)
readRChan RChan e
runtimeChan
RuntimeState {rsClusterState = cluster1} <- get
logDebug $ "Handling: " <> showt msg
handleRuntimeMessage msg
RuntimeState {rsClusterState = cluster2} <- get
when (cluster1 /= cluster2) $
logDebug $ "New Cluster State: " <> showj cluster2
handleBroadcallTimeouts
handleOutstandingJoins
handleMessages
in do
IO () -> StateT (RuntimeState e) m ()
forall a. IO a -> StateT (RuntimeState e) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> StateT (RuntimeState e) m ())
-> IO () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ RuntimeState e -> EventFold ClusterName Peer e -> IO ()
forall e. RuntimeState e -> EventFold ClusterName Peer e -> IO ()
rsNotify RuntimeState e
rts (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState RuntimeState e
rts)
StateT (RuntimeState e) m Any
handleMessages
)
where
runPeerListener :: (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener :: forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener =
let
addy :: AddressDescription
addy :: AddressDescription
addy =
Text -> AddressDescription
AddressDescription
(
Peer -> Text
unPeer (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf RuntimeState e
rts)
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
peerMessagePort
)
in
AddressDescription -> Stream (Of (Peer, PeerMessage e)) m ()
forall i (m :: * -> *) never_returns.
(Binary i, MonadFail m, MonadIO m, Race) =>
AddressDescription -> Stream (Of i) m never_returns
openIngress AddressDescription
addy
Stream (Of (Peer, PeerMessage e)) m ()
-> ((Peer, PeerMessage e) -> Stream (Of (RuntimeMessage e)) m ())
-> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) (f :: * -> *) a r x.
(Monad m, Functor f) =>
Stream (Of a) m r -> (a -> Stream f m x) -> Stream f m r
`Stream.for`
(\ (Peer
msgSource, PeerMessage e
msg) -> do
IO () -> Stream (Of (RuntimeMessage e)) m ()
forall a. IO a -> Stream (Of (RuntimeMessage e)) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Stream (Of (RuntimeMessage e)) m ())
-> IO () -> Stream (Of (RuntimeMessage e)) m ()
forall a b. (a -> b) -> a -> b
$ do
now <- IO TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
atomicModifyIORef'
peerStats
(\Map Peer TimeSpec
peerTimes -> (Peer -> TimeSpec -> Map Peer TimeSpec -> Map Peer TimeSpec
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Peer
msgSource TimeSpec
now Map Peer TimeSpec
peerTimes, ()))
m () -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a.
Monad m =>
m a -> Stream (Of (RuntimeMessage e)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Stream (Of (RuntimeMessage e)) m ())
-> m () -> Stream (Of (RuntimeMessage e)) m ()
forall a b. (a -> b) -> a -> b
$ Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Handling: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (Peer, PeerMessage e) -> Text
forall a b. (Show a, IsString b) => a -> b
showt (Peer
msgSource :: Peer, PeerMessage e
msg)
case PeerMessage e
msg of
PMFullMerge EventFold ClusterName Peer e
ps -> RuntimeMessage e -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Stream.yield (EventFold ClusterName Peer e -> RuntimeMessage e
forall e. EventFold ClusterName Peer e -> RuntimeMessage e
FullMerge EventFold ClusterName Peer e
ps)
PMOutputs Map (EventId Peer) (Output e)
outputs -> RuntimeMessage e -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Stream.yield (Map (EventId Peer) (Output e) -> RuntimeMessage e
forall e. Map (EventId Peer) (Output e) -> RuntimeMessage e
Outputs Map (EventId Peer) (Output e)
outputs)
PMMerge Diff ClusterName Peer e
ps -> RuntimeMessage e -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Stream.yield (Diff ClusterName Peer e -> RuntimeMessage e
forall e. Diff ClusterName Peer e -> RuntimeMessage e
Merge Diff ClusterName Peer e
ps)
PMCall Peer
source MessageId
mid ByteString
callMsg ->
(IO (Either SomeException ByteString)
-> Stream
(Of (RuntimeMessage e)) m (Either SomeException ByteString)
forall a. IO a -> Stream (Of (RuntimeMessage e)) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SomeException ByteString)
-> Stream
(Of (RuntimeMessage e)) m (Either SomeException ByteString))
-> (IO ByteString -> IO (Either SomeException ByteString))
-> IO ByteString
-> Stream
(Of (RuntimeMessage e)) m (Either SomeException ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO ByteString -> IO (Either SomeException ByteString)
forall (m :: * -> *) a.
(HasCallStack, MonadCatch m) =>
m a -> m (Either SomeException a)
tryAny) (ByteString -> IO ByteString
handleUserCall ByteString
callMsg) Stream (Of (RuntimeMessage e)) m (Either SomeException ByteString)
-> (Either SomeException ByteString
-> Stream (Of (RuntimeMessage e)) m ())
-> Stream (Of (RuntimeMessage e)) m ()
forall a b.
Stream (Of (RuntimeMessage e)) m a
-> (a -> Stream (Of (RuntimeMessage e)) m b)
-> Stream (Of (RuntimeMessage e)) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
err ->
m () -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a.
Monad m =>
m a -> Stream (Of (RuntimeMessage e)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Stream (Of (RuntimeMessage e)) m ())
-> (Text -> m ()) -> Text -> Stream (Of (RuntimeMessage e)) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError
(Text -> Stream (Of (RuntimeMessage e)) m ())
-> Text -> Stream (Of (RuntimeMessage e)) m ()
forall a b. (a -> b) -> a -> b
$ Text
"User call handling failed with: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SomeException -> Text
forall a b. (Show a, IsString b) => a -> b
showt SomeException
err
Right ByteString
v -> RChan e
-> Peer
-> MessageId
-> ByteString
-> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) e.
MonadIO m =>
RChan e -> Peer -> MessageId -> ByteString -> m ()
sendCallResponse RChan e
runtimeChan Peer
source MessageId
mid ByteString
v
PMCast ByteString
castMsg -> IO () -> Stream (Of (RuntimeMessage e)) m ()
forall a. IO a -> Stream (Of (RuntimeMessage e)) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ByteString -> IO ()
handleUserCast ByteString
castMsg)
PMCallResponse Peer
source MessageId
mid ByteString
responseMsg ->
RuntimeMessage e -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Stream.yield (Peer -> MessageId -> ByteString -> RuntimeMessage e
forall e. Peer -> MessageId -> ByteString -> RuntimeMessage e
HandleCallResponse Peer
source MessageId
mid ByteString
responseMsg)
)
Stream (Of (RuntimeMessage e)) m ()
-> (Stream (Of (RuntimeMessage e)) m () -> m ()) -> m ()
forall a b. a -> (a -> b) -> b
& (RuntimeMessage e -> m ())
-> Stream (Of (RuntimeMessage e)) m () -> m ()
forall (m :: * -> *) a x r.
Monad m =>
(a -> m x) -> Stream (Of a) m r -> m r
Stream.mapM_ (RChan e -> Msg (RChan e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast RChan e
runtimeChan)
runJoinListener
:: ( MonadCatch m
, MonadFail m
, MonadLogger m
, MonadUnliftIO m
, Race
)
=> m ()
runJoinListener :: forall (m :: * -> *).
(MonadCatch m, MonadFail m, MonadLogger m, MonadUnliftIO m,
Race) =>
m ()
runJoinListener =
let
addy :: AddressDescription
addy :: AddressDescription
addy =
Text -> AddressDescription
AddressDescription
(
Peer -> Text
unPeer (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf RuntimeState e
rts)
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
joinMessagePort
)
in
AddressDescription
-> Maybe (IO ServerParams)
-> Stream (Of (JoinRequest, JoinResponse e -> m Responded)) m ()
forall request response (m :: * -> *) never_returns.
(Binary request, Binary response, MonadLogger m, MonadCatch m,
MonadFail m, MonadUnliftIO m, Race) =>
AddressDescription
-> Maybe (IO ServerParams)
-> Stream (Of (request, response -> m Responded)) m never_returns
openServer AddressDescription
addy Maybe (IO ServerParams)
forall a. Maybe a
Nothing
Stream (Of (JoinRequest, JoinResponse e -> m Responded)) m ()
-> (Stream (Of (JoinRequest, JoinResponse e -> m Responded)) m ()
-> m ())
-> m ()
forall a b. a -> (a -> b) -> b
& ((JoinRequest, JoinResponse e -> m Responded) -> m Responded)
-> Stream (Of (JoinRequest, JoinResponse e -> m Responded)) m ()
-> m ()
forall (m :: * -> *) a x r.
Monad m =>
(a -> m x) -> Stream (Of a) m r -> m r
Stream.mapM_
(\(JoinRequest
req, JoinResponse e -> m Responded
respond_) ->
RChan e
-> (Responder (JoinResponse e) -> Msg (RChan e))
-> m (JoinResponse e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call RChan e
runtimeChan (JoinRequest -> Responder (JoinResponse e) -> RuntimeMessage e
forall e.
JoinRequest -> Responder (JoinResponse e) -> RuntimeMessage e
Join JoinRequest
req) m (JoinResponse e)
-> (JoinResponse e -> m Responded) -> m Responded
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= JoinResponse e -> m Responded
respond_
)
runPeriodicResent :: (MonadIO m) => m ()
runPeriodicResent :: forall (m :: * -> *). MonadIO m => m ()
runPeriodicResent =
let
periodicResend :: (MonadIO m) => m ()
periodicResend :: forall (m :: * -> *). MonadIO m => m ()
periodicResend = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
resendInterval
RChan e -> (Responder () -> Msg (RChan e)) -> m ()
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call RChan e
runtimeChan Responder () -> Msg (RChan e)
Responder () -> RuntimeMessage e
forall e. Responder () -> RuntimeMessage e
Resend
m ()
forall (m :: * -> *). MonadIO m => m ()
periodicResend
in
m ()
forall (m :: * -> *). MonadIO m => m ()
periodicResend
handleOutstandingJoins :: (MonadLoggerIO m) => StateT (RuntimeState e) m ()
handleOutstandingJoins :: forall (m :: * -> *) e.
MonadLoggerIO m =>
StateT (RuntimeState e) m ()
handleOutstandingJoins = do
state@RuntimeState {rsJoins, rsClusterState} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
let
(consistent, pending) =
Map.partitionWithKey
(\EventId Peer
k Responder (JoinResponse e)
_ -> EventId Peer
k EventId Peer -> EventId Peer -> Bool
forall a. Ord a => a -> a -> Bool
<= EventFold ClusterName Peer e -> EventId Peer
forall o p e. EventFold o p e -> EventId p
infimumId EventFold ClusterName Peer e
rsClusterState)
rsJoins
put state {rsJoins = pending}
sequence_ [
do
logInfo $ "Completing join (" <> showt sid <> ")."
respond responder (JoinOk rsClusterState)
| (sid, responder) <- Map.toList consistent
]
handleBroadcallTimeouts
:: ( MonadIO m
, MonadTimeSpec m
)
=> StateT (RuntimeState e) m ()
handleBroadcallTimeouts :: forall (m :: * -> *) e.
(MonadIO m, MonadTimeSpec m) =>
StateT (RuntimeState e) m ()
handleBroadcallTimeouts = do
broadcalls <- (RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec))
-> StateT
(RuntimeState e)
m
(Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec))
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall e.
RuntimeState e
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls
now <- getTime
sequence_ [
do
respond responder responses
modify' (\RuntimeState e
rs -> RuntimeState e
rs {
rsBroadcalls = Map.delete messageId (rsBroadcalls rs)
})
| (messageId, (responses, responder, expiry)) <- Map.toList broadcalls
, now >= expiry
]
handleRuntimeMessage
:: ( Binary (Output e)
, Binary (State e)
, Binary e
, Default (State e)
, Eq (Output e)
, Eq e
, Event Peer e
, MonadLoggerIO m
, MonadTimeSpec m
, Show (Output e)
, Show (State e)
, Show e
, ToJSON (Output e)
, ToJSON (State e)
, ToJSON e
)
=> RuntimeMessage e
-> StateT (RuntimeState e) m ()
handleRuntimeMessage :: forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
Eq (Output e), Eq e, Event Peer e, MonadLoggerIO m,
MonadTimeSpec m, Show (Output e), Show (State e), Show e,
ToJSON (Output e), ToJSON (State e), ToJSON e) =>
RuntimeMessage e -> StateT (RuntimeState e) m ()
handleRuntimeMessage (Outputs Map (EventId Peer) (Output e)
outputs) =
{-# SCC "Outputs" #-}
Map (EventId Peer) (Output e) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(MonadLoggerIO m, MonadState (RuntimeState e) m,
Show (Output e)) =>
Map (EventId Peer) (Output e) -> m ()
respondToWaiting Map (EventId Peer) (Output e)
outputs
handleRuntimeMessage (GetStats Responder (EventFold ClusterName Peer e)
responder) =
{-# SCC "GetStats" #-}
Responder (EventFold ClusterName Peer e)
-> EventFold ClusterName Peer e -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (EventFold ClusterName Peer e)
responder (EventFold ClusterName Peer e -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (EventFold ClusterName Peer e)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (RuntimeState e -> EventFold ClusterName Peer e)
-> StateT (RuntimeState e) m (EventFold ClusterName Peer e)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState
handleRuntimeMessage (ApplyFast e
e Responder (Output e)
responder) =
{-# SCC "ApplyFast" #-}
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
(Output e, EventId Peer) -> Output e
forall a b. (a, b) -> a
fst ((Output e, EventId Peer) -> Output e)
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Output e, EventId Peer)
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (Output e)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> e
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Output e, EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
e -> m (Output e, EventId p)
event e
e EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (Output e)
-> (Output e
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b.
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
-> (a
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Responder (Output e)
-> Output e
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Output e)
responder
handleRuntimeMessage (ApplyConsistent e
e Responder (Output e)
responder) =
{-# SCC "ApplyConsistent" #-}
(
do
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ do
(_v, sid) <- e
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Output e, EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
e -> m (Output e, EventId p)
event e
e
lift (waitOn sid responder)
rs <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
logDebug $ "Waiting: " <> showt (rsWaiting rs)
)
handleRuntimeMessage (Eject Peer
peer Responder ()
responder) =
{-# SCC "Eject" #-}
do
Peer
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
Peer -> EventFoldT ClusterName Peer e m a -> m a
updateClusterAs Peer
peer (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
disassociate Peer
peer
StateT (RuntimeState e) m ()
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
m ()
propagate
IO () -> StateT (RuntimeState e) m ()
forall a. IO a -> StateT (RuntimeState e) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> StateT (RuntimeState e) m ())
-> IO () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
500_000
Responder () -> () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ()
responder ()
handleRuntimeMessage (Merge Diff ClusterName Peer e
other) =
{-# SCC "Merge" #-}
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
Diff ClusterName Peer e
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Either (MergeError ClusterName Peer e) ())
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
Diff o p e -> m (Either (MergeError o p e) ())
diffMerge Diff ClusterName Peer e
other EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Either (MergeError ClusterName Peer e) ())
-> (Either (MergeError ClusterName Peer e) ()
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b.
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
-> (a
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left MergeError ClusterName Peer e
err -> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError (Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Text
"Bad cluster merge: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MergeError ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt MergeError ClusterName Peer e
err
Right () -> () -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a.
a -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
handleRuntimeMessage (FullMerge EventFold ClusterName Peer e
other) =
{-# SCC "FullMerge" #-}
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
EventFold ClusterName Peer e
-> EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Either (MergeError ClusterName Peer e) ())
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
EventFold o p e -> m (Either (MergeError o p e) ())
fullMerge EventFold ClusterName Peer e
other EventFoldT
ClusterName
Peer
e
(StateT (RuntimeState e) m)
(Either (MergeError ClusterName Peer e) ())
-> (Either (MergeError ClusterName Peer e) ()
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b.
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
-> (a
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left MergeError ClusterName Peer e
err -> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError (Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Text
"Bad cluster merge: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MergeError ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt MergeError ClusterName Peer e
err
Right () -> () -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a.
a -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
handleRuntimeMessage (Join (JoinRequest Peer
peer) Responder (JoinResponse e)
responder) =
{-# SCC "Join" #-}
do
Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Handling join from peer: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Peer -> Text
forall a b. (Show a, IsString b) => a -> b
showt Peer
peer
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (do
EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
disassociate Peer
peer
EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
participate Peer
peer
)
RuntimeState {rsClusterState} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
logInfo $ "Join immediately with: " <> showt rsClusterState
respond responder (JoinOk rsClusterState)
handleRuntimeMessage (ReadState Responder (EventFold ClusterName Peer e)
responder) =
{-# SCC "ReadState" #-}
Responder (EventFold ClusterName Peer e)
-> EventFold ClusterName Peer e -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (EventFold ClusterName Peer e)
responder (EventFold ClusterName Peer e -> StateT (RuntimeState e) m ())
-> (RuntimeState e -> EventFold ClusterName Peer e)
-> RuntimeState e
-> StateT (RuntimeState e) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState (RuntimeState e -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (RuntimeState e)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
handleRuntimeMessage (Call Peer
target ByteString
msg Responder ByteString
responder) =
{-# SCC "Call" #-}
do
mid <- StateT (RuntimeState e) m MessageId
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m MessageId
newMessageId
source <- gets rsSelf
setCallResponder mid
sendPeer (PMCall source mid msg) target
where
setCallResponder :: (Monad m)
=> MessageId
-> StateT (RuntimeState e) m ()
setCallResponder :: forall (m :: * -> *) e.
Monad m =>
MessageId -> StateT (RuntimeState e) m ()
setCallResponder MessageId
mid = do
state@RuntimeState {rsCalls} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
put state {
rsCalls = Map.insert mid responder rsCalls
}
handleRuntimeMessage (Cast Peer
target ByteString
msg) =
{-# SCC "Cast" #-}
PeerMessage e -> Peer -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (ByteString -> PeerMessage e
forall e. ByteString -> PeerMessage e
PMCast ByteString
msg) Peer
target
handleRuntimeMessage (Broadcall DiffTime
timeout ByteString
msg Responder (Map Peer (Maybe ByteString))
responder) =
{-# SCC "Broadcall" #-}
do
expiry <- DiffTime -> TimeSpec -> TimeSpec
addTime DiffTime
timeout (TimeSpec -> TimeSpec)
-> StateT (RuntimeState e) m TimeSpec
-> StateT (RuntimeState e) m TimeSpec
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StateT (RuntimeState e) m TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
mid <- newMessageId
source <- gets rsSelf
setBroadcallResponder expiry mid
mapM_ (sendPeer (PMCall source mid msg)) =<< getPeers
where
setBroadcallResponder :: (Monad m)
=> TimeSpec
-> MessageId
-> StateT (RuntimeState e) m ()
setBroadcallResponder :: forall (m :: * -> *) e.
Monad m =>
TimeSpec -> MessageId -> StateT (RuntimeState e) m ()
setBroadcallResponder TimeSpec
expiry MessageId
mid = do
peers <- StateT (RuntimeState e) m (Set Peer)
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers
state@RuntimeState {rsBroadcalls} <- get
put state {
rsBroadcalls =
Map.insert
mid
(
Map.fromList [(peer, Nothing) | peer <- Set.toList peers],
responder,
expiry
)
rsBroadcalls
}
handleRuntimeMessage (Broadcast ByteString
msg) =
{-# SCC "Broadcast" #-}
(Peer -> StateT (RuntimeState e) m ())
-> Set Peer -> StateT (RuntimeState e) m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (PeerMessage e -> Peer -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (ByteString -> PeerMessage e
forall e. ByteString -> PeerMessage e
PMCast ByteString
msg)) (Set Peer -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (Set Peer)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< StateT (RuntimeState e) m (Set Peer)
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers
handleRuntimeMessage (SendCallResponse Peer
target MessageId
mid ByteString
msg) =
{-# SCC "SendCallResponse" #-}
do
source <- (RuntimeState e -> Peer) -> StateT (RuntimeState e) m Peer
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf
sendPeer (PMCallResponse source mid msg) target
handleRuntimeMessage (HandleCallResponse Peer
source MessageId
mid ByteString
msg) =
{-# SCC "HandleCallResponse" #-}
do
state@RuntimeState {rsCalls, rsBroadcalls} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
case Map.lookup mid rsCalls of
Maybe (Responder ByteString)
Nothing ->
case MessageId
-> Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> Maybe
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup MessageId
mid Map
MessageId
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls of
Maybe
(Map Peer (Maybe ByteString),
Responder (Map Peer (Maybe ByteString)), TimeSpec)
Nothing -> () -> StateT (RuntimeState e) m ()
forall a. a -> StateT (RuntimeState e) m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just (Map Peer (Maybe ByteString)
responses, Responder (Map Peer (Maybe ByteString))
responder, TimeSpec
expiry) ->
let
responses2 :: Map Peer (Maybe ByteString)
responses2 = Peer
-> Maybe ByteString
-> Map Peer (Maybe ByteString)
-> Map Peer (Maybe ByteString)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Peer
source (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
msg) Map Peer (Maybe ByteString)
responses
response :: Map Peer ByteString
response = [(Peer, ByteString)] -> Map Peer ByteString
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [
(Peer
peer, ByteString
r)
| (Peer
peer, Just ByteString
r) <- Map Peer (Maybe ByteString) -> [(Peer, Maybe ByteString)]
forall k a. Map k a -> [(k, a)]
Map.toList Map Peer (Maybe ByteString)
responses2
]
peers :: Set Peer
peers = Map Peer (Maybe ByteString) -> Set Peer
forall k a. Map k a -> Set k
Map.keysSet Map Peer (Maybe ByteString)
responses2
in
if Set Peer -> Bool
forall a. Set a -> Bool
Set.null (Set Peer
peers Set Peer -> Set Peer -> Set Peer
forall a. Ord a => Set a -> Set a -> Set a
\\ Map Peer ByteString -> Set Peer
forall k a. Map k a -> Set k
Map.keysSet Map Peer ByteString
response)
then do
Responder (Map Peer (Maybe ByteString))
-> Map Peer (Maybe ByteString) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Map Peer (Maybe ByteString))
responder (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString)
-> Map Peer ByteString -> Map Peer (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map Peer ByteString
response)
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {
rsBroadcalls = Map.delete mid rsBroadcalls
}
else
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {
rsBroadcalls =
Map.insert
mid
(responses2, responder, expiry)
rsBroadcalls
}
Just Responder ByteString
responder -> do
Responder ByteString -> ByteString -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ByteString
responder ByteString
msg
RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {rsCalls = Map.delete mid rsCalls}
handleRuntimeMessage (Resend Responder ()
responder) =
{-# SCC "Resend" #-}
StateT (RuntimeState e) m ()
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
m ()
propagate StateT (RuntimeState e) m ()
-> (() -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m ()
forall a b.
StateT (RuntimeState e) m a
-> (a -> StateT (RuntimeState e) m b)
-> StateT (RuntimeState e) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Responder () -> () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ()
responder
getPeers :: (Monad m) => StateT (RuntimeState e) m (Set Peer)
getPeers :: forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers = (RuntimeState e -> Set Peer)
-> StateT (RuntimeState e) m (Set Peer)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (EventFold ClusterName Peer e -> Set Peer
forall p o e. Ord p => EventFold o p e -> Set p
projParticipants (EventFold ClusterName Peer e -> Set Peer)
-> (RuntimeState e -> EventFold ClusterName Peer e)
-> RuntimeState e
-> Set Peer
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState)
newMessageId :: (Monad m) => StateT (RuntimeState e) m MessageId
newMessageId :: forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m MessageId
newMessageId = do
state@RuntimeState {rsNextId} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
put state {rsNextId = nextMessageId rsNextId}
return rsNextId
updateCluster
:: ( EventConstraints e
, MonadLoggerIO m
, MonadState (RuntimeState e) m
)
=> EventFoldT ClusterName Peer e m a
-> m a
updateCluster :: forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster EventFoldT ClusterName Peer e m a
action = do
RuntimeState {rsSelf} <- m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
updateClusterAs rsSelf action
updateClusterAs
:: forall e m a.
( EventConstraints e
, MonadLoggerIO m
, MonadState (RuntimeState e) m
)
=> Peer
-> EventFoldT ClusterName Peer e m a
-> m a
updateClusterAs :: forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
Peer -> EventFoldT ClusterName Peer e m a -> m a
updateClusterAs Peer
asPeer EventFoldT ClusterName Peer e m a
action = do
(oldCluster, notify)
<- (RuntimeState e
-> (EventFold ClusterName Peer e,
EventFold ClusterName Peer e -> IO ()))
-> m (EventFold ClusterName Peer e,
EventFold ClusterName Peer e -> IO ())
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState (RuntimeState e -> EventFold ClusterName Peer e)
-> (RuntimeState e -> EventFold ClusterName Peer e -> IO ())
-> RuntimeState e
-> (EventFold ClusterName Peer e,
EventFold ClusterName Peer e -> IO ())
forall b c c'. (b -> c) -> (b -> c') -> b -> (c, c')
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& RuntimeState e -> EventFold ClusterName Peer e -> IO ()
forall e. RuntimeState e -> EventFold ClusterName Peer e -> IO ()
rsNotify)
(v, ur) <- runEventFoldT asPeer oldCluster action
do
let
newCluster :: EventFold ClusterName Peer e
newCluster = UpdateResult ClusterName Peer e -> EventFold ClusterName Peer e
forall o p e. UpdateResult o p e -> EventFold o p e
urEventFold UpdateResult ClusterName Peer e
ur
when (oldCluster /= newCluster) $ liftIO (notify newCluster)
modify'
(
let
doModify RuntimeState e
state =
EventFold ClusterName Peer e
newCluster EventFold ClusterName Peer e -> RuntimeState e -> RuntimeState e
forall a b. a -> b -> b
`seq`
RuntimeState e
state
{ rsClusterState = newCluster
}
in
doModify
)
do
self <- gets rsSelf
let
outputs :: Map (EventId Peer) (Output e)
outputs = UpdateResult ClusterName Peer e -> Map (EventId Peer) (Output e)
forall o p e. UpdateResult o p e -> Map (EventId p) (Output e)
urOutputs UpdateResult ClusterName Peer e
ur
byRemotePeer :: Map Peer (Map (EventId Peer) (Output e))
byRemotePeer =
(Map (EventId Peer) (Output e)
-> Map (EventId Peer) (Output e) -> Map (EventId Peer) (Output e))
-> [Map Peer (Map (EventId Peer) (Output e))]
-> Map Peer (Map (EventId Peer) (Output e))
forall (f :: * -> *) k a.
(Foldable f, Ord k) =>
(a -> a -> a) -> f (Map k a) -> Map k a
Map.unionsWith
Map (EventId Peer) (Output e)
-> Map (EventId Peer) (Output e) -> Map (EventId Peer) (Output e)
forall a. Semigroup a => a -> a -> a
(<>)
[ Peer
-> Map (EventId Peer) (Output e)
-> Map Peer (Map (EventId Peer) (Output e))
forall k a. k -> a -> Map k a
Map.singleton Peer
peer (EventId Peer -> Output e -> Map (EventId Peer) (Output e)
forall k a. k -> a -> Map k a
Map.singleton EventId Peer
eid Output e
o)
| (EventId Peer
eid, Output e
o) <- Map (EventId Peer) (Output e) -> [(EventId Peer, Output e)]
forall k a. Map k a -> [(k, a)]
Map.toList Map (EventId Peer) (Output e)
outputs
, Just Peer
peer <- [EventId Peer -> Maybe Peer
forall p. EventId p -> Maybe p
EF.source EventId Peer
eid]
]
respondToWaiting outputs
sequence_
[ do
logDebug $ "Sending remote outputs: " <> showt byRemotePeer
sendPeer (PMOutputs outputsForPeer) peer
| (peer, outputsForPeer) <- Map.toList byRemotePeer
, peer /= self
]
pure v
waitOn :: (Monad m)
=> EventId Peer
-> Responder (Output e)
-> StateT (RuntimeState e) m ()
waitOn :: forall (m :: * -> *) e.
Monad m =>
EventId Peer
-> Responder (Output e) -> StateT (RuntimeState e) m ()
waitOn EventId Peer
sid Responder (Output e)
responder =
(RuntimeState e -> RuntimeState e) -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' (\state :: RuntimeState e
state@RuntimeState {Map (EventId Peer) (Responder (Output e))
rsWaiting :: forall e.
RuntimeState e -> Map (EventId Peer) (Responder (Output e))
rsWaiting :: Map (EventId Peer) (Responder (Output e))
rsWaiting} -> RuntimeState e
state {
rsWaiting = Map.insert sid responder rsWaiting
})
propagate
:: ( EventConstraints e
, MonadLoggerIO m
, MonadState (RuntimeState e) m
)
=> m ()
propagate :: forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
m ()
propagate = do
(self, cluster) <- (RuntimeState e -> (Peer, EventFold ClusterName Peer e))
-> m (Peer, EventFold ClusterName Peer e)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf (RuntimeState e -> Peer)
-> (RuntimeState e -> EventFold ClusterName Peer e)
-> RuntimeState e
-> (Peer, EventFold ClusterName Peer e)
forall b c c'. (b -> c) -> (b -> c') -> b -> (c, c')
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState)
let
targets = Peer -> Set Peer -> Set Peer
forall a. Ord a => a -> Set a -> Set a
Set.delete Peer
self (Set Peer -> Set Peer) -> Set Peer -> Set Peer
forall a b. (a -> b) -> a -> b
$
EventFold ClusterName Peer e -> Set Peer
forall p o e. Ord p => EventFold o p e -> Set p
EF.allParticipants EventFold ClusterName Peer e
cluster
liftIO (shuffleM (Set.toList targets)) >>= \case
[] -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Peer
target:[Peer]
_ ->
case Peer
-> EventFold ClusterName Peer e -> Maybe (Diff ClusterName Peer e)
forall o p e. Ord p => p -> EventFold o p e -> Maybe (Diff o p e)
events Peer
target EventFold ClusterName Peer e
cluster of
Maybe (Diff ClusterName Peer e)
Nothing -> do
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo
(Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Sending full merge because the target's join event "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"has not reaached the infimum"
PeerMessage e -> Peer -> m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (EventFold ClusterName Peer e -> PeerMessage e
forall e. EventFold ClusterName Peer e -> PeerMessage e
PMFullMerge EventFold ClusterName Peer e
cluster) Peer
target
Just Diff ClusterName Peer e
diff
| Diff ClusterName Peer e -> Int
forall o p e. Diff o p e -> Int
diffSize Diff ClusterName Peer e
diff Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 ->
PeerMessage e -> Peer -> m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (Diff ClusterName Peer e -> PeerMessage e
forall e. Diff ClusterName Peer e -> PeerMessage e
PMMerge Diff ClusterName Peer e
diff) Peer
target
| Bool
otherwise ->
PeerMessage e -> Peer -> m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (EventFold ClusterName Peer e -> PeerMessage e
forall e. EventFold ClusterName Peer e -> PeerMessage e
PMFullMerge EventFold ClusterName Peer e
cluster) Peer
target
disconnectObsolete
where
disconnectObsolete
:: ( MonadState (RuntimeState e) m
, MonadLogger m
)
=> m ()
disconnectObsolete :: forall e (m :: * -> *).
(MonadState (RuntimeState e) m, MonadLogger m) =>
m ()
disconnectObsolete = do
(cluster, conns) <- (RuntimeState e
-> (EventFold ClusterName Peer e, Map Peer (Connection e)))
-> m (EventFold ClusterName Peer e, Map Peer (Connection e))
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState (RuntimeState e -> EventFold ClusterName Peer e)
-> (RuntimeState e -> Map Peer (Connection e))
-> RuntimeState e
-> (EventFold ClusterName Peer e, Map Peer (Connection e))
forall b c c'. (b -> c) -> (b -> c') -> b -> (c, c')
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& RuntimeState e -> Map Peer (Connection e)
forall e. RuntimeState e -> Map Peer (Connection e)
rsConnections)
let obsolete = Map Peer (Connection e) -> Set Peer
forall k a. Map k a -> Set k
Map.keysSet Map Peer (Connection e)
conns Set Peer -> Set Peer -> Set Peer
forall a. Ord a => Set a -> Set a -> Set a
\\ EventFold ClusterName Peer e -> Set Peer
forall p o e. Ord p => EventFold o p e -> Set p
EF.allParticipants EventFold ClusterName Peer e
cluster
unless (Set.null obsolete) $
logInfo $ "Disconnecting obsolete: " <> showt obsolete
mapM_ disconnect obsolete
respondToWaiting
:: forall m e.
( MonadLoggerIO m
, MonadState (RuntimeState e) m
, Show (Output e)
)
=> Map (EventId Peer) (Output e)
-> m ()
respondToWaiting :: forall (m :: * -> *) e.
(MonadLoggerIO m, MonadState (RuntimeState e) m,
Show (Output e)) =>
Map (EventId Peer) (Output e) -> m ()
respondToWaiting Map (EventId Peer) (Output e)
available = do
rs <- m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
logDebug
$ "Responding to: " <> showt (available, Map.keysSet (rsWaiting rs))
mapM_ respondToOne (Map.toList available)
where
respondToOne
:: (EventId Peer, Output e)
-> m ()
respondToOne :: (EventId Peer, Output e) -> m ()
respondToOne (EventId Peer
sid, Output e
output) = do
state@RuntimeState {rsWaiting} <- m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
case Map.lookup sid rsWaiting of
Maybe (Responder (Output e))
Nothing -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Responder (Output e)
responder -> do
Responder (Output e) -> Output e -> m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Output e)
responder Output e
output
RuntimeState e -> m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {rsWaiting = Map.delete sid rsWaiting}
data StartupMode e
= NewCluster
Peer
ClusterName
| JoinCluster
Peer
ClusterName
Peer
| Recover
Peer
(EventFold ClusterName Peer e)
deriving stock instance
( Show e
, Show (Output e)
, Show (State e)
)
=>
Show (StartupMode e)
makeRuntimeState :: (EventConstraints e, MonadLoggerIO m)
=> (Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e
-> m (RuntimeState e)
makeRuntimeState :: forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState
Peer -> EventFold ClusterName Peer e -> IO ()
notify
(NewCluster Peer
self ClusterName
clusterId)
= do
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo Text
"Starting a new cluster."
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState
Peer -> EventFold ClusterName Peer e -> IO ()
notify
(Peer -> EventFold ClusterName Peer e -> StartupMode e
forall e. Peer -> EventFold ClusterName Peer e -> StartupMode e
Recover Peer
self (ClusterName -> Peer -> EventFold ClusterName Peer e
forall o p e.
(Default (State e), Event p e, Ord p) =>
o -> p -> EventFold o p e
EF.new ClusterName
clusterId Peer
self))
makeRuntimeState
Peer -> EventFold ClusterName Peer e -> IO ()
notify
(JoinCluster Peer
self ClusterName
_clusterName Peer
targetPeer)
= do
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Trying to join an existing cluster on " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> AddressDescription -> Text
forall a b. (Show a, IsString b) => a -> b
showt AddressDescription
addr
JoinOk cluster <-
JoinRequest -> m (JoinResponse e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
JoinRequest -> m (JoinResponse e)
requestJoin
(JoinRequest -> m (JoinResponse e))
-> (Peer -> JoinRequest) -> Peer -> m (JoinResponse e)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Peer -> JoinRequest
JoinRequest
(Peer -> m (JoinResponse e)) -> Peer -> m (JoinResponse e)
forall a b. (a -> b) -> a -> b
$ Peer
self
logInfo $ "Join response with cluster: " <> showt cluster
makeRuntimeState notify (Recover self cluster)
where
requestJoin :: (EventConstraints e, MonadLoggerIO m)
=> JoinRequest
-> m (JoinResponse e)
requestJoin :: forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
JoinRequest -> m (JoinResponse e)
requestJoin JoinRequest
joinMsg = ((JoinRequest -> m (JoinResponse e))
-> JoinRequest -> m (JoinResponse e)
forall a b. (a -> b) -> a -> b
$ JoinRequest
joinMsg) ((JoinRequest -> m (JoinResponse e)) -> m (JoinResponse e))
-> m (JoinRequest -> m (JoinResponse e)) -> m (JoinResponse e)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AddressDescription
-> Maybe ClientParams -> m (JoinRequest -> m (JoinResponse e))
forall (n :: * -> *) request (m :: * -> *) response.
(Binary request, Binary response, MonadIO m, MonadLoggerIO n,
Show response) =>
AddressDescription
-> Maybe ClientParams -> n (request -> m response)
connectServer AddressDescription
addr Maybe ClientParams
forall a. Maybe a
Nothing
addr :: AddressDescription
addr :: AddressDescription
addr =
Text -> AddressDescription
AddressDescription
(
Peer -> Text
unPeer Peer
targetPeer
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
joinMessagePort
)
makeRuntimeState
Peer -> EventFold ClusterName Peer e -> IO ()
notify
(Recover Peer
self EventFold ClusterName Peer e
clusterState)
= do
rsNextId <- m MessageId
forall (m :: * -> *). MonadIO m => m MessageId
newSequence
return
RuntimeState
{ rsSelf = self
, rsClusterState = clusterState
, rsConnections = mempty
, rsWaiting = mempty
, rsCalls = mempty
, rsBroadcalls = mempty
, rsNextId
, rsNotify = notify self
, rsJoins = mempty
}
newSequence :: (MonadIO m) => m MessageId
newSequence :: forall (m :: * -> *). MonadIO m => m MessageId
newSequence = do
sid <- m UUID
forall (m :: * -> *). MonadIO m => m UUID
getUUID
pure (M sid 0)
where
getUUID :: (MonadIO m) => m UUID
getUUID :: forall (m :: * -> *). MonadIO m => m UUID
getUUID = IO (Maybe UUID) -> m (Maybe UUID)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Maybe UUID)
nextUUID m (Maybe UUID) -> (Maybe UUID -> m UUID) -> m UUID
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= m UUID -> (UUID -> m UUID) -> Maybe UUID -> m UUID
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (m ()
wait m () -> m UUID -> m UUID
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m UUID
forall (m :: * -> *). MonadIO m => m UUID
getUUID) UUID -> m UUID
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return
where
wait :: m ()
wait = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
threadDelay Int
oneMillisecond)
oneMillisecond :: Int
oneMillisecond = Int
1000
nextMessageId :: MessageId -> MessageId
nextMessageId :: MessageId -> MessageId
nextMessageId (M UUID
sequenceId Word64
ord) = UUID -> Word64 -> MessageId
M UUID
sequenceId (Word64 -> Word64
forall a. Enum a => a -> a
succ Word64
ord)
getClusterName :: Runtime e -> ClusterName
getClusterName :: forall e. Runtime e -> ClusterName
getClusterName = Runtime e -> ClusterName
forall e. Runtime e -> ClusterName
rClusterId
joinMessagePort :: PortNumber
joinMessagePort :: PortNumber
joinMessagePort = PortNumber
5289
respond :: (MonadIO m) => Responder a -> a -> m ()
respond :: forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder a
responder = m Responded -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Responded -> m ()) -> (a -> m Responded) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Responder a -> a -> m Responded
forall (m :: * -> *) a.
MonadIO m =>
Responder a -> a -> m Responded
Fork.respond Responder a
responder
getStats :: (MonadIO m) => Runtime e -> m Stats
getStats :: forall (m :: * -> *) e. MonadIO m => Runtime e -> m Stats
getStats Runtime e
runtime = do
now <- IO TimeSpec -> m TimeSpec
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
stats <- liftIO $ readIORef (rStats runtime)
pure
Stats
{ timeWithoutProgress = diffTimeSpec now <$> stats
}