{-# LANGUAGE UndecidableInstances #-} -- | Monitor the children of attached brokers and restart them when they crash. -- -- Behaviour of the watchdog: -- -- When a child crashes: -- * if the allowed maximum number of crashes per time span has been reached for the child, -- ** cancel all other timers -- ** don't start the child again -- ** if the child's broker was attached /permanently/ (see 'attachPermanent'), crash the watchdog -- * otherwise -- ** tell the broker to start the child -- ** record a crash and start a timer to remove the record later -- ** monitor the child -- -- When a child crash timer elapses: -- * remove the crash record -- -- @since 0.30.0 module Control.Eff.Concurrent.Protocol.Watchdog ( startLink , Watchdog , attachTemporary , attachPermanent , getCrashReports , CrashRate(..) , crashCount , crashTimeSpan , crashesPerSeconds , CrashCount , CrashTimeSpan , ChildWatch(..) , parent , crashes , ExonerationTimer(..) , CrashReport(..) , crashTime , crashReason , exonerationTimerReference ) where import Control.DeepSeq import Control.Eff (Eff, Member, lift, Lifted) import Control.Eff.Concurrent.Misc import Control.Eff.Concurrent.Process import Control.Eff.Concurrent.Process.Timer import Control.Eff.Concurrent.Protocol import Control.Eff.Concurrent.Protocol.Client import Control.Eff.Concurrent.Protocol.Wrapper import qualified Control.Eff.Concurrent.Protocol.Observer as Observer import Control.Eff.Concurrent.Protocol.Observer (Observer) import qualified Control.Eff.Concurrent.Protocol.Broker as Broker import Control.Eff.Concurrent.Protocol.Broker (Broker) import qualified Control.Eff.Concurrent.Protocol.StatefulServer as Stateful import qualified Control.Eff.Concurrent.Protocol.EffectfulServer as Effectful import Control.Lens import Control.Eff.Log import Data.Set (Set) import Data.Typeable import qualified Data.Set as Set import Data.Map (Map) import qualified Data.Map as Map import Data.Time.Clock import Data.Kind (Type) import Data.Default import Data.Text (pack) import GHC.Stack (HasCallStack) import Data.Maybe
(isJust) import Data.Foldable (traverse_, forM_) import Control.Monad (when) -- | The phantom for watchdog processes, that watch the given type of servers -- -- This type is used for the 'Effectful.Server' and 'HasPdu' instances. -- -- @since 0.30.0 data Watchdog (child :: Type) deriving Typeable -- | Start and link a new watchdog process. -- -- The watchdog process will register itself to the 'Broker.ChildEvent's and -- restart crashed children. -- -- @since 0.30.0 startLink :: forall child e q . ( HasCallStack , Typeable child , FilteredLogging (Processes q) , Member Logs q , HasProcesses e q , Tangible (Broker.ChildId child) , Ord (Broker.ChildId child) , HasPdu (Effectful.ServerPdu child) , Lifted IO q ) => CrashRate -> Eff e (Endpoint (Watchdog child)) startLink = Stateful.startLink @(Watchdog child) . StartWatchDog -- | Restart children of the given broker. -- -- When the broker exits, ignore the children of that broker. -- -- @since 0.30.0 attachTemporary :: forall child q e . ( HasCallStack , FilteredLogging e , Typeable child , HasPdu (Effectful.ServerPdu child) , Tangible (Broker.ChildId child) , Ord (Broker.ChildId child) , HasProcesses e q ) => Endpoint (Watchdog child) -> Endpoint (Broker child) -> Eff e () attachTemporary wd broker = callWithTimeout wd (Attach broker False) (TimeoutMicros 1_000_000) -- | Restart children of the given broker. -- -- When the broker exits, the watchdog process will exit, too. -- -- @since 0.30.0 attachPermanent :: forall child q e . ( HasCallStack , FilteredLogging e , Typeable child , HasPdu (Effectful.ServerPdu child) , Tangible (Broker.ChildId child) , Ord (Broker.ChildId child) , HasProcesses e q ) => Endpoint (Watchdog child) -> Endpoint (Broker child) -> Eff e () attachPermanent wd broker = callWithTimeout wd (Attach broker True) (TimeoutMicros 1_000_000) -- | Return a list of 'CrashReport's. -- -- Useful for diagnostics -- -- @since 0.30.0 getCrashReports :: forall child q e . 
( HasCallStack , FilteredLogging e , Typeable child , HasPdu (Effectful.ServerPdu child) , Tangible (Broker.ChildId child) , Ord (Broker.ChildId child) , HasProcesses e q , Lifted IO q , Lifted IO e , Member Logs e ) => Endpoint (Watchdog child) -> Eff e (Map (Broker.ChildId child) (ChildWatch child)) getCrashReports wd = callWithTimeout wd GetCrashReports (TimeoutMicros 5_000_000) instance Typeable child => HasPdu (Watchdog child) where type instance EmbeddedPduList (Watchdog child) = '[Observer (Broker.ChildEvent child)] data Pdu (Watchdog child) r where Attach :: Endpoint (Broker child) -> Bool -> Pdu (Watchdog child) ('Synchronous ()) GetCrashReports :: Pdu (Watchdog child) ('Synchronous (Map (Broker.ChildId child) (ChildWatch child))) OnChildEvent :: Broker.ChildEvent child -> Pdu (Watchdog child) 'Asynchronous deriving Typeable instance Typeable child => HasPduPrism (Watchdog child) (Observer (Broker.ChildEvent child)) where embedPdu (Observer.Observed e) = OnChildEvent e fromPdu (OnChildEvent x) = Just (Observer.Observed x) fromPdu _ = Nothing instance (NFData (Broker.ChildId child)) => NFData (Pdu (Watchdog child) r) where rnf (Attach e b) = rnf e `seq` rnf b `seq` () rnf GetCrashReports = () rnf (OnChildEvent o) = rnf o instance ( Show (Broker.ChildId child) , Typeable child , Typeable (Effectful.ServerPdu child) ) => Show (Pdu (Watchdog child) r) where showsPrec d (Attach e False) = showParen (d>=10) (showString "attach-temporary: " . shows e) showsPrec d (Attach e True) = showParen (d>=10) (showString "attach-permanent: " . shows e) showsPrec _ GetCrashReports = showString "get-crash-reports" showsPrec d (OnChildEvent o) = showParen (d>=10) (showString "on-child-event: " . 
showsPrec 10 o) -- ------------------ Broker Watches data BrokerWatch = MkBrokerWatch { _brokerMonitor :: MonitorReference, _isPermanent :: Bool } deriving (Typeable) instance Show BrokerWatch where showsPrec d (MkBrokerWatch mon False) = showParen (d>=10) (showString "temporary-broker: " . showsPrec 10 mon) showsPrec d (MkBrokerWatch mon True) = showParen (d>=10) (showString "permanent-broker: " . showsPrec 10 mon) brokerMonitor :: Lens' BrokerWatch MonitorReference brokerMonitor = lens _brokerMonitor (\(MkBrokerWatch _ x) m -> MkBrokerWatch m x) isPermanent :: Lens' BrokerWatch Bool isPermanent = lens _isPermanent (\(MkBrokerWatch x _) m -> MkBrokerWatch x m) -- --- Server Definition instance ( Typeable child , HasPdu (Effectful.ServerPdu child) , Tangible (Broker.ChildId child) , Ord (Broker.ChildId child) , Eq (Broker.ChildId child) , Lifted IO e , Member Logs e ) => Stateful.Server (Watchdog child) (Processes e) where data instance StartArgument (Watchdog child) = StartWatchDog { _crashRate :: CrashRate } deriving Typeable data instance Model (Watchdog child) = WatchdogModel { _brokers :: Map (Endpoint (Broker child)) BrokerWatch , _watched :: Map (Broker.ChildId child) (ChildWatch child) } update me startArg = \case Effectful.OnCall rt (Attach broker permanent) -> do logDebug ( "attaching " <> if permanent then "permanently" else "temporarily" <> " to: " <> pack (show broker) ) oldMonitor <- Stateful.preuseModel @(Watchdog child) (brokers . at broker . _Just . brokerMonitor) newMonitor <- maybe (monitor (broker^.fromEndpoint)) return oldMonitor case oldMonitor of Nothing -> do logDebug ("start observing: " <> pack (show broker)) Observer.registerObserver @(Broker.ChildEvent child) broker me Just _ -> logDebug ("already observing " <> pack (show broker)) let newBrokerWatch = MkBrokerWatch newMonitor permanent Stateful.modifyModel (brokers . 
at broker ?~ newBrokerWatch) Observer.registerObserver @(Broker.ChildEvent child) broker me sendReply rt () Effectful.OnCall rt GetCrashReports -> Stateful.useModel @(Watchdog child) watched >>= sendReply rt Effectful.OnCast (OnChildEvent e) -> case e of down@(Broker.OnBrokerShuttingDown broker) -> do logInfo ("received: " <> pack (show down)) removeBroker me broker spawned@(Broker.OnChildSpawned broker _ _) -> do logInfo ("received: " <> pack (show spawned)) currentModel <- Stateful.getModel @(Watchdog child) when (not (Set.member broker (currentModel ^. brokers . to Map.keysSet))) (logWarning ("received child event for unknown broker: " <> pack (show spawned))) down@(Broker.OnChildDown broker cId _ ExitNormally) -> do logInfo ("received: " <> pack (show down)) currentModel <- Stateful.getModel @(Watchdog child) if not (Set.member broker (currentModel ^. brokers . to Map.keysSet)) then logWarning ("received child event for unknown broker: " <> pack (show down)) else removeAndCleanChild @child cId down@(Broker.OnChildDown broker cId _ reason) -> do logInfo ("received: " <> pack (show down)) currentModel <- Stateful.getModel @(Watchdog child) if not (Set.member broker (currentModel ^. brokers . to Map.keysSet)) then logWarning ("received child event for unknown broker: " <> pack (show down)) else do let recentCrashes = countRecentCrashes broker cId currentModel rate = startArg ^. crashRate maxCrashCount = rate ^. crashCount if recentCrashes < maxCrashCount then do logNotice ("restarting (" <> pack (show recentCrashes) <> "/" <> pack (show maxCrashCount) <> "): " <> pack (show cId) <> " of " <> pack (show broker)) res <- Broker.spawnChild broker cId logNotice ("restarted: " <> pack (show cId) <> " of " <> pack (show broker) <> ": " <> pack (show res)) crash <- startExonerationTimer @child cId reason (rate ^. crashTimeSpan) if isJust (currentModel ^? 
childWatchesById cId) then do logDebug ("recording crash for child: " <> pack (show cId) <> " of " <> pack (show broker)) Stateful.modifyModel (watched @child . at cId . _Just . crashes %~ Set.insert crash) else do logDebug ("recording crash for new child: " <> pack (show cId) <> " of " <> pack (show broker)) Stateful.modifyModel (watched @child . at cId .~ Just (MkChildWatch broker (Set.singleton crash))) else do logWarning ("restart rate exceeded: " <> pack (show rate) <> ", for child: " <> pack (show cId) <> " of " <> pack (show broker)) removeAndCleanChild @child cId forM_ (currentModel ^? brokers . at broker . _Just) $ \bw -> if bw ^. isPermanent then do logError ("a child of a permanent broker crashed too often, interrupting: " <> pack (show broker)) let r = ExitUnhandledError "restart frequency exceeded" demonitor (bw ^. brokerMonitor) sendShutdown (broker ^. fromEndpoint) r exitBecause r -- TODO shutdown all other permanent brokers! else logError ("a child of a temporary broker crashed too often: " <> pack (show broker)) Effectful.OnDown pd@(ProcessDown _mref _ pid) -> do logDebug ("received " <> pack (show pd)) let broker = asEndpoint pid removeBroker @child me broker Effectful.OnTimeOut t -> do logError ("received: " <> pack (show t)) Effectful.OnMessage (fromStrictDynamic -> Just (MkExonerationTimer cId ref :: ExonerationTimer (Broker.ChildId child))) -> do logInfo ("exonerating: " <> pack (show cId)) Stateful.modifyModel (watched @child . at cId . _Just . 
crashes %~ Set.filter (\c -> c^.exonerationTimerReference /= ref)) Effectful.OnMessage t -> do logError ("received: " <> pack (show t)) Effectful.OnInterrupt reason -> do logError ("received: " <> pack (show reason)) -- ------------------ Start Argument crashRate :: Lens' (Stateful.StartArgument (Watchdog child)) CrashRate crashRate = lens _crashRate (\m x -> m {_crashRate = x}) -- ----------------- Crash Rate -- | The limit of crashes (see 'CrashCount') per time span (see 'CrashTimeSpan') that justifies restarting -- child processes. -- -- Used as parameter for 'startLink'. -- -- Use 'crashesPerSeconds' to construct a value. -- -- This governs how long the 'ExonerationTimer' runs before cleaning up a 'CrashReport' in a 'ChildWatch'. -- -- @since 0.30.0 data CrashRate = CrashesPerSeconds { _crashCount :: CrashCount , _crashTimeSpan :: CrashTimeSpan } deriving (Typeable, Eq, Ord) -- | The default is three crashes in 30 seconds. -- -- @since 0.30.0 instance Default CrashRate where def = 3 `crashesPerSeconds` 30 instance Show CrashRate where showsPrec d (CrashesPerSeconds count time) = showParen (d>=7) (shows count . showString " crashes/" . shows time . showString " seconds") instance NFData CrashRate where rnf (CrashesPerSeconds c t) = c `seq` t `seq` () -- | Number of crashes in 'CrashRate'. -- -- @since 0.30.0 type CrashCount = Int -- | Time span in which crashes are counted in 'CrashRate'. -- -- @since 0.30.0 type CrashTimeSpan = Int -- | A smart constructor for 'CrashRate'. -- -- The first parameter is the number of crashes allowed per number of seconds (second parameter) -- until the watchdog should give up restarting a child. -- -- @since 0.30.0 crashesPerSeconds :: CrashCount -> CrashTimeSpan -> CrashRate crashesPerSeconds = CrashesPerSeconds -- | A lens for '_crashCount'. 
--
-- @since 0.30.0
crashCount :: Lens' CrashRate CrashCount
crashCount =
  lens _crashCount (\(CrashesPerSeconds _ time) count -> CrashesPerSeconds count time)

-- | A lens for '_crashTimeSpan'.
--
-- @since 0.30.0
crashTimeSpan :: Lens' CrashRate CrashTimeSpan
crashTimeSpan =
  lens _crashTimeSpan (\(CrashesPerSeconds count _) time -> CrashesPerSeconds count time)

-- ------------------ Crash

-- | An internal data structure that records a single crash of a child of an attached 'Broker'.
--
-- See 'attachPermanent' and 'attachTemporary'.
--
-- @since 0.30.0
data CrashReport a =
  MkCrashReport
    { _exonerationTimerReference :: TimerReference
      -- ^ After a crash, an 'ExonerationTimer' according to the 'CrashRate' of the 'Watchdog'
      -- is started, this is the reference
    , _crashTime :: UTCTime
      -- ^ Recorded time of the crash
    , _crashReason :: Interrupt 'NoRecovery
      -- ^ Recorded crash reason
    }
  deriving (Eq, Ord, Typeable)

instance (Show a, Typeable a) => Show (CrashReport a) where
  showsPrec d c =
    showParen (d>=10)
      ( showString "crash report: "
      . showString " time: " . showsPrec 10 (c^.crashTime)
      . showString " reason: " . showsPrec 10 (c^.crashReason)
      . showString " " . showsPrec 10 (c^.exonerationTimerReference)
      )

instance NFData (CrashReport a) where
  rnf (MkCrashReport !a !b !c) = rnf a `seq` rnf b `seq` rnf c `seq` ()

-- | Lens for '_crashTime'
--
-- @since 0.30.0
crashTime :: Lens' (CrashReport a) UTCTime
crashTime = lens _crashTime (\c t -> c { _crashTime = t})

-- | Lens for '_crashReason'
--
-- @since 0.30.0
crashReason :: Lens' (CrashReport a) (Interrupt 'NoRecovery)
crashReason = lens _crashReason (\c t -> c { _crashReason = t})

-- | Lens for '_exonerationTimerReference'
--
-- @since 0.30.0
exonerationTimerReference :: Lens' (CrashReport a) TimerReference
exonerationTimerReference =
  lens _exonerationTimerReference (\c t -> c { _exonerationTimerReference = t})

-- Start a timer that sends @MkExonerationTimer cId ref@ to the calling
-- process after @t@ seconds, and return a 'CrashReport' holding that timer
-- reference, the current time and the crash reason.
startExonerationTimer
  :: forall child a q e
   . (HasProcesses e q, Lifted IO q, Lifted IO e, Show a, NFData a, Typeable a, Typeable child)
  => a
  -> Interrupt 'NoRecovery
  -> CrashTimeSpan
  -> Eff e (CrashReport a)
startExonerationTimer cId r t = do
  let title = MkProcessTitle ("ExonerationTimer<" <> pack (showSTypeable @child ">") <> pack (show cId))
  me <- self
  ref <- sendAfterWithTitle title me (TimeoutMicros (t * 1_000_000)) (MkExonerationTimer cId)
  now <- lift getCurrentTime
  return (MkCrashReport ref now r)

-- | The timer started based on the 'CrashRate' '_crashTimeSpan' when a 'CrashReport' is recorded.
--
-- After this timer elapses, the 'Watchdog' server will remove the 'CrashReport' from the 'ChildWatch' of
-- that child.
--
-- @since 0.30.0
data ExonerationTimer a = MkExonerationTimer !a !TimerReference
  deriving (Eq, Ord, Typeable)

instance NFData a => NFData (ExonerationTimer a) where
  rnf (MkExonerationTimer !x !r) = rnf r `seq` rnf x `seq` ()

instance Show a => Show (ExonerationTimer a) where
  showsPrec d (MkExonerationTimer x r) =
    showParen (d >= 10)
      ( showString "exonerate: " . showsPrec 10 x
      . showString " after: " . showsPrec 10 r
      )

-- --------------------------- Child Watches

-- | An internal data structure that keeps the 'CrashReport's of a child of an attached 'Broker' monitored by a 'Watchdog'.
--
-- See 'attachPermanent' and 'attachTemporary', 'ExonerationTimer', 'CrashRate'.
--
-- @since 0.30.0
data ChildWatch child =
  MkChildWatch
    { _parent :: Endpoint (Broker child)
      -- ^ The attached 'Broker' that started the child
    , _crashes :: Set (CrashReport (Broker.ChildId child))
      -- ^ The crashes of the child. If the number of crashes
      -- surpasses the allowed number of crashes before the
      -- 'ExonerationTimer's clean them, the child is finally crashed.
    }
  deriving Typeable

instance NFData (ChildWatch child) where
  rnf (MkChildWatch p c) = rnf p `seq` rnf c `seq` ()

instance (Typeable child, Typeable (Broker.ChildId child), Show (Broker.ChildId child)) => Show (ChildWatch child) where
  showsPrec d (MkChildWatch p c) =
    showParen (d>=10)
      ( showString "child-watch: parent: " . showsPrec 10 p
      . showString " crashes: " . foldr (.) id (showsPrec 10 <$> Set.toList c)
      )

-- | A lens for '_parent'.
--
-- @since 0.30.0
parent :: Lens' (ChildWatch child) (Endpoint (Broker child))
parent = lens _parent (\m x -> m {_parent = x})

-- | A lens for '_crashes'
--
-- @since 0.30.0
crashes :: Lens' (ChildWatch child) (Set (CrashReport (Broker.ChildId child)))
crashes = lens _crashes (\m x -> m {_crashes = x})

-- ------------------ Model

instance Default (Stateful.Model (Watchdog child)) where
  def = WatchdogModel def Map.empty

instance
  ( Show (Broker.ChildId child)
  , Typeable (Broker.ChildId child)
  , Typeable child
  ) => Show (Stateful.Model (Watchdog child)) where
  showsPrec d (WatchdogModel bs cs) =
    showParen (d>=10)
      ( showString "watchdog model broker watches: " . showsPrec 10 bs
      . showString " watchdog model child watches: " . showsPrec 10 cs
      )

-- -------------------------- Model -> Child Watches

watched :: Lens' (Stateful.Model (Watchdog child)) (Map (Broker.ChildId child) (ChildWatch child))
watched = lens _watched (\m x -> m {_watched = x})

childWatches :: IndexedTraversal' (Broker.ChildId child) (Stateful.Model (Watchdog child)) (ChildWatch child)
childWatches = watched . itraversed

childWatchesById
  :: Eq (Broker.ChildId child)
  => Broker.ChildId child
  -> Traversal' (Stateful.Model (Watchdog child)) (ChildWatch child)
childWatchesById theCId = childWatches . ifiltered (\cId _ -> cId == theCId)

childWatchesByParenAndId
  :: Eq (Broker.ChildId child)
  => Endpoint (Broker child)
  -> Broker.ChildId child
  -> Traversal' (Stateful.Model (Watchdog child)) (ChildWatch child)
childWatchesByParenAndId theParent theCId =
  childWatches . ifiltered (\cId cw -> cw ^. parent == theParent && cId == theCId)

-- Number of crash records currently held for the given child of the given broker.
countRecentCrashes
  :: Eq (Broker.ChildId child)
  => Endpoint (Broker child)
  -> Broker.ChildId child
  -> Stateful.Model (Watchdog child)
  -> CrashCount
countRecentCrashes theParent theCId theModel =
  lengthOf (childWatchesByParenAndId theParent theCId . crashes . folded) theModel

-- --------------------------- Model -> Broker Watches

brokers :: Lens' (Stateful.Model (Watchdog child)) (Map (Endpoint (Broker child)) BrokerWatch)
brokers = lens _brokers (\m x -> m {_brokers = x})

-- -------------------------- Server Implementation Helpers

-- Remove the 'ChildWatch' of a child from the model and cancel the
-- exoneration timers of all crash records it still held.
--
-- The model *before* the removal is needed to find those timers, hence
-- 'Stateful.getAndModifyModel' (returns the old model) rather than
-- 'Stateful.modifyAndGetModel' (returns the new model, in which the entry
-- is already gone, so no timer would ever be cancelled).
removeAndCleanChild
  :: forall child q e
   . ( HasProcesses e q
     , Typeable child
     , Typeable (Broker.ChildId child)
     , Ord (Broker.ChildId child)
     , Show (Broker.ChildId child)
     , Member (Stateful.ModelState (Watchdog child)) e
     , Member Logs e
     )
  => Broker.ChildId child
  -> Eff e ()
removeAndCleanChild cId = do
  oldModel <- Stateful.getAndModifyModel @(Watchdog child) (watched @child . at cId .~ Nothing)
  forMOf_ (childWatchesById cId) oldModel $ \w -> do
    logDebug ("removing client entry: " <> pack (show cId))
    forMOf_ (crashes . folded . exonerationTimerReference) w cancelTimer
    logDebug (pack (show w))

-- Detach a broker: drop its 'BrokerWatch' and the 'ChildWatch'es of its
-- children, cancel those children's pending exoneration timers, stop
-- observing the broker, and exit the watchdog if the broker was attached
-- permanently.
removeBroker
  :: forall child q e
   . ( HasProcesses e q
     , Typeable child
     , Tangible (Broker.ChildId child)
     , Typeable (Effectful.ServerPdu child)
     , Ord (Broker.ChildId child)
     , Show (Broker.ChildId child)
     , Member (Stateful.ModelState (Watchdog child)) e
     , Member Logs e
     )
  => Endpoint (Watchdog child)
  -> Endpoint (Broker child)
  -> Eff e ()
removeBroker me broker = do
  oldModel <-
    Stateful.getAndModifyModel @(Watchdog child)
      ( (brokers . at broker .~ Nothing)
      . (watched %~ Map.filter (\cw -> cw^.parent /= broker))
      )
  forM_ (oldModel ^? brokers . at broker . _Just) $ \deadBroker -> do
    logNotice ("dettaching: " <> pack (show deadBroker) <> " " <> pack (show broker))
    let forgottenChildren = oldModel ^.. watched . folded . filtered (\cw -> cw^.parent == broker)
    traverse_ (logNotice . ("forgetting: " <>) . pack . show) forgottenChildren
    -- The forgotten children's crash records are gone from the model, so
    -- their pending timers would only fire into the void; cancel them.
    forMOf_ (folded . crashes . folded . exonerationTimerReference) forgottenChildren cancelTimer
    Observer.forgetObserver @(Broker.ChildEvent child) broker me
    when (view isPermanent deadBroker) $ do
      logError ("permanent broker exited: " <> pack (show broker))
      exitBecause (ExitOtherProcessNotRunning (broker ^. fromEndpoint))