{-# LANGUAGE UndecidableInstances #-}
module Control.Eff.Concurrent.Api.Supervisor
( Sup()
, SpawnFun
, SupConfig(MkSupConfig)
, supConfigChildStopTimeout
, supConfigSpawnFun
, SpawnErr(AlreadyStarted)
, startSupervisor
, stopSupervisor
, isSupervisorAlive
, monitorSupervisor
, getDiagnosticInfo
, spawnChild
, lookupChild
, stopChild
) where
import Control.DeepSeq (NFData(rnf))
import Control.Eff as Eff
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Client
import Control.Eff.Concurrent.Api.Server
import Control.Eff.Concurrent.Api.Supervisor.InternalState
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Process.Timer
import Control.Eff.Extend (raise)
import Control.Eff.Log
import Control.Eff.State.Strict as Eff
import Control.Lens hiding ((.=), use)
import Data.Default
import Data.Dynamic
import Data.Foldable
import qualified Data.Map as Map
import Data.Text (Text, pack)
import Data.Type.Pretty
import GHC.Generics (Generic)
import GHC.Stack
import Control.Applicative ((<|>))
type SpawnFun i e o = i -> Eff e (o, ProcessId)
data SupConfig i e o =
MkSupConfig
{ _supConfigSpawnFun :: SpawnFun i e o
, _supConfigChildStopTimeout :: Timeout
}
makeLenses ''SupConfig
newtype Sup childId spawnResult =
MkSup (Server (Sup childId spawnResult))
deriving (Ord, Eq, Typeable, NFData)
instance (PrettyTypeShow (ToPretty childId), PrettyTypeShow (ToPretty spawnResult))
=> Show (Sup childId spawnResult) where
showsPrec d (MkSup svr) = showsPrec d svr
data instance Api (Sup i o) r where
StartC :: i -> Api (Sup i o) ('Synchronous (Either (SpawnErr i o) o))
StopC :: i -> Timeout -> Api (Sup i o) ('Synchronous Bool)
LookupC :: i -> Api (Sup i o) ('Synchronous (Maybe o))
GetDiagnosticInfo :: Api (Sup i o) ('Synchronous Text)
deriving Typeable
type instance ToPretty (Sup i o) =
PutStr "supervisor{" <++> ToPretty i <+> PutStr "=>" <+> ToPretty o <++> PutStr "}"
instance (Show i) => Show (Api (Sup i o) ('Synchronous r)) where
showsPrec d (StartC c) = showParen (d >= 10) (showString "StartC " . showsPrec 10 c)
showsPrec d (StopC c t) = showParen (d >= 10) (showString "StopC " . showsPrec 10 c . showChar ' ' . showsPrec 10 t)
showsPrec d (LookupC c) = showParen (d >= 10) (showString "LookupC " . showsPrec 10 c)
showsPrec _ GetDiagnosticInfo = showString "GetDiagnosticInfo"
instance (NFData i) => NFData (Api (Sup i o) ('Synchronous r)) where
rnf (StartC ci) = rnf ci
rnf (StopC ci t) = rnf ci `seq` rnf t
rnf (LookupC ci) = rnf ci
rnf GetDiagnosticInfo = ()
data SpawnErr i o
= AlreadyStarted i
o
deriving (Eq, Ord, Show, Typeable, Generic)
instance (NFData i, NFData o) => NFData (SpawnErr i o)
startSupervisor
:: forall i e o
. ( HasCallStack
, Member Logs e
, Lifted IO e
, Ord i
, Tangible i
, Tangible o
)
=> SupConfig i (InterruptableProcess e) o
-> Eff (InterruptableProcess e) (Sup i o)
startSupervisor supConfig = do
(_sup, supPid) <- spawnApiServerStateful initChildren (onRequest ^: onMessage) onInterrupt
return (mkSup supPid)
where
mkSup supPid = MkSup (asServer supPid)
initChildren :: Eff (InterruptableProcess e) (Children i o)
initChildren = return def
onRequest :: MessageCallback (Sup i o) (State (Children i o) ': InterruptableProcess e)
onRequest = handleCalls onCall
where
onCall ::
Api (Sup i o) ('Synchronous reply)
-> (Eff (State (Children i o) ': InterruptableProcess e) (Maybe reply, CallbackResult 'Recoverable) -> st)
-> st
onCall GetDiagnosticInfo k = k ((, AwaitNext) . Just . pack . show <$> getChildren @i @o)
onCall (LookupC i) k = k ((, AwaitNext) . Just . fmap _childOutput <$> lookupChildById @i @o i)
onCall (StopC i t) k =
k $ do
mExisting <- lookupAndRemoveChildById @i @o i
case mExisting of
Nothing -> do
return (Just False, AwaitNext)
Just existingChild -> do
stopOrKillChild i existingChild t
return (Just True, AwaitNext)
onCall (StartC i) k =
k $ do
(o, cPid) <- raise ((supConfig ^. supConfigSpawnFun) i)
cMon <- monitor cPid
mExisting <- lookupChildById i
case mExisting of
Nothing -> do
putChild i (MkChild o cPid cMon)
return (Just (Right o), AwaitNext)
Just existingChild ->
return (Just (Left (AlreadyStarted i (existingChild ^. childOutput))), AwaitNext)
onMessage :: MessageCallback '[] (State (Children i o) ': InterruptableProcess e)
onMessage = handleProcessDowns onDown <> handleAnyMessages onInfo
where
onDown mrChild = do
oldEntry <- lookupAndRemoveChildByMonitor @i @o mrChild
case oldEntry of
Nothing ->
logWarning ("unexpected process down: " <> pack (show mrChild))
Just (i, c) -> do
logInfo ( "process down: "
<> pack (show mrChild)
<> " for child "
<> pack (show i)
<> " => "
<> pack (show (_childOutput c))
)
return AwaitNext
onInfo d = do
logInfo ("unexpected message received: " <> pack (show d))
return AwaitNext
onInterrupt :: InterruptCallback (State (Children i o) ': ConsProcess e)
onInterrupt =
InterruptCallback $ \e -> do
let (logSev, exitReason) =
case e of
NormalExitRequested ->
(debugSeverity, ExitNormally)
_ ->
(warningSeverity, interruptToExit (ErrorInterrupt ("supervisor interrupted: " <> show e)))
stopAllChildren @i @o (supConfig ^. supConfigChildStopTimeout)
logWithSeverity logSev ("supervisor stopping: " <> pack (show e))
pure (StopServer exitReason)
stopSupervisor
:: ( HasCallStack
, Member Interrupts e
, SetMember Process (Process q0) e
, Member Logs e
, Lifted IO e
, Ord i
, Tangible i
, Tangible o
)
=> Sup i o
-> Eff e ()
stopSupervisor (MkSup svr) = do
logInfo ("stopping supervisor: " <> pack (show svr))
mr <- monitor (_fromServer svr)
sendInterrupt (_fromServer svr) NormalExitRequested
r <- receiveSelectedMessage (selectProcessDown mr)
logInfo ("supervisor stopped: " <> pack (show svr) <> " " <> pack (show r))
isSupervisorAlive
:: ( HasCallStack
, Member Interrupts e
, Member Logs e
, Typeable i
, Typeable o
, NFData i
, NFData o
, Show i
, Show o
, SetMember Process (Process q0) e
)
=> Sup i o
-> Eff e Bool
isSupervisorAlive (MkSup x) = isProcessAlive (_fromServer x)
monitorSupervisor
:: ( HasCallStack
, Member Interrupts e
, Member Logs e
, Typeable i
, Typeable o
, NFData i
, NFData o
, Show i
, Show o
, SetMember Process (Process q0) e
)
=> Sup i o
-> Eff e MonitorReference
monitorSupervisor (MkSup x) = monitor (_fromServer x)
spawnChild ::
( HasCallStack
, Member Interrupts e
, Member Logs e
, Ord i
, Tangible i
, Tangible o
, SetMember Process (Process q0) e
)
=> Sup i o
-> i
-> Eff e (Either (SpawnErr i o) o)
spawnChild (MkSup svr) cId = call svr (StartC cId)
lookupChild ::
( HasCallStack
, Member Interrupts e
, Member Logs e
, Ord i
, Tangible i
, Tangible o
, SetMember Process (Process q0) e
)
=> Sup i o
-> i
-> Eff e (Maybe o)
lookupChild (MkSup svr) cId = call svr (LookupC cId)
stopChild ::
( HasCallStack
, Member Interrupts e
, Member Logs e
, Ord i
, Tangible i
, Tangible o
, SetMember Process (Process q0) e
)
=> Sup i o
-> i
-> Eff e Bool
stopChild (MkSup svr) cId = call svr (StopC cId (TimeoutMicros 4000000))
getDiagnosticInfo
:: ( Ord i
, Tangible i
, Tangible o
, Typeable e
, HasCallStack
, Member Interrupts e
, SetMember Process (Process q0) e
)
=> Sup i o
-> Eff e Text
getDiagnosticInfo (MkSup s) = call s GetDiagnosticInfo
stopOrKillChild
:: forall i o e q0 .
( Ord i
, Tangible i
, Tangible o
, HasCallStack
, SetMember Process (Process q0) e
, Member Interrupts e
, Lifted IO e
, Lifted IO q0
, Member Logs e
, Member (State (Children i o)) e
)
=> i
-> Child o
-> Timeout
-> Eff e ()
stopOrKillChild cId c stopTimeout =
do
sup <- MkSup . asServer @(Sup i o) <$> self
t <- startTimer stopTimeout
sendInterrupt (c^.childProcessId) NormalExitRequested
r1 <- receiveSelectedMessage ( Right <$> selectProcessDown (c^.childMonitoring)
<|> Left <$> selectTimerElapsed t )
case r1 of
Left timerElapsed -> do
logWarning (pack (show timerElapsed) <> ": child "<> pack (show cId) <>" => " <> pack(show (c^.childOutput)) <>" did not shutdown in time")
sendShutdown
(c^.childProcessId)
(interruptToExit
(TimeoutInterrupt
("child did not shut down in time and was terminated by the "
++ show sup)))
Right downMsg ->
logInfo ("child "<> pack (show cId) <>" => " <> pack(show (c^.childOutput)) <>" terminated: " <> pack (show (downReason downMsg)))
stopAllChildren
:: forall i o e q0 .
( Ord i
, Tangible i
, Tangible o
, HasCallStack
, SetMember Process (Process q0) e
, Lifted IO e
, Lifted IO q0
, Member Logs e
, Member (State (Children i o)) e
)
=> Timeout -> Eff e ()
stopAllChildren stopTimeout = removeAllChildren @i @o >>= pure . Map.assocs >>= traverse_ xxx
where
xxx (cId, c) = provideInterrupts (stopOrKillChild cId c stopTimeout) >>= either crash return
where
crash e = do
logError (pack (show e) <> " while stopping child: " <> pack (show cId) <> " " <> pack (show c))