{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}

{- | The meat of the Legion runtime implementation. -}
module OM.Legion.Runtime (
  -- * Starting the framework runtime.
  forkLegionary,
  Runtime,
  StartupMode(..),

  -- * Constraints
  MonadConstraints,

  -- * Runtime Interface.
  applyFast,
  applyConsistent,
  readState,
  call,
  cast,
  broadcall,
  broadcast,
  eject,
  getSelf,
  getClusterName,
  getStats,
  Stats(..),
) where

import Control.Arrow (Arrow((&&&)))
import Control.Concurrent (threadDelay)
import Control.Exception.Safe (MonadCatch, tryAny)
import Control.Monad (unless, void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Logger.CallStack (LoggingT(runLoggingT),
  MonadLoggerIO(askLoggerIO), LogStr, MonadLogger, logDebug, logError,
  logInfo)
import Control.Monad.State (MonadState(get, put), StateT, evalStateT,
  gets, modify')
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Aeson (ToJSON)
import Data.Binary (Binary)
import Data.ByteString.Lazy (ByteString)
import Data.CRDT.EventFold (Event(Output, State),
  UpdateResult(urEventFold, urOutputs), EventFold, EventId, diffSize,
  events, infimumId, projParticipants)
import Data.CRDT.EventFold.Monad (MonadUpdateEF(diffMerge, disassociate,
  event, fullMerge, participate), EventFoldT, runEventFoldT)
import Data.Default.Class (Default)
import Data.Function ((&))
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Map (Map)
import Data.Set ((\\), Set)
import Data.Time (DiffTime, diffTimeToPicoseconds, picosecondsToDiffTime)
import Data.UUID (UUID)
import Data.UUID.V1 (nextUUID)
import GHC.Generics (Generic)
import Network.Socket (PortNumber)
import OM.Fork (Actor(Msg, actorChan), Race, Responder, race)
import OM.Legion.Connection (JoinResponse(JoinOk),
  RuntimeState(RuntimeState, rsBroadcalls, rsCalls, rsClusterState,
  rsConnections, rsJoins, rsNextId, rsNotify, rsSelf, rsWaiting),
  EventConstraints, disconnect, peerMessagePort, sendPeer)
import OM.Legion.MsgChan (MessageId(M), Peer(unPeer), PeerMessage(PMCall,
  PMCallResponse, PMCast, PMFullMerge, PMMerge, PMOutputs), ClusterName)
import OM.Legion.RChan (JoinRequest(JoinRequest),
  RuntimeMessage(ApplyConsistent, ApplyFast, Broadcall, Broadcast,
  Call, Cast, Eject, FullMerge, GetStats, HandleCallResponse, Join,
  Merge, Outputs, ReadState, Resend, SendCallResponse), RChan, newRChan,
  readRChan)
import OM.Logging (withPrefix)
import OM.Show (showj, showt)
import OM.Socket (AddressDescription(AddressDescription), connectServer,
  openIngress, openServer)
import OM.Time (MonadTimeSpec(getTime), addTime, diffTimeSpec)
import Prelude (Applicative(pure), Either(Left, Right), Enum(succ),
  Eq((/=)), Functor(fmap), Maybe(Just, Nothing), Monad((>>), (>>=),
  return), Monoid(mempty), Ord((<=), (>=)), Semigroup((<>)), ($), (.),
  (<$>), (=<<), IO, Int, MonadFail, Show, String, fst, mapM_, maybe,
  otherwise, seq, sequence_)
import System.Clock (TimeSpec)
import System.Random.Shuffle (shuffleM)
import qualified Data.Binary as Binary
import qualified Data.CRDT.EventFold as EF
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified OM.Fork as Fork
import qualified Streaming.Prelude as Stream

{-# ANN module ("HLint: ignore Redundant <$>" :: String) #-}
{-# ANN module ("HLint: ignore Use underscore" :: String) #-}

{- |
  Shorthand for all the monad constraints, mainly use so that
  documentation renders better.
-}
type MonadConstraints m =
  ( MonadCatch m
  , MonadFail m
  , MonadLoggerIO m
  , MonadTimeSpec m
  , MonadUnliftIO m
  , Race
  )


{- | Fork the Legion runtime system. -}
forkLegionary
  :: ( EventConstraints e
     , MonadConstraints m
     )
  => (ByteString -> IO ByteString) {- ^ Handle a user call request. -}
  -> (ByteString -> IO ()) {- ^ Handle a user cast message. -}
  -> (Peer -> EventFold ClusterName Peer e -> IO ())
     {- ^ Callback when the cluster-wide eventfold changes. -}
  -> Int
     {- ^
       The propagation interval, in microseconds (for use with
       `threadDelay`).
     -}
  -> StartupMode e
     {- ^
       How to start the runtime, by creating new cluster or joining an
       existing cluster.
     -}
  -> m (Runtime e)
forkLegionary :: forall e (m :: * -> *).
(EventConstraints e, MonadConstraints m) =>
(ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> (Peer -> EventFold ClusterName Peer e -> IO ())
-> Int
-> StartupMode e
-> m (Runtime e)
forkLegionary
    ByteString -> IO ByteString
handleUserCall
    ByteString -> IO ()
handleUserCast
    Peer -> EventFold ClusterName Peer e -> IO ()
notify
    Int
resendInterval
    StartupMode e
startupMode
  = do
    Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Starting up with the following Mode: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> StartupMode e -> Text
forall a b. (Show a, IsString b) => a -> b
showt StartupMode e
startupMode
    rts <- (Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState Peer -> EventFold ClusterName Peer e -> IO ()
notify StartupMode e
startupMode
    runtimeChan <- newRChan
    logging <- withPrefix (logPrefix (rsSelf rts)) <$> askLoggerIO
    rStats <- liftIO $ newIORef mempty
    (`runLoggingT` logging) $
      executeRuntime
        handleUserCall
        handleUserCast
        resendInterval
        rts
        runtimeChan
        rStats
    let
      clusterId :: ClusterName
      clusterId = EventFold ClusterName Peer e -> ClusterName
forall o p e. EventFold o p e -> o
EF.origin (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState RuntimeState e
rts)
    return
      Runtime
        { rChan = runtimeChan
        , rSelf = rsSelf rts
        , rClusterId = clusterId
        , rStats
        }
  where
    logPrefix :: Peer -> LogStr
    logPrefix :: Peer -> LogStr
logPrefix Peer
self_ = LogStr
"[" LogStr -> LogStr -> LogStr
forall a. Semigroup a => a -> a -> a
<> Peer -> LogStr
forall a b. (Show a, IsString b) => a -> b
showt Peer
self_ LogStr -> LogStr -> LogStr
forall a. Semigroup a => a -> a -> a
<> LogStr
"] "


{- | A handle on the Legion runtime. -}
data Runtime e = Runtime
  {      forall e. Runtime e -> RChan e
rChan :: RChan e
  ,      forall e. Runtime e -> Peer
rSelf :: Peer
  , forall e. Runtime e -> ClusterName
rClusterId :: ClusterName
  ,     forall e. Runtime e -> IORef (Map Peer TimeSpec)
rStats :: IORef (Map Peer TimeSpec)
  }
instance Actor (Runtime e) where
  type Msg (Runtime e) = RuntimeMessage e
  actorChan :: Runtime e -> Msg (Runtime e) -> IO ()
actorChan = RChan e -> Msg (RChan e) -> IO ()
RChan e -> RuntimeMessage e -> IO ()
forall a. Actor a => a -> Msg a -> IO ()
actorChan (RChan e -> RuntimeMessage e -> IO ())
-> (Runtime e -> RChan e) -> Runtime e -> RuntimeMessage e -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Runtime e -> RChan e
forall e. Runtime e -> RChan e
rChan


{- |
  Some basic stats that can be used to intuit the health of the cluster.
  We currently only report on how long it has been since some peer has
  made some progress.
-}
newtype Stats = Stats
  { Stats -> Map Peer DiffTime
timeWithoutProgress :: Map Peer DiffTime
                           {- ^
                             How long it has been since a 'Peer' has
                             made progress (if it is 'divergent'). If
                             the peer is completely up to date as far as
                             we know, then it does not appear in the map
                             at all. Only peers which we are expecting
                             to make progress appear.
                           -}
  }
  deriving stock ((forall x. Stats -> Rep Stats x)
-> (forall x. Rep Stats x -> Stats) -> Generic Stats
forall x. Rep Stats x -> Stats
forall x. Stats -> Rep Stats x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Stats -> Rep Stats x
from :: forall x. Stats -> Rep Stats x
$cto :: forall x. Rep Stats x -> Stats
to :: forall x. Rep Stats x -> Stats
Generic, Int -> Stats -> ShowS
[Stats] -> ShowS
Stats -> String
(Int -> Stats -> ShowS)
-> (Stats -> String) -> ([Stats] -> ShowS) -> Show Stats
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Stats -> ShowS
showsPrec :: Int -> Stats -> ShowS
$cshow :: Stats -> String
show :: Stats -> String
$cshowList :: [Stats] -> ShowS
showList :: [Stats] -> ShowS
Show, Stats -> Stats -> Bool
(Stats -> Stats -> Bool) -> (Stats -> Stats -> Bool) -> Eq Stats
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Stats -> Stats -> Bool
== :: Stats -> Stats -> Bool
$c/= :: Stats -> Stats -> Bool
/= :: Stats -> Stats -> Bool
Eq)
  deriving anyclass ([Stats] -> Value
[Stats] -> Encoding
Stats -> Bool
Stats -> Value
Stats -> Encoding
(Stats -> Value)
-> (Stats -> Encoding)
-> ([Stats] -> Value)
-> ([Stats] -> Encoding)
-> (Stats -> Bool)
-> ToJSON Stats
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: Stats -> Value
toJSON :: Stats -> Value
$ctoEncoding :: Stats -> Encoding
toEncoding :: Stats -> Encoding
$ctoJSONList :: [Stats] -> Value
toJSONList :: [Stats] -> Value
$ctoEncodingList :: [Stats] -> Encoding
toEncodingList :: [Stats] -> Encoding
$comitField :: Stats -> Bool
omitField :: Stats -> Bool
ToJSON)
instance Binary Stats where
  get :: Get Stats
get =
    Map Peer DiffTime -> Stats
Stats (Map Peer DiffTime -> Stats)
-> Get (Map Peer DiffTime) -> Get Stats
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ((Integer -> DiffTime) -> Map Peer Integer -> Map Peer DiffTime
forall a b. (a -> b) -> Map Peer a -> Map Peer b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Integer -> DiffTime
picosecondsToDiffTime (Map Peer Integer -> Map Peer DiffTime)
-> Get (Map Peer Integer) -> Get (Map Peer DiffTime)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get (Map Peer Integer)
forall t. Binary t => Get t
Binary.get)
  put :: Stats -> Put
put (Stats Map Peer DiffTime
timeWithoutProgress) =
    Map Peer Integer -> Put
forall t. Binary t => t -> Put
Binary.put (DiffTime -> Integer
diffTimeToPicoseconds (DiffTime -> Integer) -> Map Peer DiffTime -> Map Peer Integer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map Peer DiffTime
timeWithoutProgress)


{- |
  Update the distributed cluster state by applying an event. The event
  output will be returned immediately and may not reflect a totally
  consistent view of the cluster. The state update itself, however,
  is guaranteed to be applied atomically and consistently throughout
  the cluster.
-}
applyFast :: (MonadIO m)
  => Runtime e     {- ^ The runtime handle. -}
  -> e             {- ^ The event to be applied. -}
  -> m (Output e)  {- ^ Returns the possibly inconsistent event output. -}
applyFast :: forall (m :: * -> *) e. MonadIO m => Runtime e -> e -> m (Output e)
applyFast Runtime e
runtime e
e = Runtime e
-> (Responder (Output e) -> Msg (Runtime e)) -> m (Output e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (e -> Responder (Output e) -> RuntimeMessage e
forall e. e -> Responder (Output e) -> RuntimeMessage e
ApplyFast e
e)


{- |
  Update the distributed cluster state by applying an event. Both the
  event output and resulting state will be totally consistent throughout
  the cluster.
-}
applyConsistent :: (MonadIO m)
  => Runtime e     {- ^ The runtime handle. -}
  -> e             {- ^ The event to be applied. -}
  -> m (Output e)  {- ^ Returns the strongly consistent event output. -}
applyConsistent :: forall (m :: * -> *) e. MonadIO m => Runtime e -> e -> m (Output e)
applyConsistent Runtime e
runtime e
e = Runtime e
-> (Responder (Output e) -> Msg (Runtime e)) -> m (Output e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (e -> Responder (Output e) -> RuntimeMessage e
forall e. e -> Responder (Output e) -> RuntimeMessage e
ApplyConsistent e
e)


{- | Read the current powerstate value. -}
readState :: (MonadIO m)
  => Runtime e
  -> m (EventFold ClusterName Peer e)
readState :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> m (EventFold ClusterName Peer e)
readState Runtime e
runtime = Runtime e
-> (Responder (EventFold ClusterName Peer e) -> Msg (Runtime e))
-> m (EventFold ClusterName Peer e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime Responder (EventFold ClusterName Peer e) -> Msg (Runtime e)
Responder (EventFold ClusterName Peer e) -> RuntimeMessage e
forall e.
Responder (EventFold ClusterName Peer e) -> RuntimeMessage e
ReadState


{- |
  Send a user message to some other peer, and block until a response
  is received.
-}
call :: (MonadIO m) => Runtime e -> Peer -> ByteString -> m ByteString
call :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> Peer -> ByteString -> m ByteString
call Runtime e
runtime Peer
target ByteString
msg = Runtime e
-> (Responder ByteString -> Msg (Runtime e)) -> m ByteString
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (Peer -> ByteString -> Responder ByteString -> RuntimeMessage e
forall e.
Peer -> ByteString -> Responder ByteString -> RuntimeMessage e
Call Peer
target ByteString
msg)


{- | Send the result of a call back to the peer that originated it. -}
sendCallResponse
  :: (MonadIO m)
  => RChan e
  -> Peer
  -> MessageId
  -> ByteString
  -> m ()
sendCallResponse :: forall (m :: * -> *) e.
MonadIO m =>
RChan e -> Peer -> MessageId -> ByteString -> m ()
sendCallResponse RChan e
runtimeChan Peer
target MessageId
mid ByteString
msg =
  RChan e -> Msg (RChan e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast RChan e
runtimeChan (Peer -> MessageId -> ByteString -> RuntimeMessage e
forall e. Peer -> MessageId -> ByteString -> RuntimeMessage e
SendCallResponse Peer
target MessageId
mid ByteString
msg)


{- | Send a user message to some other peer, without waiting on a response. -}
cast :: (MonadIO m) => Runtime e -> Peer -> ByteString -> m ()
cast :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> Peer -> ByteString -> m ()
cast Runtime e
runtime Peer
target ByteString
message = Runtime e -> Msg (Runtime e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast Runtime e
runtime (Peer -> ByteString -> RuntimeMessage e
forall e. Peer -> ByteString -> RuntimeMessage e
Cast Peer
target ByteString
message)


{- |
  Send a user message to all peers, and block until a response is received
  from all of them.
-}
broadcall :: (MonadIO m)
  => Runtime e
  -> DiffTime {- ^ The timeout. -}
  -> ByteString
  -> m (Map Peer (Maybe ByteString))
broadcall :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e
-> DiffTime -> ByteString -> m (Map Peer (Maybe ByteString))
broadcall Runtime e
runtime DiffTime
timeout ByteString
msg = Runtime e
-> (Responder (Map Peer (Maybe ByteString)) -> Msg (Runtime e))
-> m (Map Peer (Maybe ByteString))
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (DiffTime
-> ByteString
-> Responder (Map Peer (Maybe ByteString))
-> RuntimeMessage e
forall e.
DiffTime
-> ByteString
-> Responder (Map Peer (Maybe ByteString))
-> RuntimeMessage e
Broadcall DiffTime
timeout ByteString
msg)


{- | Send a user message to all peers, without wating on a response. -}
broadcast :: (MonadIO m) => Runtime e -> ByteString -> m ()
broadcast :: forall (m :: * -> *) e.
MonadIO m =>
Runtime e -> ByteString -> m ()
broadcast Runtime e
runtime ByteString
msg = Runtime e -> Msg (Runtime e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast Runtime e
runtime (ByteString -> RuntimeMessage e
forall e. ByteString -> RuntimeMessage e
Broadcast ByteString
msg)


{- | Eject a peer from the cluster. -}
eject :: (MonadIO m) => Runtime e -> Peer -> m ()
eject :: forall (m :: * -> *) e. MonadIO m => Runtime e -> Peer -> m ()
eject Runtime e
runtime Peer
peer = Runtime e -> (Responder () -> Msg (Runtime e)) -> m ()
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call Runtime e
runtime (Peer -> Responder () -> RuntimeMessage e
forall e. Peer -> Responder () -> RuntimeMessage e
Eject Peer
peer)


{- | Get the identifier for the local peer. -}
getSelf :: Runtime e -> Peer
getSelf :: forall e. Runtime e -> Peer
getSelf = Runtime e -> Peer
forall e. Runtime e -> Peer
rSelf


{- |
  Execute the Legion runtime, with the given user definitions, and
  framework settings. This function never returns (except maybe with an
  exception if something goes horribly wrong).
-}
executeRuntime
  :: ( Binary (Output e)
     , Binary (State e)
     , Binary e
     , Default (State e)
     , Eq (Output e)
     , Eq e
     , Event Peer e
     , MonadCatch m
     , MonadFail m
     , MonadLoggerIO m
     , MonadTimeSpec m
     , MonadUnliftIO m
     , Race
     , Show (Output e)
     , Show (State e)
     , Show e
     , ToJSON (Output e)
     , ToJSON (State e)
     , ToJSON e
     )
  => (ByteString -> IO ByteString)
     {- ^ Handle a user call request.  -}
  -> (ByteString -> IO ())
     {- ^ Handle a user cast message. -}
  -> Int
     {- ^
       The propagation interval, in microseconds (for use with
       `threadDelay`).
     -}
  -> RuntimeState e
  -> RChan e
     {- ^
       A source of requests, together with a way to respond to the
       requets.
     -}
  -> IORef (Map Peer TimeSpec)
  -> m ()
executeRuntime :: forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
 Eq (Output e), Eq e, Event Peer e, MonadCatch m, MonadFail m,
 MonadLoggerIO m, MonadTimeSpec m, MonadUnliftIO m, Race,
 Show (Output e), Show (State e), Show e, ToJSON (Output e),
 ToJSON (State e), ToJSON e) =>
(ByteString -> IO ByteString)
-> (ByteString -> IO ())
-> Int
-> RuntimeState e
-> RChan e
-> IORef (Map Peer TimeSpec)
-> m ()
executeRuntime
    ByteString -> IO ByteString
handleUserCall
    ByteString -> IO ()
handleUserCast
    Int
resendInterval
    RuntimeState e
rts
    RChan e
runtimeChan
    IORef (Map Peer TimeSpec)
peerStats
  = do
    {- Start the various messages sources. -}
    ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion peer listener" m ()
forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener
    ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion join listener" m ()
forall (m :: * -> *).
(MonadCatch m, MonadFail m, MonadLogger m, MonadUnliftIO m,
 Race) =>
m ()
runJoinListener
    ProcessName -> m () -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion periodic resend" m ()
forall (m :: * -> *). MonadIO m => m ()
runPeriodicResent
    ProcessName -> m Any -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"om-legion message handler" (m Any -> m ()) -> m Any -> m ()
forall a b. (a -> b) -> a -> b
$
      (StateT (RuntimeState e) m Any -> RuntimeState e -> m Any
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m a
`evalStateT` RuntimeState e
rts)
        (
          let
            -- handleMessages :: StateT (RuntimeState e3) m Void
            handleMessages :: StateT (RuntimeState e) m Any
handleMessages = do
              msg <- RChan e -> StateT (RuntimeState e) m (RuntimeMessage e)
forall (m :: * -> *) e.
MonadIO m =>
RChan e -> m (RuntimeMessage e)
readRChan RChan e
runtimeChan
              RuntimeState {rsClusterState = cluster1} <- get
              logDebug $ "Handling: " <> showt msg
              handleRuntimeMessage msg
              RuntimeState {rsClusterState = cluster2} <- get
              when (cluster1 /= cluster2) $
                logDebug $ "New Cluster State: " <> showj cluster2
                -- propagate
              handleBroadcallTimeouts
              handleOutstandingJoins
              handleMessages
          in do
            IO () -> StateT (RuntimeState e) m ()
forall a. IO a -> StateT (RuntimeState e) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> StateT (RuntimeState e) m ())
-> IO () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ RuntimeState e -> EventFold ClusterName Peer e -> IO ()
forall e. RuntimeState e -> EventFold ClusterName Peer e -> IO ()
rsNotify RuntimeState e
rts (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState RuntimeState e
rts)
            StateT (RuntimeState e) m Any
handleMessages
        )
  where
    runPeerListener :: (MonadLoggerIO m, MonadFail m) => m ()
    runPeerListener :: forall (m :: * -> *). (MonadLoggerIO m, MonadFail m) => m ()
runPeerListener =
      let
        addy :: AddressDescription
        addy :: AddressDescription
addy =
          Text -> AddressDescription
AddressDescription
            (
              Peer -> Text
unPeer (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf RuntimeState e
rts)
              Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
peerMessagePort
            )
      in
        AddressDescription -> Stream (Of (Peer, PeerMessage e)) m ()
forall i (m :: * -> *) never_returns.
(Binary i, MonadFail m, MonadIO m, Race) =>
AddressDescription -> Stream (Of i) m never_returns
openIngress AddressDescription
addy
        Stream (Of (Peer, PeerMessage e)) m ()
-> ((Peer, PeerMessage e) -> Stream (Of (RuntimeMessage e)) m ())
-> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) (f :: * -> *) a r x.
(Monad m, Functor f) =>
Stream (Of a) m r -> (a -> Stream f m x) -> Stream f m r
`Stream.for`
          (\ (Peer
msgSource, PeerMessage e
msg) -> do
            IO () -> Stream (Of (RuntimeMessage e)) m ()
forall a. IO a -> Stream (Of (RuntimeMessage e)) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Stream (Of (RuntimeMessage e)) m ())
-> IO () -> Stream (Of (RuntimeMessage e)) m ()
forall a b. (a -> b) -> a -> b
$ do
              now <- IO TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
              atomicModifyIORef'
                peerStats
                (\Map Peer TimeSpec
peerTimes -> (Peer -> TimeSpec -> Map Peer TimeSpec -> Map Peer TimeSpec
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Peer
msgSource TimeSpec
now Map Peer TimeSpec
peerTimes, ()))
            m () -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a.
Monad m =>
m a -> Stream (Of (RuntimeMessage e)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Stream (Of (RuntimeMessage e)) m ())
-> m () -> Stream (Of (RuntimeMessage e)) m ()
forall a b. (a -> b) -> a -> b
$ Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Handling: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (Peer, PeerMessage e) -> Text
forall a b. (Show a, IsString b) => a -> b
showt (Peer
msgSource :: Peer, PeerMessage e
msg)
            case PeerMessage e
msg of
              PMFullMerge EventFold ClusterName Peer e
ps -> RuntimeMessage e -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Stream.yield (EventFold ClusterName Peer e -> RuntimeMessage e
forall e. EventFold ClusterName Peer e -> RuntimeMessage e
FullMerge EventFold ClusterName Peer e
ps)
              PMOutputs Map (EventId Peer) (Output e)
outputs -> RuntimeMessage e -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Stream.yield (Map (EventId Peer) (Output e) -> RuntimeMessage e
forall e. Map (EventId Peer) (Output e) -> RuntimeMessage e
Outputs Map (EventId Peer) (Output e)
outputs)
              PMMerge Diff ClusterName Peer e
ps -> RuntimeMessage e -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Stream.yield (Diff ClusterName Peer e -> RuntimeMessage e
forall e. Diff ClusterName Peer e -> RuntimeMessage e
Merge Diff ClusterName Peer e
ps)
              PMCall Peer
source MessageId
mid ByteString
callMsg ->
                (IO (Either SomeException ByteString)
-> Stream
     (Of (RuntimeMessage e)) m (Either SomeException ByteString)
forall a. IO a -> Stream (Of (RuntimeMessage e)) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SomeException ByteString)
 -> Stream
      (Of (RuntimeMessage e)) m (Either SomeException ByteString))
-> (IO ByteString -> IO (Either SomeException ByteString))
-> IO ByteString
-> Stream
     (Of (RuntimeMessage e)) m (Either SomeException ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO ByteString -> IO (Either SomeException ByteString)
forall (m :: * -> *) a.
(HasCallStack, MonadCatch m) =>
m a -> m (Either SomeException a)
tryAny) (ByteString -> IO ByteString
handleUserCall ByteString
callMsg) Stream (Of (RuntimeMessage e)) m (Either SomeException ByteString)
-> (Either SomeException ByteString
    -> Stream (Of (RuntimeMessage e)) m ())
-> Stream (Of (RuntimeMessage e)) m ()
forall a b.
Stream (Of (RuntimeMessage e)) m a
-> (a -> Stream (Of (RuntimeMessage e)) m b)
-> Stream (Of (RuntimeMessage e)) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                  Left SomeException
err ->
                    m () -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a.
Monad m =>
m a -> Stream (Of (RuntimeMessage e)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> Stream (Of (RuntimeMessage e)) m ())
-> (Text -> m ()) -> Text -> Stream (Of (RuntimeMessage e)) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError
                      (Text -> Stream (Of (RuntimeMessage e)) m ())
-> Text -> Stream (Of (RuntimeMessage e)) m ()
forall a b. (a -> b) -> a -> b
$ Text
"User call handling failed with: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SomeException -> Text
forall a b. (Show a, IsString b) => a -> b
showt SomeException
err
                  Right ByteString
v -> RChan e
-> Peer
-> MessageId
-> ByteString
-> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) e.
MonadIO m =>
RChan e -> Peer -> MessageId -> ByteString -> m ()
sendCallResponse RChan e
runtimeChan Peer
source MessageId
mid ByteString
v
              PMCast ByteString
castMsg -> IO () -> Stream (Of (RuntimeMessage e)) m ()
forall a. IO a -> Stream (Of (RuntimeMessage e)) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ByteString -> IO ()
handleUserCast ByteString
castMsg)
              PMCallResponse Peer
source MessageId
mid ByteString
responseMsg ->
                RuntimeMessage e -> Stream (Of (RuntimeMessage e)) m ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Stream.yield (Peer -> MessageId -> ByteString -> RuntimeMessage e
forall e. Peer -> MessageId -> ByteString -> RuntimeMessage e
HandleCallResponse Peer
source MessageId
mid ByteString
responseMsg)
          )
        Stream (Of (RuntimeMessage e)) m ()
-> (Stream (Of (RuntimeMessage e)) m () -> m ()) -> m ()
forall a b. a -> (a -> b) -> b
& (RuntimeMessage e -> m ())
-> Stream (Of (RuntimeMessage e)) m () -> m ()
forall (m :: * -> *) a x r.
Monad m =>
(a -> m x) -> Stream (Of a) m r -> m r
Stream.mapM_ (RChan e -> Msg (RChan e) -> m ()
forall actor (m :: * -> *).
(Actor actor, MonadIO m) =>
actor -> Msg actor -> m ()
Fork.cast RChan e
runtimeChan)

    runJoinListener
      :: ( MonadCatch m
         , MonadFail m
         , MonadLogger m
         , MonadUnliftIO m
         , Race
         )
      => m ()
    runJoinListener :: forall (m :: * -> *).
(MonadCatch m, MonadFail m, MonadLogger m, MonadUnliftIO m,
 Race) =>
m ()
runJoinListener =
      let
        addy :: AddressDescription
        addy :: AddressDescription
addy =
          Text -> AddressDescription
AddressDescription
            (
              Peer -> Text
unPeer (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf RuntimeState e
rts)
              Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
joinMessagePort
            )
      in
        AddressDescription
-> Maybe (IO ServerParams)
-> Stream (Of (JoinRequest, JoinResponse e -> m Responded)) m ()
forall request response (m :: * -> *) never_returns.
(Binary request, Binary response, MonadLogger m, MonadCatch m,
 MonadFail m, MonadUnliftIO m, Race) =>
AddressDescription
-> Maybe (IO ServerParams)
-> Stream (Of (request, response -> m Responded)) m never_returns
openServer AddressDescription
addy Maybe (IO ServerParams)
forall a. Maybe a
Nothing
        Stream (Of (JoinRequest, JoinResponse e -> m Responded)) m ()
-> (Stream (Of (JoinRequest, JoinResponse e -> m Responded)) m ()
    -> m ())
-> m ()
forall a b. a -> (a -> b) -> b
& ((JoinRequest, JoinResponse e -> m Responded) -> m Responded)
-> Stream (Of (JoinRequest, JoinResponse e -> m Responded)) m ()
-> m ()
forall (m :: * -> *) a x r.
Monad m =>
(a -> m x) -> Stream (Of a) m r -> m r
Stream.mapM_
            (\(JoinRequest
req, JoinResponse e -> m Responded
respond_) ->
              RChan e
-> (Responder (JoinResponse e) -> Msg (RChan e))
-> m (JoinResponse e)
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call RChan e
runtimeChan (JoinRequest -> Responder (JoinResponse e) -> RuntimeMessage e
forall e.
JoinRequest -> Responder (JoinResponse e) -> RuntimeMessage e
Join JoinRequest
req) m (JoinResponse e)
-> (JoinResponse e -> m Responded) -> m Responded
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= JoinResponse e -> m Responded
respond_
            )

    runPeriodicResent :: (MonadIO m) => m ()
    runPeriodicResent :: forall (m :: * -> *). MonadIO m => m ()
runPeriodicResent =
      let
        periodicResend :: (MonadIO m) => m ()
        periodicResend :: forall (m :: * -> *). MonadIO m => m ()
periodicResend = do
          IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
resendInterval
          RChan e -> (Responder () -> Msg (RChan e)) -> m ()
forall actor (m :: * -> *) a.
(Actor actor, MonadIO m) =>
actor -> (Responder a -> Msg actor) -> m a
Fork.call RChan e
runtimeChan Responder () -> Msg (RChan e)
Responder () -> RuntimeMessage e
forall e. Responder () -> RuntimeMessage e
Resend
          m ()
forall (m :: * -> *). MonadIO m => m ()
periodicResend
      in
        m ()
forall (m :: * -> *). MonadIO m => m ()
periodicResend


{- | Handle any outstanding joins. -}
handleOutstandingJoins :: (MonadLoggerIO m) => StateT (RuntimeState e) m ()
handleOutstandingJoins :: forall (m :: * -> *) e.
MonadLoggerIO m =>
StateT (RuntimeState e) m ()
handleOutstandingJoins = do
  state@RuntimeState {rsJoins, rsClusterState} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
  let
    (consistent, pending) =
      Map.partitionWithKey
        (\EventId Peer
k Responder (JoinResponse e)
_ -> EventId Peer
k EventId Peer -> EventId Peer -> Bool
forall a. Ord a => a -> a -> Bool
<= EventFold ClusterName Peer e -> EventId Peer
forall o p e. EventFold o p e -> EventId p
infimumId EventFold ClusterName Peer e
rsClusterState)
        rsJoins
  put state {rsJoins = pending}
  sequence_ [
      do
        logInfo $ "Completing join (" <> showt sid <> ")."
        respond responder (JoinOk rsClusterState)
      | (sid, responder) <- Map.toList consistent
    ]


{- | Handle any broadcall timeouts. -}
handleBroadcallTimeouts
  :: ( MonadIO m
     , MonadTimeSpec m
     )
  => StateT (RuntimeState e) m ()
handleBroadcallTimeouts :: forall (m :: * -> *) e.
(MonadIO m, MonadTimeSpec m) =>
StateT (RuntimeState e) m ()
handleBroadcallTimeouts = do
  broadcalls <- (RuntimeState e
 -> Map
      MessageId
      (Map Peer (Maybe ByteString),
       Responder (Map Peer (Maybe ByteString)), TimeSpec))
-> StateT
     (RuntimeState e)
     m
     (Map
        MessageId
        (Map Peer (Maybe ByteString),
         Responder (Map Peer (Maybe ByteString)), TimeSpec))
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e
-> Map
     MessageId
     (Map Peer (Maybe ByteString),
      Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall e.
RuntimeState e
-> Map
     MessageId
     (Map Peer (Maybe ByteString),
      Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls
  now <- getTime
  sequence_ [
      do
        respond responder responses
        modify' (\RuntimeState e
rs -> RuntimeState e
rs {
            rsBroadcalls = Map.delete messageId (rsBroadcalls rs)
          })
      | (messageId, (responses, responder, expiry)) <- Map.toList broadcalls
      , now >= expiry
    ]


{- | Execute the incoming messages. -}
handleRuntimeMessage
  :: ( Binary (Output e)
     , Binary (State e)
     , Binary e
     , Default (State e)
     , Eq (Output e)
     , Eq e
     , Event Peer e
     , MonadLoggerIO m
     , MonadTimeSpec m
     , Show (Output e)
     , Show (State e)
     , Show e
     , ToJSON (Output e)
     , ToJSON (State e)
     , ToJSON e
     )
  => RuntimeMessage e
  -> StateT (RuntimeState e) m ()

handleRuntimeMessage :: forall e (m :: * -> *).
(Binary (Output e), Binary (State e), Binary e, Default (State e),
 Eq (Output e), Eq e, Event Peer e, MonadLoggerIO m,
 MonadTimeSpec m, Show (Output e), Show (State e), Show e,
 ToJSON (Output e), ToJSON (State e), ToJSON e) =>
RuntimeMessage e -> StateT (RuntimeState e) m ()
handleRuntimeMessage (Outputs Map (EventId Peer) (Output e)
outputs) =
  {-# SCC "Outputs" #-}
  Map (EventId Peer) (Output e) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(MonadLoggerIO m, MonadState (RuntimeState e) m,
 Show (Output e)) =>
Map (EventId Peer) (Output e) -> m ()
respondToWaiting Map (EventId Peer) (Output e)
outputs

handleRuntimeMessage (GetStats Responder (EventFold ClusterName Peer e)
responder) =
  {-# SCC "GetStats" #-}
  Responder (EventFold ClusterName Peer e)
-> EventFold ClusterName Peer e -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (EventFold ClusterName Peer e)
responder (EventFold ClusterName Peer e -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (EventFold ClusterName Peer e)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (RuntimeState e -> EventFold ClusterName Peer e)
-> StateT (RuntimeState e) m (EventFold ClusterName Peer e)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState

handleRuntimeMessage (ApplyFast e
e Responder (Output e)
responder) =
  {-# SCC "ApplyFast" #-}
  EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
    (Output e, EventId Peer) -> Output e
forall a b. (a, b) -> a
fst ((Output e, EventId Peer) -> Output e)
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Output e, EventId Peer)
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (Output e)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> e
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Output e, EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
e -> m (Output e, EventId p)
event e
e EventFoldT
  ClusterName Peer e (StateT (RuntimeState e) m) (Output e)
-> (Output e
    -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b.
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
-> (a
    -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Responder (Output e)
-> Output e
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Output e)
responder

handleRuntimeMessage (ApplyConsistent e
e Responder (Output e)
responder) =
  {-# SCC "ApplyConsistent" #-}
  (
    do
      EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ do
        (_v, sid) <- e
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Output e, EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
e -> m (Output e, EventId p)
event e
e
        lift (waitOn sid responder)
      rs <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
      logDebug $ "Waiting: " <> showt (rsWaiting rs)
  )

handleRuntimeMessage (Eject Peer
peer Responder ()
responder) =
  {-# SCC "Eject" #-}
  do
    Peer
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
Peer -> EventFoldT ClusterName Peer e m a -> m a
updateClusterAs Peer
peer (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
      EventFoldT
  ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
   ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
 -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
disassociate Peer
peer
    StateT (RuntimeState e) m ()
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
m ()
propagate
    {- ↓
      This is an awful hack. The problem is that 'propagate' uses
      'sendPeer', but 'sendPeer' itself is asynchronous (though it should be
      very fast). The correct solution is a bit tricky. We can either figure
      out some way to block all the way down through the internals of the
      connection management, or else maybe extend the "join port" server
      endpoint to accept eject notifications as well as join requests.
    -}
    IO () -> StateT (RuntimeState e) m ()
forall a. IO a -> StateT (RuntimeState e) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> StateT (RuntimeState e) m ())
-> IO () -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
500_000
    Responder () -> () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ()
responder ()

handleRuntimeMessage (Merge Diff ClusterName Peer e
other) =
  {-# SCC "Merge" #-}
  EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
    Diff ClusterName Peer e
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Either (MergeError ClusterName Peer e) ())
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
Diff o p e -> m (Either (MergeError o p e) ())
diffMerge Diff ClusterName Peer e
other EventFoldT
  ClusterName
  Peer
  e
  (StateT (RuntimeState e) m)
  (Either (MergeError ClusterName Peer e) ())
-> (Either (MergeError ClusterName Peer e) ()
    -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b.
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
-> (a
    -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Left MergeError ClusterName Peer e
err -> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError (Text
 -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Text
"Bad cluster merge: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MergeError ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt MergeError ClusterName Peer e
err
      Right () -> () -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a.
a -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

handleRuntimeMessage (FullMerge EventFold ClusterName Peer e
other) =
  {-# SCC "FullMerge" #-}
  EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
 -> StateT (RuntimeState e) m ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$
    EventFold ClusterName Peer e
-> EventFoldT
     ClusterName
     Peer
     e
     (StateT (RuntimeState e) m)
     (Either (MergeError ClusterName Peer e) ())
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
EventFold o p e -> m (Either (MergeError o p e) ())
fullMerge EventFold ClusterName Peer e
other EventFoldT
  ClusterName
  Peer
  e
  (StateT (RuntimeState e) m)
  (Either (MergeError ClusterName Peer e) ())
-> (Either (MergeError ClusterName Peer e) ()
    -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b.
EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
-> (a
    -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Left MergeError ClusterName Peer e
err -> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError (Text
 -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> Text
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Text
"Bad cluster merge: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> MergeError ClusterName Peer e -> Text
forall a b. (Show a, IsString b) => a -> b
showt MergeError ClusterName Peer e
err
      Right () -> () -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a.
a -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

handleRuntimeMessage (Join (JoinRequest Peer
peer) Responder (JoinResponse e)
responder) =
  {-# SCC "Join" #-}
  do
    Text -> StateT (RuntimeState e) m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> StateT (RuntimeState e) m ())
-> Text -> StateT (RuntimeState e) m ()
forall a b. (a -> b) -> a -> b
$ Text
"Handling join from peer: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Peer -> Text
forall a b. (Show a, IsString b) => a -> b
showt Peer
peer
    EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
-> StateT (RuntimeState e) m ()
forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster (do
        EventFoldT
  ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
   ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
 -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
disassociate Peer
peer
        EventFoldT
  ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (EventFoldT
   ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
 -> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ())
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
-> EventFoldT ClusterName Peer e (StateT (RuntimeState e) m) ()
forall a b. (a -> b) -> a -> b
$ Peer
-> EventFoldT
     ClusterName Peer e (StateT (RuntimeState e) m) (EventId Peer)
forall o p e (m :: * -> *).
MonadUpdateEF o p e m =>
p -> m (EventId p)
participate Peer
peer
      )
    RuntimeState {rsClusterState} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
    logInfo $ "Join immediately with: " <> showt rsClusterState
    respond responder (JoinOk rsClusterState)

handleRuntimeMessage (ReadState Responder (EventFold ClusterName Peer e)
responder) =
  {-# SCC "ReadState" #-}
  Responder (EventFold ClusterName Peer e)
-> EventFold ClusterName Peer e -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (EventFold ClusterName Peer e)
responder (EventFold ClusterName Peer e -> StateT (RuntimeState e) m ())
-> (RuntimeState e -> EventFold ClusterName Peer e)
-> RuntimeState e
-> StateT (RuntimeState e) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState (RuntimeState e -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (RuntimeState e)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get

handleRuntimeMessage (Call Peer
target ByteString
msg Responder ByteString
responder) =
    {-# SCC "Call" #-}
    do
      mid <- StateT (RuntimeState e) m MessageId
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m MessageId
newMessageId
      source <- gets rsSelf
      setCallResponder mid
      sendPeer (PMCall source mid msg) target
  where
    setCallResponder :: (Monad m)
      => MessageId
      -> StateT (RuntimeState e) m ()
    setCallResponder :: forall (m :: * -> *) e.
Monad m =>
MessageId -> StateT (RuntimeState e) m ()
setCallResponder MessageId
mid = do
      state@RuntimeState {rsCalls} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
      put state {
          rsCalls = Map.insert mid responder rsCalls
        }

handleRuntimeMessage (Cast Peer
target ByteString
msg) =
  {-# SCC "Cast" #-}
  PeerMessage e -> Peer -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (ByteString -> PeerMessage e
forall e. ByteString -> PeerMessage e
PMCast ByteString
msg) Peer
target

handleRuntimeMessage (Broadcall DiffTime
timeout ByteString
msg Responder (Map Peer (Maybe ByteString))
responder) =
    {-# SCC "Broadcall" #-}
    do
      expiry <- DiffTime -> TimeSpec -> TimeSpec
addTime DiffTime
timeout (TimeSpec -> TimeSpec)
-> StateT (RuntimeState e) m TimeSpec
-> StateT (RuntimeState e) m TimeSpec
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StateT (RuntimeState e) m TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
      mid <- newMessageId
      source <- gets rsSelf
      setBroadcallResponder expiry mid
      mapM_ (sendPeer (PMCall source mid msg)) =<< getPeers
  where
    setBroadcallResponder :: (Monad m)
      => TimeSpec
      -> MessageId
      -> StateT (RuntimeState e) m ()
    setBroadcallResponder :: forall (m :: * -> *) e.
Monad m =>
TimeSpec -> MessageId -> StateT (RuntimeState e) m ()
setBroadcallResponder TimeSpec
expiry MessageId
mid = do
      peers <- StateT (RuntimeState e) m (Set Peer)
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers
      state@RuntimeState {rsBroadcalls} <- get
      put state {
          rsBroadcalls =
            Map.insert
              mid
              (
                Map.fromList [(peer, Nothing) | peer <- Set.toList peers],
                responder,
                expiry
              )
              rsBroadcalls
        }

handleRuntimeMessage (Broadcast ByteString
msg) =
  {-# SCC "Broadcast" #-}
  (Peer -> StateT (RuntimeState e) m ())
-> Set Peer -> StateT (RuntimeState e) m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (PeerMessage e -> Peer -> StateT (RuntimeState e) m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (ByteString -> PeerMessage e
forall e. ByteString -> PeerMessage e
PMCast ByteString
msg)) (Set Peer -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m (Set Peer)
-> StateT (RuntimeState e) m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< StateT (RuntimeState e) m (Set Peer)
forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers

handleRuntimeMessage (SendCallResponse Peer
target MessageId
mid ByteString
msg) =
  {-# SCC "SendCallResponse" #-}
  do
    source <- (RuntimeState e -> Peer) -> StateT (RuntimeState e) m Peer
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf
    sendPeer (PMCallResponse source mid msg) target

handleRuntimeMessage (HandleCallResponse Peer
source MessageId
mid ByteString
msg) =
  {-# SCC "HandleCallResponse" #-}
  do
    state@RuntimeState {rsCalls, rsBroadcalls} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
    case Map.lookup mid rsCalls of
      Maybe (Responder ByteString)
Nothing ->
        case MessageId
-> Map
     MessageId
     (Map Peer (Maybe ByteString),
      Responder (Map Peer (Maybe ByteString)), TimeSpec)
-> Maybe
     (Map Peer (Maybe ByteString),
      Responder (Map Peer (Maybe ByteString)), TimeSpec)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup MessageId
mid Map
  MessageId
  (Map Peer (Maybe ByteString),
   Responder (Map Peer (Maybe ByteString)), TimeSpec)
rsBroadcalls of
          Maybe
  (Map Peer (Maybe ByteString),
   Responder (Map Peer (Maybe ByteString)), TimeSpec)
Nothing -> () -> StateT (RuntimeState e) m ()
forall a. a -> StateT (RuntimeState e) m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          Just (Map Peer (Maybe ByteString)
responses, Responder (Map Peer (Maybe ByteString))
responder, TimeSpec
expiry) ->
            let
              responses2 :: Map Peer (Maybe ByteString)
responses2 = Peer
-> Maybe ByteString
-> Map Peer (Maybe ByteString)
-> Map Peer (Maybe ByteString)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Peer
source (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
msg) Map Peer (Maybe ByteString)
responses
              response :: Map Peer ByteString
response = [(Peer, ByteString)] -> Map Peer ByteString
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [
                  (Peer
peer, ByteString
r)
                  | (Peer
peer, Just ByteString
r) <- Map Peer (Maybe ByteString) -> [(Peer, Maybe ByteString)]
forall k a. Map k a -> [(k, a)]
Map.toList Map Peer (Maybe ByteString)
responses2
                ]
              peers :: Set Peer
peers = Map Peer (Maybe ByteString) -> Set Peer
forall k a. Map k a -> Set k
Map.keysSet Map Peer (Maybe ByteString)
responses2
            in
              if Set Peer -> Bool
forall a. Set a -> Bool
Set.null (Set Peer
peers Set Peer -> Set Peer -> Set Peer
forall a. Ord a => Set a -> Set a -> Set a
\\ Map Peer ByteString -> Set Peer
forall k a. Map k a -> Set k
Map.keysSet Map Peer ByteString
response)
                then do
                  Responder (Map Peer (Maybe ByteString))
-> Map Peer (Maybe ByteString) -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Map Peer (Maybe ByteString))
responder (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString)
-> Map Peer ByteString -> Map Peer (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map Peer ByteString
response)
                  RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {
                      rsBroadcalls = Map.delete mid rsBroadcalls
                    }
                else
                  RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {
                      rsBroadcalls =
                        Map.insert
                          mid
                          (responses2, responder, expiry)
                          rsBroadcalls
                    }
      Just Responder ByteString
responder -> do
        Responder ByteString -> ByteString -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ByteString
responder ByteString
msg
        RuntimeState e -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {rsCalls = Map.delete mid rsCalls}

handleRuntimeMessage (Resend Responder ()
responder) =
  {-# SCC "Resend" #-}
  StateT (RuntimeState e) m ()
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
m ()
propagate StateT (RuntimeState e) m ()
-> (() -> StateT (RuntimeState e) m ())
-> StateT (RuntimeState e) m ()
forall a b.
StateT (RuntimeState e) m a
-> (a -> StateT (RuntimeState e) m b)
-> StateT (RuntimeState e) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Responder () -> () -> StateT (RuntimeState e) m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder ()
responder


{- | Get the projected peers. -}
getPeers :: (Monad m) => StateT (RuntimeState e) m (Set Peer)
getPeers :: forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m (Set Peer)
getPeers = (RuntimeState e -> Set Peer)
-> StateT (RuntimeState e) m (Set Peer)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (EventFold ClusterName Peer e -> Set Peer
forall p o e. Ord p => EventFold o p e -> Set p
projParticipants (EventFold ClusterName Peer e -> Set Peer)
-> (RuntimeState e -> EventFold ClusterName Peer e)
-> RuntimeState e
-> Set Peer
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState)


{- | Get a new messageId. -}
newMessageId :: (Monad m) => StateT (RuntimeState e) m MessageId
newMessageId :: forall (m :: * -> *) e.
Monad m =>
StateT (RuntimeState e) m MessageId
newMessageId = do
  state@RuntimeState {rsNextId} <- StateT (RuntimeState e) m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
  put state {rsNextId = nextMessageId rsNextId}
  return rsNextId


{- |
  Like 'runEventFoldT', plus automatically take care of doing necessary
  IO implied by the cluster update.
-}
updateCluster
  :: ( EventConstraints e
     , MonadLoggerIO m
     , MonadState (RuntimeState e) m
     )
  => EventFoldT ClusterName Peer e m a
  -> m a
updateCluster :: forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
EventFoldT ClusterName Peer e m a -> m a
updateCluster EventFoldT ClusterName Peer e m a
action = do
  RuntimeState {rsSelf} <- m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
  updateClusterAs rsSelf action


{- |
  Like 'updateCluster', but perform the operation on behalf of a specified
  peer. This is required for e.g. the peer eject case, when the ejected peer
  may not be able to perform acknowledgements on its own behalf.
-}
updateClusterAs
  :: forall e m a.
     ( EventConstraints e
     , MonadLoggerIO m
     , MonadState (RuntimeState e) m
     )
  => Peer
  -> EventFoldT ClusterName Peer e m a
  -> m a
updateClusterAs :: forall e (m :: * -> *) a.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
Peer -> EventFoldT ClusterName Peer e m a -> m a
updateClusterAs Peer
asPeer EventFoldT ClusterName Peer e m a
action = do
  (oldCluster, notify)
    <- (RuntimeState e
 -> (EventFold ClusterName Peer e,
     EventFold ClusterName Peer e -> IO ()))
-> m (EventFold ClusterName Peer e,
      EventFold ClusterName Peer e -> IO ())
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState (RuntimeState e -> EventFold ClusterName Peer e)
-> (RuntimeState e -> EventFold ClusterName Peer e -> IO ())
-> RuntimeState e
-> (EventFold ClusterName Peer e,
    EventFold ClusterName Peer e -> IO ())
forall b c c'. (b -> c) -> (b -> c') -> b -> (c, c')
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& RuntimeState e -> EventFold ClusterName Peer e -> IO ()
forall e. RuntimeState e -> EventFold ClusterName Peer e -> IO ()
rsNotify)
  (v, ur) <- runEventFoldT asPeer oldCluster action
  do {- Update the cluster -}
    let
      newCluster :: EventFold ClusterName Peer e
      newCluster = UpdateResult ClusterName Peer e -> EventFold ClusterName Peer e
forall o p e. UpdateResult o p e -> EventFold o p e
urEventFold UpdateResult ClusterName Peer e
ur
    when (oldCluster /= newCluster) $ liftIO (notify newCluster)
    modify'
      (
        let
          doModify RuntimeState e
state =
            EventFold ClusterName Peer e
newCluster EventFold ClusterName Peer e -> RuntimeState e -> RuntimeState e
forall a b. a -> b -> b
`seq`
            RuntimeState e
state
              { rsClusterState = newCluster
              }
        in
          doModify
      )
  do {- Dispatch outputs. -}
    self <- gets rsSelf
    let
      outputs :: Map (EventId Peer) (Output e)
      outputs = UpdateResult ClusterName Peer e -> Map (EventId Peer) (Output e)
forall o p e. UpdateResult o p e -> Map (EventId p) (Output e)
urOutputs UpdateResult ClusterName Peer e
ur

      byRemotePeer :: Map Peer (Map (EventId Peer) (Output e))
      byRemotePeer =
        (Map (EventId Peer) (Output e)
 -> Map (EventId Peer) (Output e) -> Map (EventId Peer) (Output e))
-> [Map Peer (Map (EventId Peer) (Output e))]
-> Map Peer (Map (EventId Peer) (Output e))
forall (f :: * -> *) k a.
(Foldable f, Ord k) =>
(a -> a -> a) -> f (Map k a) -> Map k a
Map.unionsWith
          Map (EventId Peer) (Output e)
-> Map (EventId Peer) (Output e) -> Map (EventId Peer) (Output e)
forall a. Semigroup a => a -> a -> a
(<>)
          [ Peer
-> Map (EventId Peer) (Output e)
-> Map Peer (Map (EventId Peer) (Output e))
forall k a. k -> a -> Map k a
Map.singleton Peer
peer (EventId Peer -> Output e -> Map (EventId Peer) (Output e)
forall k a. k -> a -> Map k a
Map.singleton EventId Peer
eid Output e
o)
          | (EventId Peer
eid, Output e
o) <- Map (EventId Peer) (Output e) -> [(EventId Peer, Output e)]
forall k a. Map k a -> [(k, a)]
Map.toList Map (EventId Peer) (Output e)
outputs
          , Just Peer
peer <- [EventId Peer -> Maybe Peer
forall p. EventId p -> Maybe p
EF.source EventId Peer
eid]
          ]
    respondToWaiting outputs
    sequence_
      [ do
          logDebug $ "Sending remote outputs: " <> showt byRemotePeer
          sendPeer (PMOutputs outputsForPeer) peer
      | (peer, outputsForPeer) <- Map.toList byRemotePeer
      , peer /= self
      ]
  pure v


{- | Wait on a consistent response for the given state id. -}
waitOn :: (Monad m)
  => EventId Peer
  -> Responder (Output e)
  -> StateT (RuntimeState e) m ()
waitOn :: forall (m :: * -> *) e.
Monad m =>
EventId Peer
-> Responder (Output e) -> StateT (RuntimeState e) m ()
waitOn EventId Peer
sid Responder (Output e)
responder =
  (RuntimeState e -> RuntimeState e) -> StateT (RuntimeState e) m ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' (\state :: RuntimeState e
state@RuntimeState {Map (EventId Peer) (Responder (Output e))
rsWaiting :: forall e.
RuntimeState e -> Map (EventId Peer) (Responder (Output e))
rsWaiting :: Map (EventId Peer) (Responder (Output e))
rsWaiting} -> RuntimeState e
state {
    rsWaiting = Map.insert sid responder rsWaiting
  })


{- | Propagates cluster information if necessary. -}
propagate
  :: ( EventConstraints e
     , MonadLoggerIO m
     , MonadState (RuntimeState e) m
     )
  => m ()
propagate :: forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
m ()
propagate = do
    (self, cluster) <- (RuntimeState e -> (Peer, EventFold ClusterName Peer e))
-> m (Peer, EventFold ClusterName Peer e)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (RuntimeState e -> Peer
forall e. RuntimeState e -> Peer
rsSelf (RuntimeState e -> Peer)
-> (RuntimeState e -> EventFold ClusterName Peer e)
-> RuntimeState e
-> (Peer, EventFold ClusterName Peer e)
forall b c c'. (b -> c) -> (b -> c') -> b -> (c, c')
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState)
    let
      targets = Peer -> Set Peer -> Set Peer
forall a. Ord a => a -> Set a -> Set a
Set.delete Peer
self (Set Peer -> Set Peer) -> Set Peer -> Set Peer
forall a b. (a -> b) -> a -> b
$
        EventFold ClusterName Peer e -> Set Peer
forall p o e. Ord p => EventFold o p e -> Set p
EF.allParticipants EventFold ClusterName Peer e
cluster

    liftIO (shuffleM (Set.toList targets)) >>= \case
      [] -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Peer
target:[Peer]
_ ->
        case Peer
-> EventFold ClusterName Peer e -> Maybe (Diff ClusterName Peer e)
forall o p e. Ord p => p -> EventFold o p e -> Maybe (Diff o p e)
events Peer
target EventFold ClusterName Peer e
cluster of
          Maybe (Diff ClusterName Peer e)
Nothing -> do
            Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo
              (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Sending full merge because the target's join event "
              Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"has not reaached the infimum"
            PeerMessage e -> Peer -> m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (EventFold ClusterName Peer e -> PeerMessage e
forall e. EventFold ClusterName Peer e -> PeerMessage e
PMFullMerge EventFold ClusterName Peer e
cluster) Peer
target
          Just Diff ClusterName Peer e
diff
            | Diff ClusterName Peer e -> Int
forall o p e. Diff o p e -> Int
diffSize Diff ClusterName Peer e
diff Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 ->
                PeerMessage e -> Peer -> m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (Diff ClusterName Peer e -> PeerMessage e
forall e. Diff ClusterName Peer e -> PeerMessage e
PMMerge Diff ClusterName Peer e
diff) Peer
target
            | Bool
otherwise ->
                PeerMessage e -> Peer -> m ()
forall (m :: * -> *) e.
(EventConstraints e, MonadLoggerIO m,
 MonadState (RuntimeState e) m) =>
PeerMessage e -> Peer -> m ()
sendPeer (EventFold ClusterName Peer e -> PeerMessage e
forall e. EventFold ClusterName Peer e -> PeerMessage e
PMFullMerge EventFold ClusterName Peer e
cluster) Peer
target
    disconnectObsolete
  where
    {- |
      Shut down connections to peers that are no longer participating
      in the cluster.
    -}
    disconnectObsolete
      :: ( MonadState (RuntimeState e) m
         , MonadLogger m
         )
      => m ()
    disconnectObsolete :: forall e (m :: * -> *).
(MonadState (RuntimeState e) m, MonadLogger m) =>
m ()
disconnectObsolete = do
      (cluster, conns) <- (RuntimeState e
 -> (EventFold ClusterName Peer e, Map Peer (Connection e)))
-> m (EventFold ClusterName Peer e, Map Peer (Connection e))
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (RuntimeState e -> EventFold ClusterName Peer e
forall e. RuntimeState e -> EventFold ClusterName Peer e
rsClusterState (RuntimeState e -> EventFold ClusterName Peer e)
-> (RuntimeState e -> Map Peer (Connection e))
-> RuntimeState e
-> (EventFold ClusterName Peer e, Map Peer (Connection e))
forall b c c'. (b -> c) -> (b -> c') -> b -> (c, c')
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& RuntimeState e -> Map Peer (Connection e)
forall e. RuntimeState e -> Map Peer (Connection e)
rsConnections)
      let obsolete = Map Peer (Connection e) -> Set Peer
forall k a. Map k a -> Set k
Map.keysSet Map Peer (Connection e)
conns Set Peer -> Set Peer -> Set Peer
forall a. Ord a => Set a -> Set a -> Set a
\\ EventFold ClusterName Peer e -> Set Peer
forall p o e. Ord p => EventFold o p e -> Set p
EF.allParticipants EventFold ClusterName Peer e
cluster
      unless (Set.null obsolete) $
        logInfo $ "Disconnecting obsolete: " <> showt obsolete
      mapM_ disconnect obsolete


{- |
  Respond to event applications that are waiting on a consistent result,
  if such a result is available.
-}
respondToWaiting
  :: forall m e.
     ( MonadLoggerIO m
     , MonadState (RuntimeState e) m
     , Show (Output e)
     )
  => Map (EventId Peer) (Output e)
  -> m ()
respondToWaiting :: forall (m :: * -> *) e.
(MonadLoggerIO m, MonadState (RuntimeState e) m,
 Show (Output e)) =>
Map (EventId Peer) (Output e) -> m ()
respondToWaiting Map (EventId Peer) (Output e)
available = do
    rs <- m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
    logDebug
      $ "Responding to: " <> showt (available, Map.keysSet (rsWaiting rs))
    mapM_ respondToOne (Map.toList available)
  where
    respondToOne
      :: (EventId Peer, Output e)
      -> m ()
    respondToOne :: (EventId Peer, Output e) -> m ()
respondToOne (EventId Peer
sid, Output e
output) = do
      state@RuntimeState {rsWaiting} <- m (RuntimeState e)
forall s (m :: * -> *). MonadState s m => m s
get
      case Map.lookup sid rsWaiting of
        Maybe (Responder (Output e))
Nothing -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Just Responder (Output e)
responder -> do
          Responder (Output e) -> Output e -> m ()
forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder (Output e)
responder Output e
output
          RuntimeState e -> m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
put RuntimeState e
state {rsWaiting = Map.delete sid rsWaiting}


{- | This defines the various ways a node can be spun up. -}
data StartupMode e
  {- | Indicates that we should bootstrap a new cluster at startup. -}
  = NewCluster
      Peer {- ^ The peer being launched. -}
      ClusterName {- ^ The name of the cluster being launched. -}

  {- | Indicates that the node should try to join an existing cluster. -}
  | JoinCluster
      Peer {- ^ The peer being launched. -}
      ClusterName {- ^ The name of the cluster we are trying to join. -}
      Peer {- ^ The existing peer we are attempting to join with. -}

  {- | Resume operation given the previously saved state. -}
  | Recover
      Peer {- ^ The Peer being recovered. -}
      (EventFold ClusterName Peer e)
      {- ^ The last acknowledged state we had before we crashed. -}
deriving stock instance
    ( Show e
    , Show (Output e)
    , Show (State e)
    )
  =>
    Show (StartupMode e)


{- | Initialize the runtime state. -}
makeRuntimeState :: (EventConstraints e, MonadLoggerIO m)
  => (Peer -> EventFold ClusterName Peer e -> IO ())
     {- ^ Callback when the cluster-wide powerstate changes. -}
  -> StartupMode e
  -> m (RuntimeState e)

makeRuntimeState :: forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState
    Peer -> EventFold ClusterName Peer e -> IO ()
notify
    (NewCluster Peer
self ClusterName
clusterId)
  = do
    Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo Text
"Starting a new cluster."
    {- Build a brand new node state, for the first node in a cluster. -}
    (Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
(Peer -> EventFold ClusterName Peer e -> IO ())
-> StartupMode e -> m (RuntimeState e)
makeRuntimeState
      Peer -> EventFold ClusterName Peer e -> IO ()
notify
      (Peer -> EventFold ClusterName Peer e -> StartupMode e
forall e. Peer -> EventFold ClusterName Peer e -> StartupMode e
Recover Peer
self (ClusterName -> Peer -> EventFold ClusterName Peer e
forall o p e.
(Default (State e), Event p e, Ord p) =>
o -> p -> EventFold o p e
EF.new ClusterName
clusterId Peer
self))

makeRuntimeState
    Peer -> EventFold ClusterName Peer e -> IO ()
notify
    (JoinCluster Peer
self ClusterName
_clusterName Peer
targetPeer)
  = do
    {- Join a cluster an existing cluster. -}
    Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logInfo (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Trying to join an existing cluster on " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> AddressDescription -> Text
forall a b. (Show a, IsString b) => a -> b
showt AddressDescription
addr
    JoinOk cluster <-
      JoinRequest -> m (JoinResponse e)
forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
JoinRequest -> m (JoinResponse e)
requestJoin
      (JoinRequest -> m (JoinResponse e))
-> (Peer -> JoinRequest) -> Peer -> m (JoinResponse e)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Peer -> JoinRequest
JoinRequest
      (Peer -> m (JoinResponse e)) -> Peer -> m (JoinResponse e)
forall a b. (a -> b) -> a -> b
$ Peer
self
    logInfo $ "Join response with cluster: " <> showt cluster
    makeRuntimeState notify (Recover self cluster)
  where
    requestJoin :: (EventConstraints e, MonadLoggerIO m)
      => JoinRequest
      -> m (JoinResponse e)
    requestJoin :: forall e (m :: * -> *).
(EventConstraints e, MonadLoggerIO m) =>
JoinRequest -> m (JoinResponse e)
requestJoin JoinRequest
joinMsg = ((JoinRequest -> m (JoinResponse e))
-> JoinRequest -> m (JoinResponse e)
forall a b. (a -> b) -> a -> b
$ JoinRequest
joinMsg) ((JoinRequest -> m (JoinResponse e)) -> m (JoinResponse e))
-> m (JoinRequest -> m (JoinResponse e)) -> m (JoinResponse e)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AddressDescription
-> Maybe ClientParams -> m (JoinRequest -> m (JoinResponse e))
forall (n :: * -> *) request (m :: * -> *) response.
(Binary request, Binary response, MonadIO m, MonadLoggerIO n,
 Show response) =>
AddressDescription
-> Maybe ClientParams -> n (request -> m response)
connectServer AddressDescription
addr Maybe ClientParams
forall a. Maybe a
Nothing

    addr :: AddressDescription
    addr :: AddressDescription
addr =
      Text -> AddressDescription
AddressDescription
        (
          Peer -> Text
unPeer Peer
targetPeer
          Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
":" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PortNumber -> Text
forall a b. (Show a, IsString b) => a -> b
showt PortNumber
joinMessagePort
        )

makeRuntimeState
    Peer -> EventFold ClusterName Peer e -> IO ()
notify
    (Recover Peer
self EventFold ClusterName Peer e
clusterState)
  = do
    rsNextId <- m MessageId
forall (m :: * -> *). MonadIO m => m MessageId
newSequence
    return
      RuntimeState
        { rsSelf = self
        , rsClusterState = clusterState
        , rsConnections = mempty
        , rsWaiting = mempty
        , rsCalls = mempty
        , rsBroadcalls = mempty
        , rsNextId
        , rsNotify = notify self
        , rsJoins = mempty
        }


{- |
  Initialize a new sequence of messageIds. It would be perfectly fine to ensure
  unique message ids by generating a unique UUID for each one, but generating
  UUIDs is not free, and we are probably going to be generating a lot of these.
-}
newSequence :: (MonadIO m) => m MessageId
newSequence :: forall (m :: * -> *). MonadIO m => m MessageId
newSequence = do
    sid <- m UUID
forall (m :: * -> *). MonadIO m => m UUID
getUUID
    pure (M sid 0)
  where
    {- | A utility function that makes a UUID, no matter what.  -}
    getUUID :: (MonadIO m) => m UUID
    getUUID :: forall (m :: * -> *). MonadIO m => m UUID
getUUID = IO (Maybe UUID) -> m (Maybe UUID)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Maybe UUID)
nextUUID m (Maybe UUID) -> (Maybe UUID -> m UUID) -> m UUID
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= m UUID -> (UUID -> m UUID) -> Maybe UUID -> m UUID
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (m ()
wait m () -> m UUID -> m UUID
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m UUID
forall (m :: * -> *). MonadIO m => m UUID
getUUID) UUID -> m UUID
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return
      where
        wait :: m ()
wait = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
threadDelay Int
oneMillisecond)
        oneMillisecond :: Int
oneMillisecond = Int
1000


{- |
  Generate the next message id in the sequence. We would normally use
  `succ` for this kind of thing, but making `MessageId` an instance of
  `Enum` really isn't appropriate.
-}
nextMessageId :: MessageId -> MessageId
nextMessageId :: MessageId -> MessageId
nextMessageId (M UUID
sequenceId Word64
ord) = UUID -> Word64 -> MessageId
M UUID
sequenceId (Word64 -> Word64
forall a. Enum a => a -> a
succ Word64
ord)


{- | Obtain the 'ClusterName'. -}
getClusterName :: Runtime e -> ClusterName
getClusterName :: forall e. Runtime e -> ClusterName
getClusterName = Runtime e -> ClusterName
forall e. Runtime e -> ClusterName
rClusterId


{- | The join message port  -}
joinMessagePort :: PortNumber
joinMessagePort :: PortNumber
joinMessagePort = PortNumber
5289


{- | Like 'Fork.respond', but returns '()'. -}
respond :: (MonadIO m) => Responder a -> a -> m ()
respond :: forall (m :: * -> *) a. MonadIO m => Responder a -> a -> m ()
respond Responder a
responder = m Responded -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Responded -> m ()) -> (a -> m Responded) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Responder a -> a -> m Responded
forall (m :: * -> *) a.
MonadIO m =>
Responder a -> a -> m Responded
Fork.respond Responder a
responder


{- |
  Retrieve some basic stats that can be used to intuit the health of
  the cluster.
-}
getStats :: (MonadIO m) => Runtime e -> m Stats
getStats :: forall (m :: * -> *) e. MonadIO m => Runtime e -> m Stats
getStats Runtime e
runtime = do
  now <- IO TimeSpec -> m TimeSpec
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO TimeSpec
forall (m :: * -> *). MonadTimeSpec m => m TimeSpec
getTime
  stats <- liftIO $ readIORef (rStats runtime)
  pure
    Stats
      { timeWithoutProgress = diffTimeSpec now <$> stats
      }