{-# LANGUAGE UndecidableInstances #-}
module Control.Eff.Concurrent.Protocol.Broker
( startLink
, statefulChild
, stopBroker
, isBrokerAlive
, monitorBroker
, getDiagnosticInfo
, spawnChild
, spawnOrLookup
, lookupChild
, callById
, castById
, stopChild
, ChildNotFound(..)
, Broker()
, Pdu(StartC, StopC, LookupC, GetDiagnosticInfo)
, ChildId
, Stateful.StartArgument(MkBrokerConfig)
, brokerConfigChildStopTimeout
, SpawnErr(AlreadyStarted)
, ChildEvent(OnChildSpawned, OnChildDown, OnBrokerShuttingDown)
) where
import Control.DeepSeq (NFData(rnf))
import Control.Eff as Eff
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Client
import Control.Eff.Concurrent.Protocol.Wrapper
import Control.Eff.Concurrent.Protocol.Observer
import qualified Control.Eff.Concurrent.Protocol.EffectfulServer as Effectful
import qualified Control.Eff.Concurrent.Protocol.StatefulServer as Stateful
import Control.Eff.Concurrent.Protocol.Broker.InternalState
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Process.Timer
import Control.Eff.Extend (raise)
import Control.Eff.Log
import Control.Eff.State.Strict as Eff
import Control.Lens hiding ((.=), use)
import Data.Default
import Data.Dynamic
import Data.Foldable
import Data.Kind
import qualified Data.Map as Map
import Data.Text (Text, pack)
import Data.Type.Pretty
import GHC.Generics (Generic)
import GHC.Stack
import Control.Applicative ((<|>))
startLink
:: forall p e
. ( HasCallStack
, IoLogging (Processes e)
, TangibleBroker p
, Stateful.Server (Broker p) (Processes e)
)
=> Stateful.StartArgument (Broker p)
-> Eff (Processes e) (Endpoint (Broker p))
startLink = Stateful.startLink
statefulChild
:: forall p e
. ( HasCallStack
, IoLogging e
, TangibleBroker (Stateful.Stateful p)
, Stateful.Server (Broker (Stateful.Stateful p)) e
)
=> Timeout
-> (ChildId p -> Stateful.StartArgument p)
-> Stateful.StartArgument (Broker (Stateful.Stateful p))
statefulChild t f = MkBrokerConfig t (Stateful.Init . f)
stopBroker
:: ( HasCallStack
, HasProcesses e q
, Member Logs e
, Lifted IO e
, TangibleBroker p
)
=> Endpoint (Broker p)
-> Eff e ()
stopBroker ep = do
logInfo ("stopping broker: " <> pack (show ep))
mr <- monitor (_fromEndpoint ep)
sendInterrupt (_fromEndpoint ep) NormalExitRequested
r <- receiveSelectedMessage (selectProcessDown mr)
logInfo ("broker stopped: " <> pack (show ep) <> " " <> pack (show r))
isBrokerAlive
:: forall p q0 e .
( HasCallStack
, Member Logs e
, Typeable p
, HasProcesses e q0
)
=> Endpoint (Broker p)
-> Eff e Bool
isBrokerAlive x = isProcessAlive (_fromEndpoint x)
monitorBroker
:: forall p q0 e .
( HasCallStack
, Member Logs e
, HasProcesses e q0
, TangibleBroker p
)
=> Endpoint (Broker p)
-> Eff e MonitorReference
monitorBroker x = monitor (_fromEndpoint x)
spawnChild
:: forall p q0 e .
( HasCallStack
, Member Logs e
, HasProcesses e q0
, TangibleBroker p
, Typeable (Effectful.ServerPdu p)
)
=> Endpoint (Broker p)
-> ChildId p
-> Eff e (Either (SpawnErr p) (Endpoint (Effectful.ServerPdu p)))
spawnChild ep cId = call ep (StartC cId)
spawnOrLookup
:: forall p q0 e .
( HasCallStack
, Member Logs e
, HasProcesses e q0
, TangibleBroker p
, Typeable (Effectful.ServerPdu p)
)
=> Endpoint (Broker p)
-> ChildId p
-> Eff e (Endpoint (Effectful.ServerPdu p))
spawnOrLookup supEp cId =
do res <- spawnChild supEp cId
pure $
case res of
Left (AlreadyStarted _ ep) -> ep
Right ep -> ep
lookupChild ::
forall p e q0 .
( HasCallStack
, Member Logs e
, HasProcesses e q0
, TangibleBroker p
, Typeable (Effectful.ServerPdu p)
)
=> Endpoint (Broker p)
-> ChildId p
-> Eff e (Maybe (Endpoint (Effectful.ServerPdu p)))
lookupChild ep cId = call ep (LookupC @p cId)
stopChild ::
forall p e q0 .
( HasCallStack
, Member Logs e
, HasProcesses e q0
, TangibleBroker p
)
=> Endpoint (Broker p)
-> ChildId p
-> Eff e Bool
stopChild ep cId = call ep (StopC @p cId (TimeoutMicros 4000000))
callById ::
forall destination protocol result e q0 .
( HasCallStack
, Member Logs e
, HasProcesses e q0
, Lifted IO e
, Lifted IO q0
, TangibleBroker protocol
, TangiblePdu destination ( 'Synchronous result)
, TangiblePdu protocol ( 'Synchronous result)
, Embeds (Effectful.ServerPdu destination) protocol
, Ord (ChildId destination)
, Tangible (ChildId destination)
, Typeable (Effectful.ServerPdu destination)
, Tangible result
, NFData (Pdu protocol ( 'Synchronous result))
, NFData (Pdu (Effectful.ServerPdu destination) ( 'Synchronous result))
, Show (Pdu (Effectful.ServerPdu destination) ( 'Synchronous result))
)
=> Endpoint (Broker destination)
-> ChildId destination
-> Pdu protocol ( 'Synchronous result)
-> Timeout
-> Eff e result
callById broker cId pdu tMax =
lookupChild broker cId
>>=
maybe
(do logError ("callById failed for: " <> pack (show pdu))
interrupt (InterruptedBy (ChildNotFound cId broker))
)
(\cEp -> callWithTimeout cEp pdu tMax)
data ChildNotFound child where
ChildNotFound :: ChildId child -> Endpoint (Broker child) -> ChildNotFound child
deriving (Typeable)
instance (Show (ChildId child), Typeable child) => Show (ChildNotFound child) where
showsPrec d (ChildNotFound cId broker)
= showParen (d >= 10) ( showString "child not found: "
. showsPrec 10 cId
. showString " at: "
. showsPrec 10 broker
)
instance NFData (ChildId c) => NFData (ChildNotFound c) where
rnf (ChildNotFound cId broker) = rnf cId `seq` rnf broker `seq` ()
castById ::
forall destination protocol e q0 .
( HasCallStack
, Member Logs e
, HasProcesses e q0
, TangibleBroker protocol
, TangiblePdu destination 'Asynchronous
, TangiblePdu protocol 'Asynchronous
)
=> Endpoint (Broker destination)
-> ChildId destination
-> Pdu protocol 'Asynchronous
-> Eff e ()
castById = error "TODO"
getDiagnosticInfo
:: forall p e q0 .
( HasCallStack
, HasProcesses e q0
, TangibleBroker p
)
=> Endpoint (Broker p)
-> Eff e Text
getDiagnosticInfo s = call s (GetDiagnosticInfo @p)
data Broker (p :: Type) deriving Typeable
instance Typeable p => HasPdu (Broker p) where
type instance EmbeddedPduList (Broker p) = '[ObserverRegistry (ChildEvent p)]
data instance Pdu (Broker p) r where
StartC :: ChildId p -> Pdu (Broker p) ('Synchronous (Either (SpawnErr p) (Endpoint (Effectful.ServerPdu p))))
StopC :: ChildId p -> Timeout -> Pdu (Broker p) ('Synchronous Bool)
LookupC :: ChildId p -> Pdu (Broker p) ('Synchronous (Maybe (Endpoint (Effectful.ServerPdu p))))
GetDiagnosticInfo :: Pdu (Broker p) ('Synchronous Text)
ChildEventObserverRegistry :: Pdu (ObserverRegistry (ChildEvent p)) r -> Pdu (Broker p) r
deriving Typeable
instance (Typeable p, Show (ChildId p)) => Show (Pdu (Broker p) r) where
showsPrec d (StartC c) = showParen (d >= 10) (showString "StartC " . showsPrec 10 c)
showsPrec d (StopC c t) = showParen (d >= 10) (showString "StopC " . showsPrec 10 c . showChar ' ' . showsPrec 10 t)
showsPrec d (LookupC c) = showParen (d >= 10) (showString "LookupC " . showsPrec 10 c)
showsPrec _ GetDiagnosticInfo = showString "GetDiagnosticInfo"
showsPrec d (ChildEventObserverRegistry c) = showParen (d >= 10) (showString "ChildEventObserverRegistry " . showsPrec 10 c)
instance (NFData (ChildId p)) => NFData (Pdu (Broker p) r) where
rnf (StartC ci) = rnf ci
rnf (StopC ci t) = rnf ci `seq` rnf t
rnf (LookupC ci) = rnf ci
rnf GetDiagnosticInfo = ()
rnf (ChildEventObserverRegistry x) = rnf x
instance Typeable p => HasPduPrism (Broker p) (ObserverRegistry (ChildEvent p)) where
embedPdu = ChildEventObserverRegistry
fromPdu (ChildEventObserverRegistry x) = Just x
fromPdu _ = Nothing
type instance ToPretty (Broker p) = "broker" <:> ToPretty p
data ChildEvent p where
OnChildSpawned :: Endpoint (Broker p) -> ChildId p -> Endpoint (Effectful.ServerPdu p) -> ChildEvent p
OnChildDown :: Endpoint (Broker p) -> ChildId p -> Endpoint (Effectful.ServerPdu p) -> Interrupt 'NoRecovery -> ChildEvent p
OnBrokerShuttingDown :: Endpoint (Broker p) -> ChildEvent p
deriving (Typeable, Generic)
instance (NFData (ChildId p)) => NFData (ChildEvent p)
instance (Typeable p, Typeable (Effectful.ServerPdu p), Show (ChildId p)) => Show (ChildEvent p) where
showsPrec d x =
case x of
OnChildSpawned s i e ->
showParen (d >= 10)
(shows s . showString ": child-spawned: " . shows i . showChar ' ' . shows e)
OnChildDown s i e r ->
showParen (d >= 10)
(shows s . showString ": child-down: " . shows i . showChar ' ' . shows e . showChar ' ' . showsPrec 10 r)
OnBrokerShuttingDown s ->
shows s . showString ": shutting down"
type family ChildId p
type instance ChildId (Stateful.Stateful p) = ChildId p
type TangibleBroker p =
( Tangible (ChildId p)
, Ord (ChildId p)
, Typeable p
)
instance
( IoLogging q
, TangibleBroker p
, Tangible (ChildId p)
, Typeable (Effectful.ServerPdu p)
, Effectful.Server p (Processes q)
, HasProcesses (Effectful.ServerEffects p (Processes q)) q
) => Stateful.Server (Broker p) (Processes q) where
data instance StartArgument (Broker p) = MkBrokerConfig
{
brokerConfigChildStopTimeout :: Timeout
, brokerConfigStartFun :: ChildId p -> Effectful.Init p
}
data instance Model (Broker p) =
BrokerModel { _children :: Children (ChildId p) p
, _childEventObserver :: ObserverRegistry (ChildEvent p)
}
deriving Typeable
setup _ _cfg = pure (BrokerModel def emptyObserverRegistry, ())
update _ _brokerConfig (Stateful.OnCast req) =
case req of
ChildEventObserverRegistry x ->
Stateful.zoomModel @(Broker p) childEventObserverLens (observerRegistryHandlePdu x)
update me brokerConfig (Stateful.OnCall rt req) =
case req of
ChildEventObserverRegistry x ->
logEmergency ("unreachable: " <> pack (show x))
GetDiagnosticInfo -> zoomToChildren @p $ do
p <- (pack . show <$> getChildren @(ChildId p) @p)
sendReply rt p
LookupC i -> zoomToChildren @p $ do
p <- fmap (view childEndpoint) <$> lookupChildById @(ChildId p) @p i
sendReply rt p
StopC i t -> zoomToChildren @p $ do
mExisting <- lookupAndRemoveChildById @(ChildId p) @p i
case mExisting of
Nothing -> sendReply rt False
Just existingChild -> do
reason <- stopOrKillChild i existingChild t
sendReply rt True
Stateful.zoomModel @(Broker p)
childEventObserverLens
(observerRegistryNotify @(ChildEvent p) (OnChildDown me i (existingChild^.childEndpoint) reason))
StartC i -> do
mExisting <- zoomToChildren @p $ lookupChildById @(ChildId p) @p i
case mExisting of
Nothing -> do
childEp <- raise (raise (Effectful.startLink (brokerConfigStartFun brokerConfig i)))
let childPid = _fromEndpoint childEp
cMon <- monitor childPid
zoomToChildren @p $ putChild i (MkChild @p childEp cMon)
sendReply rt (Right childEp)
Stateful.zoomModel @(Broker p)
childEventObserverLens
(observerRegistryNotify @(ChildEvent p) (OnChildSpawned me i childEp))
Just existingChild ->
sendReply rt (Left (AlreadyStarted i (existingChild ^. childEndpoint)))
update me _brokerConfig (Stateful.OnDown pd) = do
wasObserver <- Stateful.zoomModel @(Broker p)
childEventObserverLens
(observerRegistryRemoveProcess @(ChildEvent p) (downProcess pd))
if wasObserver
then logInfo ("observer process died: " <> pack (show pd))
else do
oldEntry <- zoomToChildren @p $ lookupAndRemoveChildByMonitor @(ChildId p) @p (downReference pd)
case oldEntry of
Nothing -> logWarning ("unexpected: " <> pack (show pd))
Just (i, c) -> do
logInfo ( pack (show pd)
<> " for child "
<> pack (show i)
<> " => "
<> pack (show (c^.childEndpoint))
)
Stateful.zoomModel @(Broker p)
childEventObserverLens
(observerRegistryNotify @(ChildEvent p) (OnChildDown me i (c^.childEndpoint) (downReason pd)))
update me brokerConfig (Stateful.OnInterrupt e) =
case e of
NormalExitRequested -> do
logDebug ("broker stopping: " <> pack (show e))
Stateful.zoomModel @(Broker p)
childEventObserverLens
(observerRegistryNotify @(ChildEvent p) (OnBrokerShuttingDown me))
stopAllChildren @p me (brokerConfigChildStopTimeout brokerConfig)
exitNormally
LinkedProcessCrashed linked ->
logNotice (pack (show linked))
_ -> do
logWarning ("broker interrupted: " <> pack (show e))
Stateful.zoomModel @(Broker p)
childEventObserverLens
(observerRegistryNotify @(ChildEvent p) (OnBrokerShuttingDown me))
stopAllChildren @p me (brokerConfigChildStopTimeout brokerConfig)
exitBecause (interruptToExit e)
update _ _brokerConfig o = logWarning ("unexpected: " <> pack (show o))
zoomToChildren :: forall p c e
. Member (Stateful.ModelState (Broker p)) e
=> Eff (State (Children (ChildId p) p) ': e) c
-> Eff e c
zoomToChildren = Stateful.zoomModel @(Broker p) childrenLens
childrenLens :: Lens' (Stateful.Model (Broker p)) (Children (ChildId p) p)
childrenLens = lens _children (\(BrokerModel _ o) c -> BrokerModel c o)
childEventObserverLens :: Lens' (Stateful.Model (Broker p)) (ObserverRegistry (ChildEvent p))
childEventObserverLens = lens _childEventObserver (\(BrokerModel o _) c -> BrokerModel o c)
data SpawnErr p = AlreadyStarted (ChildId p) (Endpoint (Effectful.ServerPdu p))
deriving (Typeable, Generic)
deriving instance Eq (ChildId p) => Eq (SpawnErr p)
deriving instance Ord (ChildId p) => Ord (SpawnErr p)
deriving instance (Typeable (Effectful.ServerPdu p), Show (ChildId p)) => Show (SpawnErr p)
instance NFData (ChildId p) => NFData (SpawnErr p)
stopOrKillChild
:: forall p e q0 .
( HasCallStack
, HasProcesses e q0
, Lifted IO e
, Lifted IO q0
, Member Logs e
, Member (Stateful.ModelState (Broker p)) e
, TangibleBroker p
, Typeable (Effectful.ServerPdu p)
)
=> ChildId p
-> Child p
-> Timeout
-> Eff e (Interrupt 'NoRecovery)
stopOrKillChild cId c stopTimeout =
do
broker <- asEndpoint @(Broker p) <$> self
t <- startTimerWithTitle
(MkProcessTitle ("child-exit-timer-" <> pack (show broker) <> "-" <> pack (show cId)))
stopTimeout
sendInterrupt (_fromEndpoint (c^.childEndpoint)) NormalExitRequested
r1 <- receiveSelectedMessage ( Right <$> selectProcessDown (c^.childMonitoring)
<|> Left <$> selectTimerElapsed t )
demonitor (c^.childMonitoring)
unlinkProcess (_fromEndpoint (c^.childEndpoint))
case r1 of
Left timerElapsed -> do
logWarning (pack (show timerElapsed) <> ": child "<> pack (show cId) <>" => " <> pack(show (c^.childEndpoint)) <>" did not shutdown in time")
let reason = interruptToExit (TimeoutInterrupt
("child did not shut down in time and was terminated by the "
++ show broker))
sendShutdown (_fromEndpoint (c^.childEndpoint)) reason
return reason
Right downMsg -> do
logInfo ("child "<> pack (show cId) <>" => " <> pack(show (c^.childEndpoint)) <>" terminated: " <> pack (show (downReason downMsg)))
return (downReason downMsg)
stopAllChildren
:: forall p e q0 .
( HasCallStack
, HasProcesses e q0
, Lifted IO e
, Lifted IO q0
, Member Logs e
, Member (Stateful.ModelState (Broker p)) e
, TangibleBroker p
, Typeable (Effectful.ServerPdu p)
)
=> Endpoint (Broker p) -> Timeout -> Eff e ()
stopAllChildren me stopTimeout =
zoomToChildren @p
(removeAllChildren @(ChildId p) @p)
>>= pure . Map.assocs
>>= traverse_ killAndNotify
where
killAndNotify (cId, c) = do
reason <- provideInterrupts (stopOrKillChild cId c stopTimeout) >>= either crash return
Stateful.zoomModel @(Broker p)
childEventObserverLens
(observerRegistryNotify @(ChildEvent p) (OnChildDown me cId (c^.childEndpoint) reason))
where
crash e = do
logError (pack (show e) <> " while stopping child: " <> pack (show cId) <> " " <> pack (show c))
return (interruptToExit e)