{-# LANGUAGE UndecidableInstances #-}
-- | A supervisor ('Sup') for child server processes.
--
-- A supervisor is itself a server process. It is started with
-- 'startSupervisor' and stopped with 'stopSupervisor'. Children are
-- identified by a 'ChildId'; they are started with 'spawnChild', found with
-- 'lookupChild' and stopped with 'stopChild'. All three are synchronous
-- calls to the supervisor process.
module Control.Eff.Concurrent.Protocol.Supervisor
( Sup()
, ChildId
, StartArgument(MkSupConfig)
, supConfigChildStopTimeout
, SpawnErr(AlreadyStarted)
, startSupervisor
, stopSupervisor
, isSupervisorAlive
, monitorSupervisor
, getDiagnosticInfo
, spawnChild
, lookupChild
, stopChild
) where
import Control.DeepSeq (NFData(rnf))
import Control.Eff as Eff
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Client
import Control.Eff.Concurrent.Protocol.Wrapper
import Control.Eff.Concurrent.Protocol.StatefulServer as Server
import Control.Eff.Concurrent.Protocol.Supervisor.InternalState
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Process.Timer
import Control.Eff.Extend (raise)
import Control.Eff.Log
import Control.Eff.State.Strict as Eff
import Control.Lens hiding ((.=), use)
import Data.Default
import Data.Dynamic
import Data.Foldable
import Data.Kind
import qualified Data.Map as Map
import Data.Text (Text, pack)
import Data.Type.Pretty
import GHC.Generics (Generic)
import GHC.Stack
import Control.Applicative ((<|>))
-- | Type-level tag for the supervisor protocol; it has no constructors.
--
-- The parameter @p@ identifies the kind of children that are supervised: it
-- selects the 'ChildId' key type and appears in the 'Pdu' and 'Server'
-- instances for @Sup p@.
data Sup (p :: Type) deriving Typeable
-- | The requests understood by a supervisor. Every request is 'Synchronous',
-- i.e. the caller receives a reply.
instance Typeable p => HasPdu (Sup p) where
  data instance Pdu (Sup p) r where
    -- | Start the child with the given id. The reply is the endpoint of the
    -- new child, or a 'SpawnErr' if a child with that id already exists.
    StartC :: ChildId p -> Pdu (Sup p) ('Synchronous (Either (SpawnErr p) (Endpoint (Protocol p))))
    -- | Stop the child with the given id, waiting at most the given timeout
    -- before it is shut down forcefully. The reply is 'False' if no child
    -- with that id is known, 'True' otherwise.
    StopC :: ChildId p -> Timeout -> Pdu (Sup p) ('Synchronous Bool)
    -- | Look up the endpoint of the child with the given id.
    LookupC :: ChildId p -> Pdu (Sup p) ('Synchronous (Maybe (Endpoint (Protocol p))))
    -- | Ask for a textual rendering ('show') of the supervisor's child table.
    GetDiagnosticInfo :: Pdu (Sup p) ('Synchronous Text)
      deriving Typeable
-- | Renders a supervisor request in constructor-application syntax, e.g.
-- @StopC \<child id\> \<timeout\>@.
--
-- Constructor arguments are shown at precedence 11 (one above function
-- application, which has precedence 10). This is the precedence that derived
-- 'Show' instances expect for an argument position: a 'ChildId' such as
-- @Just 1@ is then rendered as @StartC (Just 1)@ rather than the ambiguous
-- @StartC Just 1@.
--
-- The outer threshold (@d >= 10@) is intentionally left unchanged, so a
-- request is still parenthesised when it is embedded by code that passes
-- precedence 10 for argument positions; at worst this yields a redundant pair
-- of parentheses.
instance (Show (ChildId p)) => Show (Pdu (Sup p) ('Synchronous r)) where
  showsPrec d (StartC c) =
    showParen (d >= 10) (showString "StartC " . showsPrec 11 c)
  showsPrec d (StopC c t) =
    showParen
      (d >= 10)
      (showString "StopC " . showsPrec 11 c . showChar ' ' . showsPrec 11 t)
  showsPrec d (LookupC c) =
    showParen (d >= 10) (showString "LookupC " . showsPrec 11 c)
  showsPrec _ GetDiagnosticInfo = showString "GetDiagnosticInfo"
-- | Fully evaluates a supervisor request by forcing every constructor
-- argument; 'GetDiagnosticInfo' carries no data.
instance (NFData (ChildId p)) => NFData (Pdu (Sup p) ('Synchronous r)) where
  rnf request =
    case request of
      StartC childId -> rnf childId
      StopC childId stopTimeout -> rnf childId `seq` rnf stopTimeout
      LookupC childId -> rnf childId
      GetDiagnosticInfo -> ()
-- | The type-level pretty-printed name of a supervisor protocol: the literal
-- @"supervisor"@ combined (via '<:>') with the pretty name of @p@.
type instance ToPretty (Sup p) = "supervisor" <:> ToPretty p
-- | Open type family that assigns to @p@ the type of the identifiers used to
-- address the children of a @'Sup' p@ (see 'spawnChild', 'lookupChild' and
-- 'stopChild').
type family ChildId p
-- | Constraints that the supervisor API places on @p@:
-- the child identifiers must be 'Tangible' and ordered ('Ord'), and @p@
-- itself must be 'Typeable'.
type TangibleSup p =
  ( Tangible (ChildId p)
  , Ord (ChildId p)
  , Typeable p
  )
-- | Runs the supervisor for children of kind @p@ as a stateful 'Server'.
--
-- The server model is the table of currently known children
-- (@'Children' ('ChildId' p) p@), which is initially 'def'.
instance
  ( LogIo q
  , TangibleSup p
  , Tangible (ChildId p)
  , Server p (Processes q)
  ) => Server (Sup p) (Processes q) where

  -- The supervisor configuration.
  data StartArgument (Sup p) (Processes q) = MkSupConfig
    { supConfigChildStopTimeout :: Timeout
      -- ^ Timeout handed to 'stopAllChildren' when the supervisor itself is
      -- interrupted.
    , supConfigStartFun :: ChildId p -> Server.StartArgument p (Processes q)
      -- ^ Yields the start argument of the child with the given id.
    }

  type Model (Sup p) = Children (ChildId p) p

  -- Begin with the default child table and unit settings.
  setup _ _cfg = pure (def, ())

  -- Synchronous requests issued by clients.
  update _ supConfig (OnCall replyTarget request) =
    case request of
      StartC childId -> do
        registered <- lookupChildById @(ChildId p) @p childId
        case registered of
          Just running ->
            -- Refuse to start a second child under the same id.
            sendReply
              replyTarget
              (Left (AlreadyStarted childId (running ^. childEndpoint)))
          Nothing -> do
            newEndpoint <-
              (raise . raise)
                (Server.start (supConfigStartFun supConfig childId))
            -- Monitor the child so that 'OnDown' can remove it again.
            newMonitor <- monitor (_fromEndpoint newEndpoint)
            putChild childId (MkChild @p newEndpoint newMonitor)
            sendReply replyTarget (Right newEndpoint)
      StopC childId stopTimeout ->
        lookupAndRemoveChildById @(ChildId p) @p childId
          >>= maybe
                (sendReply replyTarget False)
                (\running ->
                   stopOrKillChild childId running stopTimeout
                     >> sendReply replyTarget True)
      LookupC childId ->
        lookupChildById @(ChildId p) @p childId
          >>= sendReply replyTarget . fmap _childEndpoint
      GetDiagnosticInfo ->
        getChildren @(ChildId p) @p >>= sendReply replyTarget . pack . show

  -- A monitored process went down: forget the matching child, if any.
  update _ _supConfig (OnDown down) =
    lookupAndRemoveChildByMonitor @(ChildId p) @p (downReference down)
      >>= maybe
            (logWarning ("unexpected: " <> pack (show down)))
            (\(childId, child) ->
               logInfo
                 (mconcat
                    [ pack (show down)
                    , " for child "
                    , pack (show childId)
                    , " => "
                    , pack (show (_childEndpoint child))
                    ]))

  -- The supervisor was interrupted: stop every child, then exit.
  update _ supConfig (OnInterrupt interrupt) = do
    let (severity, reason) =
          case interrupt of
            NormalExitRequested ->
              (debugSeverity, ExitNormally)
            _ ->
              ( warningSeverity
              , interruptToExit
                  (ErrorInterrupt ("supervisor interrupted: " <> show interrupt))
              )
    stopAllChildren @p (supConfigChildStopTimeout supConfig)
    logWithSeverity severity ("supervisor stopping: " <> pack (show interrupt))
    exitBecause reason

  -- Every other event is only logged.
  update _ _supConfig other =
    logWarning ("unexpected: " <> pack (show other))
-- | Reasons why a request to start a child is rejected.
--
-- 'AlreadyStarted' is returned when a child with the given 'ChildId' is
-- already registered; it carries that id and the endpoint of the existing
-- child.
data SpawnErr p = AlreadyStarted (ChildId p) (Endpoint (Protocol p))
  deriving (Typeable, Generic)

deriving instance Eq (ChildId p) => Eq (SpawnErr p)

deriving instance Ord (ChildId p) => Ord (SpawnErr p)

deriving instance (Typeable (Protocol p), Show (ChildId p)) => Show (SpawnErr p)

-- Uses the 'Generic'-based default implementation of 'rnf'.
instance NFData (ChildId p) => NFData (SpawnErr p)
-- | Start a supervisor process with the given configuration and return its
-- endpoint.
--
-- This is 'Server.start' specialised to the supervisor protocol.
startSupervisor
  :: forall p e
   . ( HasCallStack
     , LogsTo IO (Processes e)
     , Lifted IO e
     , TangibleSup p
     , Server (Sup p) (Processes e)
     )
  => StartArgument (Sup p) (Processes e)
  -> Eff (Processes e) (Endpoint (Sup p))
startSupervisor supConfig = Server.start supConfig
-- | Stop a supervisor and wait until it is down.
--
-- The supervisor process is monitored, sent a 'NormalExitRequested'
-- interrupt, and the caller then blocks until the matching process-down
-- message arrives. Both steps are logged at info level.
stopSupervisor
  :: ( HasCallStack
     , HasProcesses e q
     , Member Logs e
     , Lifted IO e
     , TangibleSup p
     )
  => Endpoint (Sup p)
  -> Eff e ()
stopSupervisor supEp = do
  let supPid = _fromEndpoint supEp
      shownEp = pack (show supEp)
  logInfo ("stopping supervisor: " <> shownEp)
  -- Install the monitor before interrupting, so the exit cannot be missed.
  supMonitor <- monitor supPid
  sendInterrupt supPid NormalExitRequested
  downMsg <- receiveSelectedMessage (selectProcessDown supMonitor)
  logInfo ("supervisor stopped: " <> shownEp <> " " <> pack (show downMsg))
-- | Check whether the process behind a supervisor endpoint is still alive.
isSupervisorAlive
  :: forall p q0 e
   . ( HasCallStack
     , Member Logs e
     , Typeable p
     , HasProcesses e q0
     )
  => Endpoint (Sup p)
  -> Eff e Bool
isSupervisorAlive = isProcessAlive . _fromEndpoint
-- | Monitor the process behind a supervisor endpoint and return the
-- resulting 'MonitorReference'.
monitorSupervisor
  :: forall p q0 e
   . ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleSup p
     )
  => Endpoint (Sup p)
  -> Eff e MonitorReference
monitorSupervisor supEp = monitor supPid
  where
    supPid = _fromEndpoint supEp
-- | Ask the supervisor to start the child with the given id.
--
-- Returns the endpoint of the new child, or a 'SpawnErr' when the supervisor
-- rejects the request (e.g. 'AlreadyStarted').
spawnChild
  :: forall p q0 e
   . ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleSup p
     , Typeable (Protocol p)
     )
  => Endpoint (Sup p)
  -> ChildId p
  -> Eff e (Either (SpawnErr p) (Endpoint (Protocol p)))
spawnChild supEp = call supEp . StartC @p
-- | Ask the supervisor for the endpoint of the child with the given id.
--
-- The result is 'Nothing' when the supervisor knows no such child.
lookupChild
  :: forall p e q0
   . ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleSup p
     , Typeable (Protocol p)
     )
  => Endpoint (Sup p)
  -> ChildId p
  -> Eff e (Maybe (Endpoint (Protocol p)))
lookupChild supEp childId =
  let request = LookupC @p childId
   in call supEp request
-- | Ask the supervisor to stop the child with the given id.
--
-- The request always carries the fixed timeout @TimeoutMicros 4000000@.
-- The result is 'False' when the supervisor knows no such child and 'True'
-- otherwise.
stopChild
  :: forall p e q0
   . ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleSup p
     )
  => Endpoint (Sup p)
  -> ChildId p
  -> Eff e Bool
stopChild supEp childId = call supEp (StopC @p childId fixedStopTimeout)
  where
    -- The timeout sent along with every stop request from this function.
    fixedStopTimeout = TimeoutMicros 4000000
-- | Ask the supervisor for a textual description of its current children.
getDiagnosticInfo
  :: forall p e q0
   . ( HasCallStack
     , HasProcesses e q0
     , TangibleSup p
     )
  => Endpoint (Sup p)
  -> Eff e Text
getDiagnosticInfo supEp = supEp `call` GetDiagnosticInfo @p
-- | Stop a single child, gracefully if possible and forcefully otherwise.
--
-- The child is sent a 'NormalExitRequested' interrupt. The calling process
-- then waits for whichever comes first:
--
-- * the process-down message of the child's monitor: the exit reason is
--   logged at info level and the no longer needed timer is cancelled;
--
-- * the expiry of a timer started with @stopTimeout@: a warning is logged and
--   the child is shut down via 'sendShutdown' with a 'TimeoutInterrupt' exit
--   reason that names this supervisor.
--
-- This function does not touch the child table; callers are expected to have
-- removed the child already.
stopOrKillChild
  :: forall p e q0
   . ( HasCallStack
     , HasProcesses e q0
     , Lifted IO e
     , Lifted IO q0
     , Member Logs e
     , Member (State (Children (ChildId p) p)) e
     , TangibleSup p
     , Typeable (Protocol p)
     )
  => ChildId p
  -> Child p
  -> Timeout
  -> Eff e ()
stopOrKillChild cId c stopTimeout =
  do
    sup <- asEndpoint @(Sup p) <$> self
    t <- startTimer stopTimeout
    sendInterrupt (_fromEndpoint (c ^. childEndpoint)) NormalExitRequested
    r1 <-
      receiveSelectedMessage
        (   Right <$> selectProcessDown (c ^. childMonitoring)
        <|> Left <$> selectTimerElapsed t
        )
    case r1 of
      Left timerElapsed -> do
        logWarning
          (  pack (show timerElapsed)
          <> ": child "
          <> pack (show cId)
          <> " => "
          <> pack (show (c ^. childEndpoint))
          <> " did not shutdown in time"
          )
        -- NOTE(review): the child's process-down message is not awaited here;
        -- if the callers have already removed the child from the table, it
        -- will presumably be reported later as "unexpected" by the 'OnDown'
        -- handler. Confirm whether that is intended.
        sendShutdown
          (_fromEndpoint (c ^. childEndpoint))
          (interruptToExit
            (TimeoutInterrupt
              ("child did not shut down in time and was terminated by the "
                ++ show sup)))
      Right downMsg -> do
        -- The child exited in time: cancel the timer so that it does not keep
        -- running and later deliver a stale elapsed-message to this process.
        cancelTimer t
        logInfo
          (  "child "
          <> pack (show cId)
          <> " => "
          <> pack (show (c ^. childEndpoint))
          <> " terminated: "
          <> pack (show (downReason downMsg))
          )
-- | Remove every child from the child table and stop them one after another
-- with 'stopOrKillChild', each with the given timeout.
--
-- Every child is stopped under 'provideInterrupts'; an interrupt that
-- surfaces while stopping a child is logged as an error, and the remaining
-- children are still processed.
stopAllChildren
  :: forall p e q0
   . ( HasCallStack
     , HasProcesses e q0
     , Lifted IO e
     , Lifted IO q0
     , Member Logs e
     , Member (State (Children (ChildId p) p)) e
     , TangibleSup p
     , Typeable (Protocol p)
     )
  => Timeout -> Eff e ()
stopAllChildren stopTimeout =
  removeAllChildren @(ChildId p) @p >>= traverse_ stopOne . Map.assocs
  where
    -- Stop one child and report (but do not propagate) an interrupt.
    stopOne (childId, child) = do
      outcome <- provideInterrupts (stopOrKillChild childId child stopTimeout)
      case outcome of
        Left interrupt ->
          logError
            (  pack (show interrupt)
            <> " while stopping child: "
            <> pack (show childId)
            <> " "
            <> pack (show child)
            )
        Right done -> return done