{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}

{- | The meat of the Legion runtime implementation. -}
module OM.Legion.Runtime (
  -- * Starting the framework runtime.
  forkLegionary,
  Runtime,
  StartupMode(..),

  -- * Constraints
  MonadConstraints,

  -- * Runtime Interface.
  applyFast,
  applyConsistent,
  readState,
  call,
  cast,
  broadcall,
  broadcast,
  eject,
  getSelf,
  getClusterName,
  getStats,
  Stats(..),
) where

import Control.Arrow ((&&&))
import Control.Concurrent (Chan, newChan, readChan, threadDelay,
  writeChan)
import Control.Exception.Safe (MonadCatch, tryAny)
import Control.Monad (unless, void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Logger.CallStack (LoggingT(runLoggingT),
  MonadLoggerIO(askLoggerIO), LogStr, MonadLogger, logDebug, logError,
  logInfo)
import Control.Monad.State (MonadState(get, put), StateT, evalStateT,
  gets, modify')
import Control.Monad.Trans.Class (lift)
import Data.Aeson (ToJSON)
import Data.Binary (Binary)
import Data.ByteString.Lazy (ByteString)
import Data.CRDT.EventFold (Event(Output, State),
  UpdateResult(urEventFold, urOutputs), Diff, EventFold, EventId,
  infimumId, projParticipants)
import Data.CRDT.EventFold.Monad (MonadUpdateEF(diffMerge, disassociate,
  event, fullMerge, participate), EventFoldT, runEventFoldT)
import Data.Conduit ((.|), awaitForever, runConduit, yield)
import Data.Default.Class (Default)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Map (Map)
import Data.Set ((\\), Set)
import Data.Time (DiffTime, diffTimeToPicoseconds, picosecondsToDiffTime)
import Data.UUID (UUID)
import Data.UUID.V1 (nextUUID)
import GHC.Generics (Generic)
import Network.Socket (PortNumber)
import OM.Fork (Actor(Msg, actorChan), Race, Responder, race)
import OM.Legion.Conduit (chanToSink)
import OM.Legion.Connection (JoinResponse(JoinOk),
  RuntimeState(RuntimeState, rsBroadcalls, rsCalls, rsClusterState,
  rsConnections, rsJoins, rsNextId, rsNotify, rsSelf, rsWaiting),
  EventConstraints, disconnect, peerMessagePort, sendPeer)
import OM.Legion.MsgChan (MessageId(M), Peer(unPeer), PeerMessage(PMCall,
  PMCallResponse, PMCast, PMFullMerge, PMMerge, PMOutputs), ClusterName)
import OM.Logging (withPrefix)
import OM.Show (showj, showt)
import OM.Socket (AddressDescription(AddressDescription), connectServer,
  openIngress, openServer)
import OM.Time (MonadTimeSpec(getTime), addTime, diffTimeSpec)
import System.Clock (TimeSpec)
import System.Random.Shuffle (shuffleM)
import qualified Data.Binary as Binary
import qualified Data.CRDT.EventFold as EF
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified OM.Fork as Fork

{-# ANN module ("HLint: ignore Redundant <$>" :: String) #-}
{-# ANN module ("HLint: ignore Use underscore" :: String) #-}

{- |
  Shorthand for all the monad constraints, mainly used so that
  documentation renders better.
-}
type MonadConstraints m =
  ( MonadCatch m
  , MonadFail m
  , MonadLoggerIO m
  , MonadTimeSpec m
  , MonadUnliftIO m
  , Race
  )


{- |
  Fork the Legion runtime system.

  Builds the initial runtime state from the startup mode, creates the
  channel through which the runtime receives its messages, starts the
  runtime processes, and returns a 'Runtime' handle that can be used
  to talk to them.
-}
forkLegionary
  :: ( EventConstraints e
     , MonadConstraints m
     )
  => (ByteString -> IO ByteString) {- ^ Handle a user call request. -}
  -> (ByteString -> IO ()) {- ^ Handle a user cast message. -}
  -> (Peer -> EventFold ClusterName Peer e -> IO ())
     {- ^ Callback when the cluster-wide eventfold changes. -}
  -> Int
     {- ^
       The propagation interval, in microseconds (for use with
       `threadDelay`).
     -}
  -> StartupMode e
     {- ^
       How to start the runtime, by creating new cluster or joining an
       existing cluster.
     -}
  -> m (Runtime e)
forkLegionary
    handleUserCall
    handleUserCast
    notify
    resendInterval
    startupMode
  = do
    logInfo $ "Starting up with the following Mode: " <> showt startupMode
    rts <- makeRuntimeState notify startupMode
    runtimeChan <- RChan <$> liftIO newChan
    {-
      Everything logged by the runtime itself is prefixed with the
      identity of the local peer.
    -}
    logging <- withPrefix (logPrefix (rsSelf rts)) <$> askLoggerIO
    {-
      Shared with the runtime, which records in it the time at which
      a message was last received from each peer.
    -}
    rStats <- liftIO $ newIORef mempty
    (`runLoggingT` logging) $
      executeRuntime
        handleUserCall
        handleUserCast
        resendInterval
        rts
        runtimeChan
        rStats
    let
      clusterId :: ClusterName
      clusterId = EF.origin (rsClusterState rts)
    return
      Runtime
        { rChan = runtimeChan
        , rSelf = rsSelf rts
        , rClusterId = clusterId
        , rStats
        }
  where
    {- The log line prefix identifying the local peer. -}
    logPrefix :: Peer -> LogStr
    logPrefix self_ = "[" <> showt self_ <> "] "


{- | A handle on the Legion runtime. -}
data Runtime e = Runtime
  {      rChan :: RChan e
                  {- ^ The channel on which the runtime receives messages. -}
  ,      rSelf :: Peer
                  {- ^ The identifier of the local peer. -}
  , rClusterId :: ClusterName
                  {- ^
                    The name of the cluster, taken from the origin of
                    the cluster state at startup.
                  -}
  ,     rStats :: IORef (Map Peer TimeSpec)
                  {- ^
                    The time at which a message was last received from
                    each peer.
                  -}
  }
{- Messages sent to a 'Runtime' are delivered to its underlying 'RChan'. -}
instance Actor (Runtime e) where
  type Msg (Runtime e) = RuntimeMessage e
  actorChan = actorChan . rChan


{- |
  Some basic stats that can be used to intuit the health of the cluster.
  We currently only report on how long it has been since some peer has
  made some progress.
-}
newtype Stats = Stats
  { timeWithoutProgress :: Map Peer DiffTime
                           {- ^
                             How long it has been since a 'Peer' has
                             made progress (if it is 'divergent'). If
                             the peer is completely up to date as far as
                             we know, then it does not appear in the map
                             at all. Only peers which we are expecting
                             to make progress appear.
                           -}
  }
  deriving stock (Generic, Show, Eq)
  deriving anyclass (ToJSON)
{-
  'Stats' is serialized as a map from peer to a whole number of
  picoseconds, converting each 'DiffTime' with 'diffTimeToPicoseconds'
  on the way out and 'picosecondsToDiffTime' on the way in.
-}
instance Binary Stats where
  get =
    Stats <$> (fmap picosecondsToDiffTime <$> Binary.get)
  put (Stats timeWithoutProgress) =
    Binary.put (diffTimeToPicoseconds <$> timeWithoutProgress)

      
{- | The type of the runtime message channel. -}
newtype RChan e = RChan {
    unRChan :: Chan (RuntimeMessage e)
      {- ^ The underlying channel of runtime messages. -}
  }
{- Sending a message to an 'RChan' writes it to the underlying 'Chan'. -}
instance Actor (RChan e) where
  type Msg (RChan e) = RuntimeMessage e
  actorChan = writeChan . unRChan


{- |
  Update the distributed cluster state by applying an event. The event
  output will be returned immediately and may not reflect a totally
  consistent view of the cluster. The state update itself, however,
  is guaranteed to be applied atomically and consistently throughout
  the cluster.

  This sends an 'ApplyFast' message to the runtime and waits for the
  runtime's response.
-}
applyFast :: (MonadIO m)
  => Runtime e     {- ^ The runtime handle. -}
  -> e             {- ^ The event to be applied. -}
  -> m (Output e)  {- ^ Returns the possibly inconsistent event output. -}
applyFast runtime e = Fork.call runtime (ApplyFast e)


{- |
  Update the distributed cluster state by applying an event. Both the
  event output and resulting state will be totally consistent throughout
  the cluster.

  This sends an 'ApplyConsistent' message to the runtime and waits for
  the runtime's response.
-}
applyConsistent :: (MonadIO m)
  => Runtime e     {- ^ The runtime handle. -}
  -> e             {- ^ The event to be applied. -}
  -> m (Output e)  {- ^ Returns the strongly consistent event output. -}
applyConsistent runtime e = Fork.call runtime (ApplyConsistent e)


{- |
  Read the current cluster state (the 'EventFold') as held by the
  local runtime. This sends a 'ReadState' message to the runtime and
  waits for the runtime's response.
-}
readState :: (MonadIO m)
  => Runtime e {- ^ The runtime handle. -}
  -> m (EventFold ClusterName Peer e)
readState runtime = Fork.call runtime ReadState


{- |
  Send a user message to some other peer, and block until a response
  is received.

  The arguments are the runtime handle, the target peer, and the
  message payload; the result is the payload of the response.
-}
call :: (MonadIO m) => Runtime e -> Peer -> ByteString -> m ByteString
call runtime target msg = Fork.call runtime (Call target msg)


{- |
  Send the result of a call back to the peer that originated it. This
  only enqueues a 'SendCallResponse' message on the runtime channel;
  it does not wait for the runtime to act on it.
-}
sendCallResponse :: (MonadIO m)
  => RChan e    {- ^ The runtime message channel. -}
  -> Peer       {- ^ The peer that originated the call. -}
  -> MessageId  {- ^ The id of the call being responded to. -}
  -> ByteString {- ^ The response payload. -}
  -> m ()
sendCallResponse runtimeChan target mid msg =
  Fork.cast runtimeChan (SendCallResponse target mid msg)


{- |
  Send a user message to some other peer, without waiting on a response.

  The arguments are the runtime handle, the target peer, and the
  message payload.
-}
cast :: (MonadIO m) => Runtime e -> Peer -> ByteString -> m ()
cast runtime target message = Fork.cast runtime (Cast target message)


{- |
  Send a user message to all peers, and block until a response is received
  from all of them.

  NOTE(review): the result maps each peer to a 'Maybe' response;
  presumably 'Nothing' marks a peer that did not respond within the
  timeout. Confirm against the runtime's broadcall handling.
-}
broadcall :: (MonadIO m)
  => Runtime e  {- ^ The runtime handle. -}
  -> DiffTime {- ^ The timeout. -}
  -> ByteString {- ^ The message payload. -}
  -> m (Map Peer (Maybe ByteString))
broadcall runtime timeout msg = Fork.call runtime (Broadcall timeout msg)


{- | Send a user message to all peers, without waiting on a response. -}
broadcast :: (MonadIO m) => Runtime e -> ByteString -> m ()
broadcast runtime msg = Fork.cast runtime (Broadcast msg)


{- |
  Eject a peer from the cluster. This sends an 'Eject' message to the
  runtime and blocks until the runtime responds.
-}
eject :: (MonadIO m) => Runtime e -> Peer -> m ()
eject runtime peer = Fork.call runtime (Eject peer)


{- |
  Get the identifier for the local peer. This is a pure field lookup
  on the handle; it does not communicate with the runtime.
-}
getSelf :: Runtime e -> Peer
getSelf = rSelf


{- | The types of messages that can be sent to the runtime. -}
data RuntimeMessage e
  {- Sent by 'applyFast': an event, and where to send its output. -}
  = ApplyFast e (Responder (Output e))
  {- Sent by 'applyConsistent': an event, and where to send its output. -}
  | ApplyConsistent e (Responder (Output e))
  {- Sent by 'eject': the peer to eject. -}
  | Eject Peer (Responder ())
  {- Produced by the peer listener when a 'PMMerge' arrives. -}
  | Merge (Diff ClusterName Peer e)
  {- Produced by the peer listener when a 'PMFullMerge' arrives. -}
  | FullMerge (EventFold ClusterName Peer e)
  {- Produced by the peer listener when a 'PMOutputs' arrives. -}
  | Outputs (Map (EventId Peer) (Output e))
  {- NOTE(review): presumably produced by the join listener; confirm. -}
  | Join JoinRequest (Responder (JoinResponse e))
  {- Sent by 'readState'. -}
  | ReadState (Responder (EventFold ClusterName Peer e))
  {- Sent by 'call': target peer, payload, and where to send the reply. -}
  | Call Peer ByteString (Responder ByteString)
  {- Sent by 'cast': target peer and payload. -}
  | Cast Peer ByteString
  {- Sent by 'broadcall': timeout, payload, and where to send the replies. -}
  | Broadcall
      DiffTime
      ByteString
      (Responder (Map Peer (Maybe ByteString)))
  {- Sent by 'broadcast': the payload. -}
  | Broadcast ByteString
  {- Sent by 'sendCallResponse': originating peer, call id, and reply. -}
  | SendCallResponse Peer MessageId ByteString
  {-
    NOTE(review): presumably produced when a 'PMCallResponse' arrives
    from a peer; confirm.
  -}
  | HandleCallResponse Peer MessageId ByteString
  {-
    NOTE(review): presumably sent by the periodic resend process;
    confirm.
  -}
  | Resend (Responder ())
  {- NOTE(review): presumably sent by 'getStats'; confirm. -}
  | GetStats (Responder (EventFold ClusterName Peer e))
{-
  Standalone so that the instance context can be spelled out
  explicitly, including 'Show' for the 'Output' and 'State' of the
  event type.
-}
deriving stock instance
    ( Show e
    , Show (Output e)
    , Show (State e)
    )
  =>
    Show (RuntimeMessage e)


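{- |
  Execute the Legion runtime, with the given user definitions, and
  framework settings. Each long-running part of the runtime (the peer
  listener, the join listener, the periodic resend, and the message
  handler) is started with 'race'; those processes are not expected to
  finish (except maybe with an exception if something goes horribly
  wrong).

  NOTE(review): 'forkLegionary' continues after calling this function,
  so it evidently returns once the processes are started rather than
  blocking forever. Confirm against the semantics of 'race'.
-}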
executeRuntime
  :: ( Binary (Output e)
     , Binary (State e)
     , Binary e
     , Default (State e)
     , Eq (Output e)
     , Eq e
     , Event Peer e
     , MonadCatch m
     , MonadFail m
     , MonadLoggerIO m
     , MonadTimeSpec m
     , MonadUnliftIO m
     , Race
     , Show (Output e)
     , Show (State e)
     , Show e
     , ToJSON (Output e)
     , ToJSON (State e)
     , ToJSON e
     )
  => (ByteString -> IO ByteString)
     {- ^ Handle a user call request.  -}
  -> (ByteString -> IO ())
     {- ^ Handle a user cast message. -}
  -> Int
     {- ^
       The propagation interval, in microseconds (for use with
       `threadDelay`).
     -}
  -> RuntimeState e
  -> RChan e
     {- ^
       A source of requests, together with a way to respond to the
       requests.
     -}
  -> IORef (Map Peer TimeSpec)
  -> m ()
executeRuntime :: forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
 Eq (Output e), Eq e, Event Peer e, MonadCatch m, MonadFail m,
 MonadLoggerIO m, MonadTimeSpec m, MonadUnliftIO m, Race,
 Show (Output e), Show (State e), Show e, ToJSON (Output e),
 ToJSON (State e), ToJSON e) =>
(ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> Int
-> RuntimeState e
-> RChan e
-> IORef (Map Peer TimeSpec)
-> m ()
executeRuntime
    ByteString -> IO ByteString
handleUserCall
    ByteString -> IO ()
handleUserCast
    Int
resendInterval
    RuntimeState e
rts
    RChan e
runtimeChan
    IORef (Map Peer TimeSpec)
peerStats
  = do
    {- Start the various messages sources. -}
    ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion peer listener" m ()
forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener
    ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion join listener" m ()
forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runJoinListener
    ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion periodic resend" m ()
forall (m :: * -> *). MonadIO m => m ()
runPeriodicResent
    ProcessName -> m Any -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion message handler" (m Any -> m ()) -> m Any -> m ()
forall a b. (a -> b) -> a -> b
$
      (StateT (RuntimeState e) m Any -> RuntimeState e -> m Any
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m a
`evalStateT` RuntimeState e
rts)
        (
          let
            -- handleMessages :: StateT (RuntimeState e3) m Void
            handleMessages :: StateT (RuntimeState e) m Any
handleMessages = do
              RuntimeMessage e
msg <- IO (RuntimeMessage e)
-> StateT (RuntimeState e) m (RuntimeMessage e)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (RuntimeMessage e)
 -> StateT (RuntimeState e) m (RuntimeMessage e))
-> IO (RuntimeMessage e)
-> StateT (RuntimeState e) m (RuntimeMessage e)
forall a b. (a -> b) -> a -> b
$ Chan (RuntimeMessage e) -> IO (RuntimeMessage e)
forall a. Chan a -> IO a
readChan (RChan e -> Chan (RuntimeMessage e)
forall e. RChan e -> Chan (RuntimeMessage e)
unRChan RChan e
runtimeChan)
              RuntimeState {rsClusterState :: forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState = EventFold ClusterName Peer e
cluster1} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
              Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Handling: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> RuntimeMessage e -> Text
forall a b. (Show a, IsString b) => a -> b
showt RuntimeMessage e
msg
              RuntimeMessage e -> StateT (RuntimeState e) m ()
forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
 Eq (Output e), Eq e, Event Peer e, MonadCatch m, MonadLoggerIO m,
 MonadTimeSpec m, Show (Output e), Show (State e), Show e,
 ToJSON (Output e), ToJSON (State e), ToJSON e) =>
RuntimeMessage e -> StateT (RuntimeState e) m ()
handleRuntimeMessage RuntimeMessage e
msg
              RuntimeState {rsClusterState :: forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState = EventFold ClusterName Peer e
cluster2} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
              Bool
-> StateT (RuntimeState e) m () -> StateT (RuntimeState e) m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (EventFold ClusterName Peer e
cluster1 EventFold ClusterName Peer e
-> EventFold ClusterName Peer e -> Bool
forall a. Eq a => a -> a -> Bool
/= EventFold ClusterName Peer e
cluster2) (StateT (RuntimeState e) m () -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
                Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"New Cluster State: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> EventFold ClusterName Peer e -> Text
forall a b. (ToJSON a, IsString b) => a -> b
showj EventFold ClusterName Peer e
cluster2
                -- propagate
              StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(MonadIO m, MonadTimeSpec m) =>
StateT (RuntimeState e) m ()
handleBroadcallTimeouts
              StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
MonadLoggerIO m =>
StateT (RuntimeState e) m ()
handleOutstandingJoins
              StateT (RuntimeState e) m Any
handleMessages
          in do
            IO () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> StateT (RuntimeState e) m ())
-> IO () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ RuntimeState e -> EventFold ClusterName Peer e -> IO ()
forall e. RuntimeState e -> EventFold ClusterName Peer e -> IO ()
rsNotify RuntimeState e
rts (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState RuntimeState e
rts)
            StateT (RuntimeState e) m Any
handleMessages
        )
  where
    runPeerListener :: (MonadLoggerIO m, MonadFail m) => m ()
    runPeerListener :: forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener =
      let
        addy :: AddressDescription
        addy :: AddressDescription
addy =
          Text -> AddressDescription
AddressDescription
            (
              Peer -> Text
unPeer (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf RuntimeState e
rts)
              Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
peerMessagePort
            )
      in
        ConduitT () Void m () -> m ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (
          AddressDescription -> ConduitT () (Peer, PeerMessage e) m ()
forall i (m :: * -> *).
(Binary i, MonadIO m, MonadFail m) =>
AddressDescription -> ConduitT () i m ()
openIngress AddressDescription
addy
          ConduitT () (Peer, PeerMessage e) m ()
-> ConduitT (Peer, PeerMessage e) Void m ()
-> ConduitT () Void m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ((Peer, PeerMessage e)
 -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) i o r.
Monad m =>
(i -> ConduitT i o m r) -> ConduitT i o m ()
awaitForever (\ (Peer
msgSource, PeerMessage e
msg) -> do
              IO () -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> IO () -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall a b. (a -> b) -> a -> b
$ do
                TimeSpec
now <- IO TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
                IORef (Map Peer TimeSpec)
-> (Map Peer TimeSpec -> (Map Peer TimeSpec, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef'
                  IORef (Map Peer TimeSpec)
peerStats
                  (\Map Peer TimeSpec
peerTimes -> (Peer -> TimeSpec -> Map Peer TimeSpec -> Map Peer TimeSpec
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Peer
msgSource TimeSpec
now Map Peer TimeSpec
peerTimes, ()))
              Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Handling: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (Peer, PeerMessage e) -> Text
forall a b. (Show a, IsString b) => a -> b
showt (Peer
msgSource :: Peer, PeerMessage e
msg)
              case PeerMessage e
msg of
                PMFullMerge EventFold ClusterName Peer e
ps -> RuntimeMessage e
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (EventFold ClusterName Peer e -> RuntimeMessage e
forall e. EventFold ClusterName Peer e -> RuntimeMessage e
FullMerge EventFold ClusterName Peer e
ps)
                PMOutputs Map (EventId Peer) (Output e)
outputs -> RuntimeMessage e
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (Map (EventId Peer) (Output e) -> RuntimeMessage e
forall e. Map (EventId Peer) (Output e) -> RuntimeMessage e
Outputs Map (EventId Peer) (Output e)
outputs)
                PMMerge Diff ClusterName Peer e
ps -> RuntimeMessage e
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (Diff ClusterName Peer e -> RuntimeMessage e
forall e. Diff ClusterName Peer e -> RuntimeMessage e
Merge Diff ClusterName Peer e
ps)
                PMCall Peer
source MessageId
mid ByteString
callMsg ->
                  (IO (Either SomeException ByteString)
-> ConduitT
     (Peer, PeerMessage e)
     (RuntimeMessage e)
     m
     (Either SomeException ByteString)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SomeException ByteString)
 -> ConduitT
      (Peer, PeerMessage e)
      (RuntimeMessage e)
      m
      (Either SomeException ByteString))
-> (IO ByteString -> IO (Either SomeException ByteString))
-> IO ByteString
-> ConduitT
     (Peer, PeerMessage e)
     (RuntimeMessage e)
     m
     (Either SomeException ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO ByteString -> IO (Either SomeException ByteString)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny) (ByteString -> IO ByteString
handleUserCall ByteString
callMsg) ConduitT
  (Peer, PeerMessage e)
  (RuntimeMessage e)
  m
  (Either SomeException ByteString)
-> (Either SomeException ByteString
    -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                    Left SomeException
err ->
                      Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError
                        (Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ())
-> Text -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"User call handling failed with: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SomeException -> Text
forall a b. (Show a, IsString b) => a -> b
showt SomeException
err
                    Right ByteString
v -> RChan e
-> Peer
-> MessageId
-> ByteString
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) e.
MonadIO m =>
RChan e -> Peer -> MessageId -> ByteString -> m ()
sendCallResponse RChan e
runtimeChan Peer
source MessageId
mid ByteString
v
                PMCast ByteString
castMsg -> IO () -> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ByteString -> IO ()
handleUserCast ByteString
castMsg)
                PMCallResponse Peer
source MessageId
mid ByteString
responseMsg ->
                  RuntimeMessage e
-> ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (Peer -> MessageId -> ByteString -> RuntimeMessage e
forall e. Peer -> MessageId -> ByteString -> RuntimeMessage e
HandleCallResponse Peer
source MessageId
mid ByteString
responseMsg)
             )
          ConduitT (Peer, PeerMessage e) (RuntimeMessage e) m ()
-> ConduitT (RuntimeMessage e) Void m ()
-> ConduitT (Peer, PeerMessage e) Void m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| Chan (RuntimeMessage e) -> ConduitT (RuntimeMessage e) Void m ()
forall (m :: * -> *) a void.
MonadIO m =>
Chan a -> ConduitT a void m ()
chanToSink (RChan e -> Chan (RuntimeMessage e)
forall e. RChan e -> Chan (RuntimeMessage e)
unRChan RChan e
runtimeChan)
        )

    {-
      Serve incoming join requests.

      Runs 'openServer' on this peer's own address at 'joinMessagePort'.
      Every request that arrives is forwarded to the runtime as a 'Join'
      message via 'Fork.call', and the runtime's 'JoinResponse' is then
      handed to the responder that accompanied the request.
    -}
    runJoinListener :: (MonadLoggerIO m, MonadFail m) => m ()
    runJoinListener =
        runConduit $
          pure ()
          .| openServer listenAddress Nothing
          .| awaitForever (lift . serveJoin)
      where
        {- The address to listen on, rendered as "<self>:<joinMessagePort>". -}
        listenAddress :: AddressDescription
        listenAddress =
          AddressDescription $
            unPeer (rsSelf rts) <> ":" <> showt joinMessagePort

        {-
          Ask the runtime to process one join request, then reply to the
          requester with whatever the runtime answered.
        -}
        serveJoin (req, respond_) =
          Fork.call runtimeChan (Join req) >>= respond_

    {-
      Periodically ask the runtime to resend.

      Loops forever: sleep for 'resendInterval' (passed to 'threadDelay',
      so the unit is microseconds), send a 'Resend' message to the
      runtime with 'Fork.call', wait for its @()@ reply, and repeat.
      This never returns on its own.
    -}
    runPeriodicResent :: (MonadIO m) => m ()
    runPeriodicResent = do
      liftIO (threadDelay resendInterval)
      Fork.call runtimeChan Resend
      runPeriodicResent


{- |
  Handle any outstanding joins.

  A pending join (an entry in 'rsJoins') is considered complete once
  its 'EventId' is at or below the 'infimumId' of the current cluster
  state. Completed joins are removed from the runtime state, and each
  of their responders is answered with 'JoinOk' carrying the current
  cluster state. Joins whose event id is still above the infimum are
  left in 'rsJoins' untouched.
-}
handleOutstandingJoins :: (MonadLoggerIO m) => StateT (RuntimeState e) m ()
handleOutstandingJoins = do
  state@RuntimeState {rsJoins, rsClusterState} <- get
  let
    {- Split the pending joins into those we can answer now and the rest. -}
    (ready, waiting) =
      Map.partitionWithKey
        (\eid _ -> eid <= infimumId rsClusterState)
        rsJoins
  {- Forget the completed joins before responding to them. -}
  put state {rsJoins = waiting}
  mapM_
    (\(eid, responder) -> do
      logInfo $ "Completing join (" <> showt eid <> ")."
      respond responder (JoinOk rsClusterState)
    )
    (Map.toList ready)


{- |
  Handle any broadcall timeouts.

  Each entry of 'rsBroadcalls' holds the responses collected so far,
  the responder waiting for them, and an expiry time. Every entry whose
  expiry is at or before the current time (as reported by 'getTime') is
  answered with whatever responses it has accumulated and is then
  removed from the runtime state. Entries that have not yet expired are
  left in place.
-}
handleBroadcallTimeouts
  :: ( MonadIO m
     , MonadTimeSpec m
     )
  => StateT (RuntimeState e) m ()
handleBroadcallTimeouts = do
  broadcalls <- gets rsBroadcalls
  now <- getTime
  let
    {- Split off everything that has reached its deadline. -}
    (expired, live) =
      Map.partition (\(_, _, expiry) -> now >= expiry) broadcalls
  {- Answer the timed-out broadcalls, in ascending 'MessageId' order. -}
  mapM_
    (\(responses, responder, _) -> respond responder responses)
    expired
  {- Drop all of the expired entries with a single state update. -}
  modify' (\rs -> rs {rsBroadcalls = live})


{- | Execute the incoming messages. -}
handleRuntimeMessage
  :: ( Binary (Output e)
     , Binary (State e)
     , Binary e
     , Default (State e)
     , Eq (Output e)
     , Eq e
     , Event Peer e
     , MonadCatch m
     , MonadLoggerIO m
     , MonadTimeSpec m
     , Show (Output e)
     , Show (State e)
     , Show e
     , ToJSON (Output e)
     , ToJSON (State e)
     , ToJSON e
     )
  => RuntimeMessage e
  -> StateT (RuntimeState e) m ()

handleRuntimeMessage :: forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
 Eq (Output e), Eq e, Event Peer e, MonadCatch m, MonadLoggerIO m,
 MonadTimeSpec m, Show (Output e), Show (State e), Show e,
 ToJSON (Output e), ToJSON (State e), ToJSON e) =>
RuntimeMessage e -> StateT (RuntimeState e) m ()
handleRuntimeMessage (Outputs Map (EventId Peer) (Output e)
outputs) =
  {-# SCC "Outputs" #-}
  Map (EventId Peer) (Output e) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(MonadLoggerIO m, MonadState (RuntimeState e) m,
 Show (Output e)) =>
Map (EventId Peer) (Output e) -> m ()
respondToWaiting Map (EventId Peer) (Output e)
outputs

handleRuntimeMessage (GetStats Responder (EventFold ClusterName Peer e)
responder) =
  {-# SCC "GetStats" #-}
  Responder (EventFold ClusterName Peer e)
-> EventFold ClusterName Peer e -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (EventFold ClusterName Peer e)
responder (EventFold ClusterName Peer e -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (EventFold ClusterName Peer e)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (RuntimeState e -> EventFold ClusterName Peer e)
-> StateT (RuntimeState e) m (EventFold ClusterName Peer e)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState

handleRuntimeMessage (ApplyFast e
e Responder (Output e)
responder) =
  {-# SCC "ApplyFast" #-}
  EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
    (Output e, EventId Peer) -> Output e
forall a b. (a, b) -> a
fst ((Output e, EventId Peer) -> Output e)
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Output e, EventId Peer)
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (Output e)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> e
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Output e, EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
e -> m (Output e, EventId p)
event e
e EventFoldT
  ClusterName Peer e (StateT (RuntimeState e) m) (Output e)
-> (Output e
    -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Responder (Output e)
-> Output e
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Output e)
responder

handleRuntimeMessage (ApplyConsistent e
e Responder (Output e)
responder) =
  {-# SCC "ApplyConsistent" #-}
  (
    do
      EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ do
        (Output e
_v, EventId Peer
sid) <- e
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Output e, EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
e -> m (Output e, EventId p)
event e
e
        StateT (RuntimeState e) m ()
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (EventId Peer
-> Responder (Output e) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
Monad m =>
EventId Peer
-> Responder (Output e) -> StateT (RuntimeState e) m ()
waitOn EventId Peer
sid Responder (Output e)
responder)
      RuntimeState e
rs <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
      Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Waiting: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Map (EventId Peer) (Responder (Output e)) -> Text
forall a b. (Show a, IsString b) => a -> b
showt (RuntimeState e -> Map (EventId Peer) (Responder (Output e))
forall e.
RuntimeState e -> Map (EventId Peer) (Responder (Output e))
rsWaiting RuntimeState e
rs)
  )

handleRuntimeMessage (Eject Peer
peer Responder ()
responder) =
  {-# SCC "Eject" #-}
  do
    Peer
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
Peer -> EventFoldT ClusterName Peer e m a -> m a
updateClusterAs Peer
peer (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
      EventFoldT
  ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
   ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
 -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
disassociate Peer
peer
    StateT (RuntimeState e) m ()
forall e (m :: * -> *).
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
m ()
propagate
    {- ↓
      This is an awful hack. The problem is that 'propagate' uses
      'sendPeer', but 'sendPeer' itself is asynchronous (though it should be
      very fast). The correct solution is a bit tricky. We can either figure
      out some way to block all the way down through the internals of the
      connection management, or else maybe extend the "join port" server
      endpoint to accept eject notifications as well as join requests.
    -}
    IO () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> StateT (RuntimeState e) m ())
-> IO () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
500_000
    Responder () -> () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ()
responder ()

handleRuntimeMessage (Merge Diff ClusterName Peer e
other) =
  {-# SCC "Merge" #-}
  EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
    Diff ClusterName Peer e
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Either (MergeError ClusterName Peer e) ())
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
Diff o p e -> m (Either (MergeError o p e) ())
diffMerge Diff ClusterName Peer e
other EventFoldT
  ClusterName
  Peer
  e
  (StateT (RuntimeState e) m)
  (Either (MergeError ClusterName Peer e) ())
-> (Either (MergeError ClusterName Peer e) ()
    -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Left MergeError ClusterName Peer e
err -> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError (Text
 -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Text
"Bad cluster merge: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MergeError ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt MergeError ClusterName Peer e
err
      Right () -> () -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

handleRuntimeMessage (FullMerge EventFold ClusterName Peer e
other) =
  {-# SCC "FullMerge" #-}
  EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadCatch m, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
    EventFold ClusterName Peer e
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Either (MergeError ClusterName Peer e) ())
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
EventFold o p e -> m (Either (MergeError o p e) ())
fullMerge EventFold ClusterName Peer e
other EventFoldT
  ClusterName
  Peer
  e
  (StateT (RuntimeState e) m)
  (Either (MergeError ClusterName Peer e) ())
-> (Either (MergeError ClusterName Peer e) ()
    -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Left MergeError ClusterName Peer e
err -> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError (Text
 -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Text
"Bad cluster merge: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MergeError ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt MergeError ClusterName Peer e
err
      Right () -> () -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

{-
  Join: within a single cluster update, run 'disassociate' and then
  'participate' for the requesting peer, then answer the requester
  with 'JoinOk' carrying the cluster state as it stands afterwards.
-}
handleRuntimeMessage (Join (JoinRequest peer) responder) =
  {-# SCC "Join" #-}
  do
    logInfo ("Handling join from peer: " <> showt peer)
    updateCluster $ do
      _ <- disassociate peer
      _ <- participate peer
      pure ()
    {- Read the state back only after the update has been applied. -}
    cluster <- gets rsClusterState
    logInfo ("Join immediately with: " <> showt cluster)
    respond responder (JoinOk cluster)

{- ReadState: reply with the current cluster state. -}
handleRuntimeMessage (ReadState responder) =
  {-# SCC "ReadState" #-}
  do
    cluster <- gets rsClusterState
    respond responder cluster

{-
  Call: allocate a fresh message id, remember the caller's responder
  under that id in 'rsCalls', and send a 'PMCall' to the target peer.
  The responder is registered before the message is sent.
-}
handleRuntimeMessage (Call target msg responder) =
  {-# SCC "Call" #-}
  do
    mid <- newMessageId
    source <- gets rsSelf
    modify'
      (\state ->
        state {rsCalls = Map.insert mid responder (rsCalls state)}
      )
    sendPeer (PMCall source mid msg) target

{- Cast: send the payload to the target peer as a 'PMCast'. -}
handleRuntimeMessage (Cast target payload) =
  {-# SCC "Cast" #-}
  PMCast payload `sendPeer` target

{-
  Broadcall: send the same 'PMCall' to every peer returned by
  'getPeers'. Before sending, an entry is stored in 'rsBroadcalls'
  under a fresh message id, holding:

    * one 'Nothing' slot per peer (no response recorded yet),
    * the caller's responder,
    * the expiry time, computed as @addTime timeout@ applied to the
      current time.
-}
handleRuntimeMessage (Broadcall timeout msg responder) =
  {-# SCC "Broadcall" #-}
  do
    now <- getTime
    mid <- newMessageId
    source <- gets rsSelf
    peers <- getPeers
    let
      expiry :: TimeSpec
      expiry = addTime timeout now

      pending :: Map Peer (Maybe ByteString)
      pending = Map.fromList [(peer, Nothing) | peer <- Set.toList peers]
    modify'
      (\state ->
        state {
          rsBroadcalls =
            Map.insert mid (pending, responder, expiry) (rsBroadcalls state)
        }
      )
    mapM_ (sendPeer (PMCall source mid msg)) peers

{- Broadcast: send the payload as a 'PMCast' to every peer from 'getPeers'. -}
handleRuntimeMessage (Broadcast msg) =
  {-# SCC "Broadcast" #-}
  do
    peers <- getPeers
    mapM_ (sendPeer (PMCast msg)) peers

{-
  SendCallResponse: send a 'PMCallResponse' for the given message id
  to the target peer, tagged with our own identity as the source.
-}
handleRuntimeMessage (SendCallResponse target mid msg) =
  {-# SCC "SendCallResponse" #-}
  gets rsSelf >>= \self ->
    sendPeer (PMCallResponse self mid msg) target

{-
  HandleCallResponse: route a response to whoever is waiting for it.

  The message id is looked up in 'rsCalls' first. If found, the
  responder is answered and the entry removed. Otherwise it is looked
  up in 'rsBroadcalls': the response is recorded for the source peer,
  and once every slot holds a response the broadcall responder is
  answered and the entry removed. If the id is in neither map, the
  response is dropped.
-}
handleRuntimeMessage (HandleCallResponse source mid msg) =
  {-# SCC "HandleCallResponse" #-}
  do
    state <- get
    let
      calls = rsCalls state
      broadcalls = rsBroadcalls state
    case (Map.lookup mid calls, Map.lookup mid broadcalls) of
      (Just responder, _) -> do
        respond responder msg
        put state {rsCalls = Map.delete mid calls}
      (Nothing, Nothing) ->
        pure ()
      (Nothing, Just (responses, responder, expiry)) ->
        let
          updated :: Map Peer (Maybe ByteString)
          updated = Map.insert source (Just msg) responses
        in
          {-
            'sequenceA' yields 'Just' exactly when no slot is still
            'Nothing', i.e. when every peer has responded.
          -}
          case sequenceA updated of
            Just _ -> do
              respond responder updated
              put state {rsBroadcalls = Map.delete mid broadcalls}
            Nothing ->
              put state {
                  rsBroadcalls =
                    Map.insert mid (updated, responder, expiry) broadcalls
                }

{- Resend: run 'propagate', then acknowledge the requester with @()@. -}
handleRuntimeMessage (Resend responder) =
  {-# SCC "Resend" #-}
  do
    propagate
    respond responder ()


{- |
  The set of peers given by 'projParticipants' applied to the current
  cluster state.
-}
getPeers :: (Monad m) => StateT (RuntimeState e) m (Set Peer)
getPeers = projParticipants <$> gets rsClusterState


{- |
  Hand out the message id currently stored in 'rsNextId' and advance
  the stored id with 'nextMessageId'.
-}
newMessageId :: (Monad m) => StateT (RuntimeState e) m MessageId
newMessageId = do
  current <- gets rsNextId
  modify' (\state -> state {rsNextId = nextMessageId current})
  pure current


{- |
  Like 'runEventFoldT', plus automatically take care of doing the
  necessary IO implied by the cluster update. The update is performed
  as ourselves (i.e. 'updateClusterAs' with 'rsSelf').
-}
updateCluster
  :: ( EventConstraints e
     , MonadCatch m
     , MonadLoggerIO m
     , MonadState (RuntimeState e) m
     )
  => EventFoldT ClusterName Peer e m a
  -> m a
updateCluster action =
  gets rsSelf >>= \self -> updateClusterAs self action


{- |
  Like 'updateCluster', but perform the operation on behalf of a
  specified peer. This is required for e.g. the peer eject case, when
  the ejected peer may not be able to perform acknowledgements on its
  own behalf.

  After running the action this function, in order:

    1. calls 'rsNotify' with the new cluster state, but only if it
       differs from the old one;
    2. stores the new cluster state;
    3. passes all produced outputs to 'respondToWaiting';
    4. sends each other peer the outputs whose event id has that peer
       as its 'EF.source'.
-}
updateClusterAs
  :: forall e m a.
     ( EventConstraints e
     , MonadCatch m
     , MonadLoggerIO m
     , MonadState (RuntimeState e) m
     )
  => Peer
  -> EventFoldT ClusterName Peer e m a
  -> m a
updateClusterAs asPeer action = do
  before <- gets rsClusterState
  notify <- gets rsNotify
  (result, update) <- runEventFoldT asPeer before action
  let
    after :: EventFold ClusterName Peer e
    after = urEventFold update

    outputs :: Map (EventId Peer) (Output e)
    outputs = urOutputs update

    {- Outputs grouped by source peer; ids without a source are omitted. -}
    byRemotePeer :: Map Peer (Map (EventId Peer) (Output e))
    byRemotePeer =
      Map.unionsWith
        (<>)
        [ Map.singleton peer (Map.singleton eid out)
        | (eid, out) <- Map.toList outputs
        , Just peer <- [EF.source eid]
        ]

  {- Update the cluster -}
  when (before /= after) (liftIO (notify after))
  {- Force the new cluster before it is stored in the state. -}
  modify' (\state -> after `seq` state {rsClusterState = after})

  {- Dispatch outputs. -}
  self <- gets rsSelf
  respondToWaiting outputs
  mapM_
    (\(peer, outputsForPeer) -> do
      logDebug $ "Sending remote outputs: " <> showt byRemotePeer
      sendPeer (PMOutputs outputsForPeer) peer
    )
    [ batch
    | batch@(peer, _) <- Map.toList byRemotePeer
    , peer /= self
    ]
  pure result


{- |
  Register a responder in 'rsWaiting' under the given event id, so it
  can be answered when a consistent result for that id is available.
-}
waitOn :: (Monad m)
  => EventId Peer
  -> Responder (Output e)
  -> StateT (RuntimeState e) m ()
waitOn sid responder =
    modify' register
  where
    register state =
      state {rsWaiting = Map.insert sid responder (rsWaiting state)}


{- |
  Propagate cluster information. A full copy of the cluster state
  ('PMFullMerge') is sent to one participant other than ourselves,
  picked by shuffling the candidates, if there is one. Afterwards
  'disconnect' is called for every connected peer that is no longer
  among the cluster's participants.
-}
propagate
  :: ( EventConstraints e
     , MonadCatch m
     , MonadLoggerIO m
     , MonadState (RuntimeState e) m
     )
  => m ()
propagate = do
    self <- gets rsSelf
    cluster <- gets rsClusterState
    let
      candidates :: [Peer]
      candidates =
        Set.toList (Set.delete self (EF.allParticipants cluster))
    shuffled <- liftIO (shuffleM candidates)
    case shuffled of
      target : _ ->
        -- case events target cluster of
        --   Nothing -> do
        --     logInfo
        --       $ "Sending full merge because the target's join event "
        --       <> "has not reached the infimum"
        --     sendPeer (PMFullMerge cluster) target
        --   Just diff ->
        --     sendPeer (PMMerge diff) target
        sendPeer (PMFullMerge cluster) target
      [] ->
        pure ()
    disconnectObsolete
  where
    {- |
      Shut down connections to peers that are no longer participating
      in the cluster.
    -}
    disconnectObsolete
      :: ( MonadState (RuntimeState e) m
         , MonadLogger m
         )
      => m ()
    disconnectObsolete = do
      participants <- gets (EF.allParticipants . rsClusterState)
      connected <- gets (Map.keysSet . rsConnections)
      let
        obsolete :: Set Peer
        obsolete = connected \\ participants
      unless (Set.null obsolete) $
        logInfo ("Disconnecting obsolete: " <> showt obsolete)
      mapM_ disconnect obsolete


{- |
  Respond to event applications that are waiting on a consistent result,
  if such a result is available.

  For every @(eventId, output)@ pair in the supplied map that has a
  matching entry in 'rsWaiting', the stored responder is handed the
  output and that entry is removed from 'rsWaiting'. Outputs for which
  nobody is waiting are ignored.
-}
respondToWaiting
  :: forall m e.
     ( MonadLoggerIO m
     , MonadState (RuntimeState e) m
     , Show (Output e)
     )
  => Map (EventId Peer) (Output e)
  -> m ()
respondToWaiting available = do
    rs <- get
    logDebug
      $ "Responding to: " <> showt (available, Map.keysSet (rsWaiting rs))
    mapM_ respondToOne (Map.toList available)
  where
    {- |
      Deliver a single output to its waiting responder, if there is one,
      and forget the responder afterwards.
    -}
    respondToOne
      :: (EventId Peer, Output e)
      -> m ()
    respondToOne (sid, output) = do
      {-
        The state is re-read on every step because the previous step
        may already have removed an entry from 'rsWaiting'.
      -}
      state@RuntimeState {rsWaiting} <- get
      case Map.lookup sid rsWaiting of
        Nothing -> return ()
        Just responder -> do
          respond responder output
          put state {rsWaiting = Map.delete sid rsWaiting}


{- |
  This defines the various ways a node can be spun up. The type
  parameter @e@ is the event type of the cluster's 'EventFold'. A value
  of this type is what 'makeRuntimeState' consumes to build the initial
  runtime state.
-}
data StartupMode e
  {- | Indicates that we should bootstrap a new cluster at startup. -}
  = NewCluster
      Peer {- ^ The peer being launched. -}
      ClusterName {- ^ The name of the cluster being launched. -}

  {- | Indicates that the node should try to join an existing cluster. -}
  | JoinCluster
      Peer {- ^ The peer being launched. -}
      ClusterName {- ^ The name of the cluster we are trying to join. -}
      Peer {- ^ The existing peer we are attempting to join with. -}

  {- | Resume operation given the previously saved state. -}
  | Recover
      Peer {- ^ The Peer being recovered. -}
      (EventFold ClusterName Peer e)
      {- ^ The last acknowledged state we had before we crashed. -}
{-
  The 'Show' instance is derived standalone so that its context can
  name the 'Output' and 'State' types associated with @e@ in addition
  to @e@ itself.
-}
deriving stock instance
    ( Show e
    , Show (Output e)
    , Show (State e)
    )
  =>
    Show (StartupMode e)


{- |
  Initialize the runtime state.

  Both 'NewCluster' and 'JoinCluster' are reduced to the 'Recover' case:
  the former by creating a brand new 'EventFold' with @EF.new@, the
  latter by sending a 'JoinRequest' to the target peer and using the
  cluster state carried by its response.
-}
makeRuntimeState :: (EventConstraints e, MonadLoggerIO m)
  => (Peer -> EventFold ClusterName Peer e -> IO ())
     {- ^ Callback when the cluster-wide powerstate changes. -}
  -> StartupMode e
  -> m (RuntimeState e)

makeRuntimeState
    notify
    (NewCluster self clusterId)
  = do
    logInfo "Starting a new cluster."
    {- Build a brand new node state, for the first node in a cluster. -}
    makeRuntimeState
      notify
      (Recover self (EF.new clusterId self))

makeRuntimeState
    notify
    (JoinCluster self _clusterName targetPeer)
  = do
    {- Join an existing cluster. -}
    logInfo $ "Trying to join an existing cluster on " <> showt addr
    {-
      NOTE(review): this pattern assumes 'JoinOk' is the only
      constructor of 'JoinResponse' -- confirm against its definition.
    -}
    JoinOk cluster <- requestJoin (JoinRequest self)
    logInfo $ "Join response with cluster: " <> showt cluster
    makeRuntimeState notify (Recover self cluster)
  where
    {- | Connect to the target peer's join port and send the request. -}
    requestJoin :: (EventConstraints e, MonadLoggerIO m)
      => JoinRequest
      -> m (JoinResponse e)
    requestJoin joinMsg = do
      send <- connectServer addr Nothing
      send joinMsg

    {- | The target peer's address, combined with 'joinMessagePort'. -}
    addr :: AddressDescription
    addr =
      AddressDescription
        (
          unPeer targetPeer
          <> ":" <> showt joinMessagePort
        )

makeRuntimeState
    notify
    (Recover self clusterState)
  = do
    rsNextId <- newSequence
    return
      RuntimeState
        { rsSelf = self
        , rsClusterState = clusterState
        , rsConnections = mempty
        , rsWaiting = mempty
        , rsCalls = mempty
        , rsBroadcalls = mempty
        , rsNextId
          {- The callback is specialized to this peer once, up front. -}
        , rsNotify = notify self
        , rsJoins = mempty
        }


{- |
  This is the type of a join request message. It wraps the 'Peer'
  that is asking to join.
-}
newtype JoinRequest = JoinRequest Peer
  deriving stock (Generic, Show)
{- The empty instance body uses the 'Binary' class's default methods. -}
instance Binary JoinRequest


{- |
  Initialize a new sequence of messageIds. It would be perfectly fine to ensure
  unique message ids by generating a unique UUID for each one, but generating
  UUIDs is not free, and we are probably going to be generating a lot of these.

  The returned id pairs a freshly generated UUID with the ordinal @0@;
  later ids in the same sequence are produced by 'nextMessageId'.
-}
newSequence :: (MonadIO m) => m MessageId
newSequence = do
    sid <- getUUID
    pure (M sid 0)
  where
    {- |
      A utility function that makes a UUID, no matter what. Whenever
      'nextUUID' yields 'Nothing', wait a millisecond and try again.
    -}
    getUUID :: (MonadIO m) => m UUID
    getUUID =
        liftIO nextUUID >>= \case
          Just uuid -> return uuid
          Nothing -> liftIO (threadDelay oneMillisecond) >> getUUID
      where
        {- 'threadDelay' takes its argument in microseconds. -}
        oneMillisecond :: Int
        oneMillisecond = 1000


{- |
  Generate the next message id in the sequence. We would normally use
  `succ` for this kind of thing, but making `MessageId` an instance of
  `Enum` really isn't appropriate.

  The sequence's UUID is kept as is; only the ordinal is advanced.
-}
nextMessageId :: MessageId -> MessageId
nextMessageId (M sequenceId ord) = M sequenceId (succ ord)


{- | Obtain the 'ClusterName', as stored in the runtime's 'rClusterId'. -}
getClusterName :: Runtime e -> ClusterName
getClusterName = rClusterId


{- | The port that join requests are sent to. -}
joinMessagePort :: PortNumber
joinMessagePort = 5289


{- |
  Like 'Fork.respond', but returns '()': the 'Responded' value that
  'Fork.respond' produces is discarded.
-}
respond :: (MonadIO m) => Responder a -> a -> m ()
respond responder value = void (Fork.respond responder value)


{- |
  Retrieve some basic stats that can be used to intuit the health of
  the cluster.

  The per-peer timestamps held in the runtime's 'rStats' reference are
  each turned into a 'DiffTime' by applying 'diffTimeSpec' to the
  current time and the stored timestamp.
-}
getStats :: (MonadIO m) => Runtime e -> m Stats
getStats runtime = do
  now <- liftIO getTime
  stats <- liftIO $ readIORef (rStats runtime)
  pure
    Stats
      { timeWithoutProgress = diffTimeSpec now <$> stats
      }