{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Control.Distributed.Process.Supervisor
(
ChildSpec(..)
, ChildKey
, ChildType(..)
, ChildStopPolicy(..)
, ChildStart(..)
, RegisteredName(LocalName, CustomRegister)
, RestartPolicy(..)
, ChildRef(..)
, isRunning
, isRestarting
, Child
, StaticLabel
, SupervisorPid
, ChildPid
, ToChildStart(..)
, start
, run
, MaxRestarts
, maxRestarts
, RestartLimit(..)
, limit
, defaultLimits
, RestartMode(..)
, RestartOrder(..)
, RestartStrategy(..)
, ShutdownMode(..)
, restartOne
, restartAll
, restartLeft
, restartRight
, addChild
, AddChildResult(..)
, StartChildResult(..)
, startChild
, startNewChild
, stopChild
, StopChildResult(..)
, deleteChild
, DeleteChildResult(..)
, restartChild
, RestartChildResult(..)
, shutdown
, shutdownAndWait
, lookupChild
, listChildren
, SupervisorStats(..)
, statistics
, getRestartIntensity
, definedChildren
, definedWorkers
, definedSupervisors
, runningChildren
, runningWorkers
, runningSupervisors
, StartFailure(..)
, ChildInitFailure(..)
) where
import Control.DeepSeq (NFData)
import Control.Distributed.Process.Supervisor.Types
import Control.Distributed.Process
( Process
, ProcessId
, MonitorRef
, DiedReason(..)
, Match
, Handler(..)
, Message
, ProcessMonitorNotification(..)
, Closure
, Static
, exit
, kill
, match
, matchIf
, monitor
, getSelfPid
, liftIO
, catchExit
, catchesExit
, catches
, die
, link
, send
, register
, spawnLocal
, unsafeWrapMessage
, unmonitor
, withMonitor_
, expect
, unClosure
, receiveWait
, receiveTimeout
, handleMessageIf
)
import Control.Distributed.Process.Management (mxNotify, MxEvent(MxUser))
import Control.Distributed.Process.Extras.Internal.Primitives hiding (monitor)
import Control.Distributed.Process.Extras.Internal.Types
( ExitReason(..)
)
import Control.Distributed.Process.ManagedProcess
( handleCall
, handleInfo
, reply
, continue
, stop
, stopWith
, input
, defaultProcess
, prioritised
, InitHandler
, InitResult(..)
, ProcessAction
, ProcessReply
, ProcessDefinition(..)
, PrioritisedProcessDefinition(..)
, Priority()
, DispatchPriority
, UnhandledMessagePolicy(Drop)
, ExitState
, exitState
)
import qualified Control.Distributed.Process.ManagedProcess.UnsafeClient as Unsafe
( call
, cast
)
import qualified Control.Distributed.Process.ManagedProcess as MP
( pserve
)
import Control.Distributed.Process.ManagedProcess.Server.Priority
( prioritiseCast_
, prioritiseCall_
, prioritiseInfo_
, setPriority
, evalAfter
)
import Control.Distributed.Process.ManagedProcess.Server.Restricted
( RestrictedProcess
, Result
, RestrictedAction
, getState
, putState
)
import qualified Control.Distributed.Process.ManagedProcess.Server.Restricted as Restricted
( handleCallIf
, handleCall
, handleCast
, reply
, continue
)
import Control.Distributed.Process.Extras.SystemLog
( LogClient
, LogChan
, LogText
, Logger(..)
)
import qualified Control.Distributed.Process.Extras.SystemLog as Log
import Control.Distributed.Process.Extras.Time
import Control.Distributed.Static
( staticClosure
)
import Control.Exception (SomeException, throwIO)
import Control.Monad.Catch (catch, finally, mask)
import Control.Monad (void, forM)
import Data.Accessor
( Accessor
, accessor
, (^:)
, (.>)
, (^=)
, (^.)
)
import Data.Binary (Binary)
import Data.Foldable (find, foldlM, toList)
import Data.List (foldl')
import qualified Data.List as List (lookup)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Sequence
( Seq
, ViewL(EmptyL, (:<))
, ViewR(EmptyR, (:>))
, (<|)
, (|>)
, (><)
, filter)
import qualified Data.Sequence as Seq
import Data.Time.Clock
( NominalDiffTime
, UTCTime
, getCurrentTime
, diffUTCTime
)
import Data.Typeable (Typeable)
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch, filter, init, rem)
#else
import Prelude hiding (filter, init, rem)
#endif
import GHC.Generics
-- | Conversion of a value into a 'ChildStart', performed in the 'Process' monad.
class ToChildStart a where
  -- | Build the 'ChildStart' for the given value.
  toChildStart :: a -> Process ChildStart

-- | A closure over a plain process is wrapped with 'RunClosure'.
instance ToChildStart (Closure (Process ())) where
  toChildStart clj = return (RunClosure clj)

-- | A closure expecting the supervisor's pid is wrapped with 'CreateHandle'.
instance ToChildStart (Closure (SupervisorPid -> Process (ChildPid, Message))) where
  toChildStart clj = return (CreateHandle clj)

-- | A static process is first converted with 'staticClosure'.
instance ToChildStart (Static (Process ())) where
  toChildStart st = toChildStart (staticClosure st)
-- | Request sent by 'deleteChild': delete the child spec with the given key.
data DeleteChild = DeleteChild !ChildKey
  deriving (Typeable, Generic)

instance Binary DeleteChild
instance NFData DeleteChild
-- | Request sent by 'lookupChild': find the child with the given key.
data FindReq = FindReq ChildKey
  deriving (Typeable, Generic)

instance Binary FindReq
instance NFData FindReq
-- | Request sent by 'statistics': fetch the supervisor's 'SupervisorStats'.
data StatsReq = StatsReq
  deriving (Typeable, Generic)

instance Binary StatsReq
instance NFData StatsReq
-- | Request sent by 'listChildren': list every child known to the supervisor.
data ListReq = ListReq
  deriving (Typeable, Generic)

instance Binary ListReq
instance NFData ListReq
-- | Whether a newly added child should be started straight away.
type ImmediateStart = Bool

-- | Request to add a child spec; sent with @False@ by 'addChild' and with
-- @True@ by 'startNewChild'.
data AddChildReq = AddChild !ImmediateStart !ChildSpec
  deriving (Typeable, Generic, Show)

instance Binary AddChildReq
instance NFData AddChildReq
-- | Internal result of 'doAddChild': either a child with the same key already
-- exists, or the addition was accepted and carries the resulting state.
data AddChildRes
  = Exists ChildRef
  | Added State
-- | Request sent by 'startChild': start the existing child with the given key.
data StartChildReq = StartChild !ChildKey
  deriving (Typeable, Generic)

instance Binary StartChildReq
instance NFData StartChildReq
-- | Request sent by 'restartChild': restart the child with the given key.
data RestartChildReq = RestartChildReq !ChildKey
  deriving (Typeable, Generic, Show, Eq)

instance Binary RestartChildReq
instance NFData RestartChildReq
-- | Message the supervisor schedules for itself (see 'doRestartDelay') to retry
-- restarting the child with the given key, carrying the original exit reason.
data DelayedRestart = DelayedRestart !ChildKey !DiedReason
  deriving (Typeable, Generic, Show, Eq)

instance Binary DelayedRestart
instance NFData DelayedRestart
-- | Request sent by 'stopChild': stop the child with the given key.
data StopChildReq = StopChildReq !ChildKey
  deriving (Typeable, Generic, Show, Eq)

instance Binary StopChildReq
instance NFData StopChildReq
-- | Cast sent to the supervisor (see 'filterInitFailures') when a child's
-- initialisation signals 'ChildInitIgnore'.
data IgnoreChildReq = IgnoreChildReq !ChildPid
  deriving (Typeable, Generic)

instance Binary IgnoreChildReq
instance NFData IgnoreChildReq
-- | The ordered sequence of children held by the supervisor.
type ChildSpecs = Seq Child
-- | The children preceding a given child in a 'ChildSpecs' sequence.
type Prefix = ChildSpecs
-- | The children following a given child in a 'ChildSpecs' sequence.
type Suffix = ChildSpecs
-- | Selects which group of counters 'bumpStats' adjusts: the running
-- ('Active') counters or the defined ('Specified') counters.
data StatsType
  = Active
  | Specified
-- | Where supervisor log output goes: a dedicated log client process, or the
-- default log channel.
data LogSink
  = LogProcess !LogClient
  | LogChan

-- | Delegates to the wrapped client, or to 'Log.logChannel' for 'LogChan'.
instance Logger LogSink where
  logMessage (LogProcess sink) = logMessage sink
  logMessage LogChan           = logMessage Log.logChannel
-- | The supervisor's server state.
data State = State
  { _specs           :: ChildSpecs              -- ^ every known child, in order
  , _active          :: Map ChildPid ChildKey   -- ^ running pids and the key they belong to
  , _strategy        :: RestartStrategy         -- ^ configured restart strategy
  , _restartPeriod   :: NominalDiffTime         -- ^ window used when counting restarts
  , _restarts        :: [UTCTime]               -- ^ timestamps of counted restarts
  , _stats           :: SupervisorStats         -- ^ child counters
  , _logger          :: LogSink                 -- ^ selected log sink
  , shutdownStrategy :: ShutdownMode            -- ^ how children are stopped on shutdown
  }
-- | Prefix an error identifier suffix with this library's namespace.
supErrId :: String -> String
supErrId suffix = concat ["Control.Distributed.Process", suffix]
-- | Spawn a supervisor as a new local process (running 'run') and return its pid.
start :: RestartStrategy -> ShutdownMode -> [ChildSpec] -> Process SupervisorPid
start strat shutMode childSpecs' = spawnLocal (run strat shutMode childSpecs')
-- | Run the supervisor server in the calling process, initialising it via
-- 'supInit' and serving 'serverDefinition'.
run :: RestartStrategy -> ShutdownMode -> [ChildSpec] -> Process ()
run strat shutMode childSpecs' =
  MP.pserve (strat, shutMode, childSpecs') supInit serverDefinition
-- | Ask the supervisor for its current 'SupervisorStats'.
statistics :: Addressable a => a -> Process (SupervisorStats)
statistics addr = Unsafe.call addr StatsReq
-- | Ask the supervisor for the reference and spec of the child with the given
-- key, if there is one.
lookupChild :: Addressable a => a -> ChildKey -> Process (Maybe (ChildRef, ChildSpec))
lookupChild addr key = Unsafe.call addr (FindReq key)
-- | Ask the supervisor for the list of all its children.
listChildren :: Addressable a => a -> Process [Child]
listChildren = flip Unsafe.call ListReq
-- | Add a child spec to the supervisor without requesting an immediate start.
addChild :: Addressable a => a -> ChildSpec -> Process AddChildResult
addChild addr spec = Unsafe.call addr (AddChild False spec)
-- | Ask the supervisor to start the already-defined child with the given key.
startChild :: Addressable a => a -> ChildKey -> Process StartChildResult
startChild addr key = Unsafe.call addr (StartChild key)
-- | Add a child spec to the supervisor and request that it be started
-- immediately.
startNewChild :: Addressable a => a -> ChildSpec -> Process AddChildResult
startNewChild addr spec = Unsafe.call addr (AddChild True spec)
-- | Ask the supervisor to delete the child spec with the given key.
deleteChild :: Addressable a => a -> ChildKey -> Process DeleteChildResult
deleteChild sid = Unsafe.call sid . DeleteChild
-- | Ask the supervisor to stop the child with the given key.
stopChild :: Addressable a => a -> ChildKey -> Process StopChildResult
stopChild sid key = Unsafe.call sid (StopChildReq key)
-- | Ask the supervisor to restart the child with the given key.
restartChild :: Addressable a => a -> ChildKey -> Process RestartChildResult
restartChild sid key = Unsafe.call sid (RestartChildReq key)
-- | Send an 'ExitShutdown' exit signal to the supervisor, if the address
-- resolves to a process; otherwise do nothing.
shutdown :: Resolvable a => a -> Process ()
shutdown sid = resolve sid >>= maybe (return ()) (\pid -> exit pid ExitShutdown)
-- | Like 'shutdown', but monitors the supervisor and blocks until a monitor
-- notification for its pid arrives. Does nothing if the address does not
-- resolve.
shutdownAndWait :: Resolvable a => a -> Process ()
shutdownAndWait sid = resolve sid >>= maybe (return ()) stopAndAwait
  where
    stopAndAwait pid = withMonitor_ pid $ do
      shutdown pid
      receiveWait
        [ matchIf (\(ProcessMonitorNotification _ died _) -> died == pid)
                  (\_ -> return ())
        ]
-- | Initialise the supervisor: build the initial 'State' from the restart
-- strategy and shutdown mode, then start every child spec in order.
--
-- Any exception raised while starting the children is logged and converted
-- into 'InitStop' carrying the shown exception.
supInit :: InitHandler (RestartStrategy, ShutdownMode, [ChildSpec]) State
supInit (strategy', shutdown', specs') = do
  logClient <- Log.client
  let client' = case logClient of
                  Nothing -> LogChan
                  Just c  -> LogProcess c
  let initState = ( (restartPeriod ^= configuredRestartPeriod)
                  . (strategy ^= strategy')
                  . (logger   ^= client')
                  $ emptyState shutdown'
                  )
  catch (foldlM initChild initState specs' >>= return . (flip InitOk) Infinity)
        (\(e :: SomeException) -> do
          sup <- getSelfPid
          logEntry Log.error $
            mkReport "Could not init supervisor " sup "noproc" (show e)
          return $ InitStop (show e))
  where
    -- Start one child and fold it into the state; a duplicate key aborts
    -- initialisation.
    initChild :: State -> ChildSpec -> Process State
    initChild st ch =
      case (findChild (childKey ch) st) of
        Just (ref, _) -> die $ StartFailureDuplicateChild ref
        Nothing       -> tryStartChild ch >>= initialised st ch

    -- The restart window in seconds. 'asTimeout' yields microseconds, so the
    -- value is scaled directly in 'NominalDiffTime' instead of going through
    -- a single-precision 'Float', which would round the configured period.
    configuredRestartPeriod :: NominalDiffTime
    configuredRestartPeriod =
      let micros = asTimeout (maxT (intensity strategy'))
      in fromIntegral micros / 1000000
-- | Fold the outcome of starting a child during initialisation into the state.
--
-- A start failure is thrown as 'ChildInitFailure'. On success the child's pid
-- is recorded as active, the child is appended to the specs and the active
-- counters are incremented; a ref that does not resolve to a pid is fatal.
initialised :: State
            -> ChildSpec
            -> Either StartFailure ChildRef
            -> Process State
initialised _ _ (Left err) = liftIO $ throwIO $ ChildInitFailure (show err)
initialised state spec (Right ref) =
  resolve ref >>= maybe invalidRef recordChild
  where
    invalidRef =
      die $ (supErrId ".initChild:child=") ++ (childKey spec) ++ ":InvalidChildRef"
    recordChild childPid =
      return $ ( (active ^: Map.insert childPid (childKey spec))
               . (specs ^: (|> (ref, spec)))
               $ bumpStats Active (childType spec) (+1) state
               )
-- | A state with no children, no recorded restarts, zeroed statistics, the
-- 'restartAll' strategy, a zero restart period and the default log channel.
emptyState :: ShutdownMode -> State
emptyState shutMode = State
  { _specs           = Seq.empty
  , _active          = Map.empty
  , _strategy        = restartAll
  , _restartPeriod   = 0
  , _restarts        = []
  , _stats           = emptyStats
  , _logger          = LogChan
  , shutdownStrategy = shutMode
  }
-- | Statistics with every counter set to zero.
emptyStats :: SupervisorStats
emptyStats = SupervisorStats
  { _children          = 0
  , _workers           = 0
  , _supervisors       = 0
  , _running           = 0
  , _activeSupervisors = 0
  , _activeWorkers     = 0
  , totalRestarts      = 0
  }
-- | The supervisor's prioritised server definition: 'processDefinition'
-- combined with per-message dispatch priorities (ignore requests 100, monitor
-- notifications 99, delayed restarts 80, child lookups 10).
serverDefinition :: PrioritisedProcessDefinition State
serverDefinition =
  let priorities :: [DispatchPriority State]
      priorities =
        [ prioritiseCast_ (\(IgnoreChildReq _) -> setPriority 100)
        , prioritiseInfo_ (\(ProcessMonitorNotification _ _ _) -> setPriority 99 )
        , prioritiseInfo_ (\(DelayedRestart _ _) -> setPriority 80 )
        , prioritiseCall_ (\(_ :: FindReq) ->
            (setPriority 10) :: Priority (Maybe (ChildRef, ChildSpec)))
        ]
  in prioritised processDefinition priorities
-- | The supervisor's process definition: handlers for the API requests,
-- info handlers for monitor notifications and delayed restarts, and the
-- shutdown handler. The unhandled-message policy is 'Drop'.
processDefinition :: ProcessDefinition State
processDefinition =
defaultProcess {
apiHandlers = [
Restricted.handleCast handleIgnore
, handleCall handleStopChild
, Restricted.handleCall handleDeleteChild
-- Only 'AddChild' requests whose immediate-start flag is unset pass this
-- guard; presumably the others reach 'handleStartNewChild' below
-- (NOTE(review): depends on handler dispatch order - confirm).
, Restricted.handleCallIf (input (\(AddChild immediate _) -> not immediate))
handleAddChild
, handleCall handleStartNewChild
, handleCall handleStartChild
, handleCall handleRestartChild
, Restricted.handleCall handleLookupChild
, Restricted.handleCall handleListChildren
, Restricted.handleCall handleGetStats
]
, infoHandlers = [ handleInfo handleMonitorSignal
, handleInfo handleDelayedRestart
]
, shutdownHandler = handleShutdown
, unhandledMessagePolicy = Drop
} :: ProcessDefinition State
-- | Reply with the child matching the requested key, if any.
handleLookupChild :: FindReq
                  -> RestrictedProcess State (Result (Maybe (ChildRef, ChildSpec)))
handleLookupChild (FindReq key) = do
  st <- getState
  Restricted.reply (findChild key st)
-- | Reply with every child currently held in the state, in order.
handleListChildren :: ListReq
                   -> RestrictedProcess State (Result [Child])
handleListChildren _ = do
  st <- getState
  Restricted.reply (toList (st ^. specs))
-- | Add a child spec without starting it. Replies 'ChildAdded' with a
-- 'ChildStopped' ref, or a duplicate-child failure if the key is taken.
handleAddChild :: AddChildReq
               -> RestrictedProcess State (Result AddChildResult)
handleAddChild req = do
  st <- getState
  case doAddChild req True st of
    Added st'  -> putState st' >> Restricted.reply (ChildAdded ChildStopped)
    Exists ref -> Restricted.reply (ChildFailedToStart $ StartFailureDuplicateChild ref)
-- | Handle a child's request to be ignored: if the pid is active, drop it from
-- the active map and mark its child as stopped with the \"ignored\" flag set.
-- Unknown pids leave the state untouched.
handleIgnore :: IgnoreChildReq
             -> RestrictedProcess State RestrictedAction
handleIgnore (IgnoreChildReq childPid) = do
  st <- getState
  let tracked = st ^. active
  case Map.lookup childPid tracked of
    Nothing  -> Restricted.continue
    Just key -> do
      putState $ (active ^= Map.delete childPid tracked) (markIgnored key st)
      Restricted.continue
  where
    markIgnored :: ChildKey -> State -> State
    markIgnored key st' =
      maybe st' id $ updateChild key (setChildStopped True) st'
-- | Delete a child spec. Replies 'ChildNotFound' for an unknown key,
-- 'ChildNotStopped' unless the child's ref is 'ChildStopped', and otherwise
-- removes the spec, decrements the specified counters and replies
-- 'ChildDeleted'.
handleDeleteChild :: DeleteChild
                  -> RestrictedProcess State (Result DeleteChildResult)
handleDeleteChild (DeleteChild k) = do
  st <- getState
  let (before, fromMatch) = Seq.breakl ((== k) . childKey . snd) (st ^. specs)
  case Seq.viewl fromMatch of
    EmptyL -> Restricted.reply ChildNotFound
    (ref, spec) :< after
      | ref == ChildStopped -> do
          putState $ ( (specs ^= before >< after)
                     $ bumpStats Specified (childType spec) decrement st
                     )
          Restricted.reply ChildDeleted
      | otherwise -> Restricted.reply $ ChildNotStopped ref
-- | Start an existing child by key. Unknown keys and children whose ref is
-- running or restarting are rejected; otherwise the child is started and the
-- reply carries its new ref (or the start failure).
handleStartChild :: State
                 -> StartChildReq
                 -> Process (ProcessReply StartChildResult State)
handleStartChild state (StartChild key) =
  case findChild key state of
    Nothing                               -> reply ChildStartUnknownId state
    Just (ref@(ChildRunning _), _)        -> alreadyRunning ref
    Just (ref@(ChildRunningExtra _ _), _) -> alreadyRunning ref
    Just (ref@(ChildRestarting _), _)     -> alreadyRunning ref
    Just (_, spec)                        ->
      doStartChild spec state >>= either failed started
  where
    alreadyRunning ref = reply (ChildStartFailed (StartFailureAlreadyRunning ref)) state
    failed err         = reply (ChildStartFailed err) state
    started (ref, st') = reply (ChildStartOk ref) st'
-- | Add a new child spec and start it in one step. A duplicate key or a start
-- failure is reported in the reply; on success the child is appended to the
-- specs, the specified counters are incremented and its pid is marked active.
handleStartNewChild :: State
                    -> AddChildReq
                    -> Process (ProcessReply AddChildResult State)
handleStartNewChild state req@(AddChild _ spec) =
  case doAddChild req False state of
    Exists ref -> reply (ChildFailedToStart $ StartFailureDuplicateChild ref) state
    Added _    -> tryStartChild spec >>= either startFailed startOk
  where
    startFailed err = reply (ChildFailedToStart err) (removeChild spec state)
    startOk ref =
      let withSpec = ( (specs ^: (|> (ref, spec)))
                     $ bumpStats Specified (childType spec) (+1) state
                     )
      in reply (ChildAdded ref) (markActive withSpec ref spec)
-- | Restart a child by key. Unknown keys and children whose ref is running or
-- restarting are rejected; otherwise the child is started and the reply
-- carries its new ref (or the start failure).
handleRestartChild :: State
                   -> RestartChildReq
                   -> Process (ProcessReply RestartChildResult State)
handleRestartChild state (RestartChildReq key) =
  case findChild key state of
    Nothing                               -> reply ChildRestartUnknownId state
    Just (ref@(ChildRunning _), _)        -> alreadyRunning ref
    Just (ref@(ChildRunningExtra _ _), _) -> alreadyRunning ref
    Just (ref@(ChildRestarting _), _)     -> alreadyRunning ref
    Just (_, spec)                        ->
      doStartChild spec state >>= either failed restarted
  where
    alreadyRunning ref   = reply (ChildRestartFailed (StartFailureAlreadyRunning ref)) state
    failed err           = reply (ChildRestartFailed err) state
    restarted (ref, st') = reply (ChildRestartOk ref) st'
-- | Handle a previously scheduled 'DelayedRestart'. A child that no longer
-- exists is ignored; a child in the 'ChildRestarting' state is handed to
-- 'tryRestartChild'; any other state is treated as a fatal internal error.
handleDelayedRestart :: State
                     -> DelayedRestart
                     -> Process (ProcessAction State)
handleDelayedRestart state (DelayedRestart key reason) =
  case findChild key state of
    Nothing -> continue state
    Just (ChildRestarting oldPid, spec) ->
      tryRestartChild oldPid state (state ^. active) spec reason
    Just unexpected ->
      die $ ExitOther $ (supErrId ".handleDelayedRestart:InvalidState: ") ++ (show unexpected)
-- | Stop a child by key. Replies 'StopChildUnknownId' for an unknown key and
-- 'StopChildOk' otherwise, stopping the child first unless its ref is already
-- 'ChildStopped'.
handleStopChild :: State
                -> StopChildReq
                -> Process (ProcessReply StopChildResult State)
handleStopChild state (StopChildReq key) =
  case findChild key state of
    Nothing                -> reply StopChildUnknownId state
    Just (ChildStopped, _) -> reply StopChildOk state
    Just (ref, spec)       -> doStopChild ref spec state >>= reply StopChildOk
-- | Reply with the statistics stored in the current state.
handleGetStats :: StatsReq
               -> RestrictedProcess State (Result SupervisorStats)
handleGetStats _ = do
  st <- getState
  Restricted.reply (st ^. stats)
-- | React to a monitor notification. The pid is removed from the active map;
-- if it belonged to a known child, 'tryRestart' decides what happens next,
-- otherwise the supervisor simply continues with the updated map.
handleMonitorSignal :: State
                    -> ProcessMonitorNotification
                    -> Process (ProcessAction State)
handleMonitorSignal state (ProcessMonitorNotification _ childPid reason) =
  case lookupSpec of
    Nothing   -> continue $ (active ^= remaining) state
    Just spec -> tryRestart childPid state remaining spec reason
  where
    tracked   = state ^. active
    remaining = Map.delete childPid tracked
    lookupSpec = do
      key <- Map.lookup childPid tracked
      fmap snd (findChild key state)
-- | Shutdown handler: stop all children, then, if the exit reason is
-- 'ExitOther', terminate the supervisor by calling 'die' with its payload.
handleShutdown :: ExitState State -> ExitReason -> Process ()
handleShutdown state why = do
  stopChildren (exitState state) why
  case why of
    ExitOther reason -> die reason
    _                -> return ()
-- | Decide how to react to a child's exit according to the restart strategy.
--
-- With 'RestartOne' only the failed child is considered. For the other
-- strategies the child's restart policy and exit reason decide whether the
-- supervisor stops normally, just continues, drops the child, or restarts a
-- whole branch. Every reason other than 'DiedException' counts as normal here.
tryRestart :: ChildPid
           -> State
           -> Map ChildPid ChildKey
           -> ChildSpec
           -> DiedReason
           -> Process (ProcessAction State)
tryRestart childPid state active' spec reason = do
  sup <- getSelfPid
  logEntry Log.debug $
    mkReport "signalled restart" sup (childKey spec) (show reason)
  case state ^. strategy of
    RestartOne _ -> tryRestartChild childPid state active' spec reason
    strat        -> branch strat
  where
    cleaned = (active ^= active') state

    diedNormally = case reason of
      DiedException _ -> False
      _               -> True

    branch strat =
      case (childRestart spec, diedNormally) of
        (Intrinsic, True) -> stopWith cleaned ExitNormal
        (Transient, True) -> continue cleaned
        (Temporary, _)    -> continue (removeChild spec cleaned)
        _                 -> tryRestartBranch strat spec reason cleaned
-- | Restart a group of children after the child described by @sp@ exited with
-- reason @dr@, using the (non-'RestartOne') strategy @rs@. Once the restart
-- action has run, a 'SupervisorBranchRestarted' event is published and the
-- resulting action returned.
tryRestartBranch :: RestartStrategy
-> ChildSpec
-> DiedReason
-> State
-> Process (ProcessAction State)
tryRestartBranch rs sp dr st =
let mode' = mode rs
-- Which children take part depends on the strategy; any other strategy is
-- an internal error.
tree' = case rs of
RestartAll _ _ -> childSpecs
RestartLeft _ _ -> subTreeL
RestartRight _ _ -> subTreeR
_ -> error "IllegalState"
-- 'RestartEach' stops and starts one child at a time; the other modes stop
-- the whole tree first and then start it.
proc = case mode' of
RestartEach _ -> stopStart (order mode')
_ -> restartBranch mode'
in do us <- getSelfPid
a <- proc tree'
report $ SupervisorBranchRestarted us (childKey sp) dr rs
return a
where
-- Record a restart, then stop and immediately start each child in the
-- requested order. If the restart limit is exceeded, a 'SupervisorShutdown'
-- event is published and the supervisor dies.
stopStart :: RestartOrder -> ChildSpecs -> Process (ProcessAction State)
stopStart order' tree = do
let tree' = case order' of
LeftToRight -> tree
RightToLeft -> Seq.reverse tree
state <- addRestart activeState
case state of
Nothing -> do us <- getSelfPid
let reason = errorMaxIntensityReached
report $ SupervisorShutdown us (shutdownStrategy st) reason
die reason
Just st' -> apply (foldlM stopStartIt st' tree')
-- Record a restart, stop every child in stop order, then start every child
-- in start order. If the restart limit is exceeded the supervisor dies.
restartBranch :: RestartMode -> ChildSpecs -> Process (ProcessAction State)
restartBranch mode' tree = do
state <- addRestart activeState
case state of
Nothing -> die errorMaxIntensityReached
Just st' -> do
let (stopTree, startTree) = mkTrees mode' tree
foldlM stopIt st' stopTree >>= \s -> apply $ foldlM startIt s startTree
-- Compute the (stop order, start order) pair for the given restart mode.
mkTrees :: RestartMode -> ChildSpecs -> (ChildSpecs, ChildSpecs)
mkTrees (RestartInOrder LeftToRight) t = (t, t)
mkTrees (RestartInOrder RightToLeft) t = let rev = Seq.reverse t in (rev, rev)
mkTrees (RestartRevOrder LeftToRight) t = (t, Seq.reverse t)
mkTrees (RestartRevOrder RightToLeft) t = (Seq.reverse t, t)
mkTrees _ _ = error "mkTrees.INVALID_STATE"
-- Publish a restarting event for the child, stop it, then start it again.
stopStartIt :: State -> Child -> Process State
stopStartIt s ch@(cr, cs) = do
us <- getSelfPid
cPid <- resolve cr
report $ SupervisedChildRestarting us cPid (childKey cs) (ExitOther "RestartedBySupervisor")
doStopChild cr cs s >>= (flip startIt) ch
-- Publish a restarting event for the child and stop it.
stopIt :: State -> Child -> Process State
stopIt s (cr, cs) = do
us <- getSelfPid
cPid <- resolve cr
report $ SupervisedChildRestarting us cPid (childKey cs) (ExitOther "RestartedBySupervisor")
doStopChild cr cs s
-- Start a child; temporary children are removed from the state instead of
-- being started.
startIt :: State -> Child -> Process State
startIt s (_, cs)
| isTemporary (childRestart cs) = return $ removeChild cs s
| otherwise = ensureActive cs =<< doStartChild cs s
-- Mark a successfully started child as active; a start failure terminates
-- the supervisor.
ensureActive :: ChildSpec
-> Either StartFailure (ChildRef, State)
-> Process State
ensureActive cs it
| (Right (ref, st')) <- it = return $ markActive st' ref cs
| (Left err) <- it = die $ ExitOther $ branchErrId ++ (childKey cs) ++ ": " ++ (show err)
| otherwise = error "IllegalState"
branchErrId :: String
branchErrId = supErrId ".tryRestartBranch:child="
-- Run the state-producing action and continue with its result. The exit
-- handler ignores the sender's pid and hands the caught reason to 'stop'
-- (NOTE(review): presumably the reason is an 'ExitReason' - confirm).
apply :: (Process State) -> Process (ProcessAction State)
apply proc = do
catchExit (proc >>= continue) (\(_ :: ProcessId) -> stop)
-- The incoming state with the exited child marked as stopped (unchanged if
-- the child cannot be found).
activeState = maybe st id $ updateChild (childKey sp)
(setChildStopped False) st
-- The children before the exited child (searching from the left), followed
-- by the exited child itself when present.
subTreeL :: ChildSpecs
subTreeL =
let (prefix, suffix) = splitTree Seq.breakl
in case (Seq.viewl suffix) of
child :< _ -> prefix |> child
EmptyL -> prefix
-- Same idea as 'subTreeL', but splitting with 'Seq.breakr' and placing the
-- last element of the second component in front of the first.
subTreeR :: ChildSpecs
subTreeR =
let (prefix, suffix) = splitTree Seq.breakr
in case (Seq.viewr suffix) of
_ :> child -> child <| prefix
EmptyR -> prefix
-- Split the participating children at the exited child's key.
splitTree splitWith = splitWith ((== childKey sp) . childKey . snd) childSpecs
-- The children taking part in the restart: the exited child is filtered out
-- when it is transient and died normally, or when it is temporary.
childSpecs :: ChildSpecs
childSpecs =
let cs = activeState ^. specs
ck = childKey sp
rs' = childRestart sp
in case (isTransient rs', isTemporary rs', dr) of
(True, _, DiedNormal) -> filter ((/= ck) . childKey . snd) cs
(_, True, _) -> filter ((/= ck) . childKey . snd) cs
_ -> cs
-- | Decide what to do with a single exited child, based on its restart policy:
--
-- * transient child that died normally: mark it stopped and continue
-- * temporary child: remove it and continue
-- * intrinsic child that died normally: stop the supervisor normally
-- * anything else: attempt a restart via 'doRestartChild'
tryRestartChild :: ChildPid
                -> State
                -> Map ChildPid ChildKey
                -> ChildSpec
                -> DiedReason
                -> Process (ProcessAction State)
tryRestartChild childPid st active' spec reason
  | diedNormally && isTransient policy = continue childDown
  | isTemporary policy                 = continue childRemoved
  | diedNormally && isIntrinsic policy = stopWith markedStopped ExitNormal
  | otherwise                          = doRestartChild childPid spec reason st
  where
    policy = childRestart spec
    diedNormally = case reason of
      DiedNormal -> True
      _          -> False
    markedStopped =
      maybe st id $ updateChild (childKey spec) (setChildStopped False) st
    childDown    = (active ^= active') markedStopped
    childRemoved = (active ^= active') (removeChild spec st)
-- | Restart a single child, subject to the restart limit.
--
-- When the limit is exceeded the supervisor either dies (the child has no
-- restart delay) or schedules a delayed restart. Otherwise the child is
-- started again; a start failure removes a temporary child and stops the
-- supervisor for any other child.
doRestartChild :: ChildPid -> ChildSpec -> DiedReason -> State -> Process (ProcessAction State)
doRestartChild pid spec reason state =
  addRestart state >>= maybe limitReached restartNow
  where
    key  = childKey spec
    kind = childType spec

    limitReached =
      case childRestartDelay spec of
        Nothing  -> die errorMaxIntensityReached
        Just del -> doRestartDelay pid del spec reason state

    restartNow st = do
      sup <- getSelfPid
      report $ SupervisedChildRestarting sup (Just pid) key (ExitOther $ show reason)
      outcome <- doStartChild spec st
      case outcome of
        Right (ref, st') -> continue $ markActive st' ref spec
        Left err
          | isTemporary (childRestart spec) -> do
              logEntry Log.warning $
                mkReport "Error in temporary child" sup key (show err)
              continue $ ( (active ^: Map.filter (/= key))
                         . (bumpStats Active kind decrement)
                         . (bumpStats Specified kind decrement)
                         $ removeChild spec st)
          | otherwise -> do
              logEntry Log.error $
                mkReport "Unrecoverable error in child. Stopping supervisor"
                         sup key (show err)
              stopWith st $ ExitOther $ "Unrecoverable error in child " ++ key
-- | Schedule a 'DelayedRestart' for the child after the given interval. The
-- state passed on has the child marked as restarting, its key removed from the
-- active map and the active counters decremented.
doRestartDelay :: ChildPid
               -> TimeInterval
               -> ChildSpec
               -> DiedReason
               -> State
               -> Process (ProcessAction State)
doRestartDelay oldPid rDelay spec reason state =
  evalAfter rDelay (DelayedRestart key reason) pending
  where
    key     = childKey spec
    marked  = maybe state id (updateChild key (setChildRestarting oldPid) state)
    pending = ( (active ^: Map.filter (/= key))
              . (bumpStats Active (childType spec) decrement)
              $ marked
              )
-- | Record a restart happening now. Timestamps older than the restart period
-- are discarded; if the remaining count (including this restart) exceeds the
-- configured maximum the result is 'Nothing', otherwise the updated state.
addRestart :: State -> Process (Maybe State)
addRestart state = do
  now <- liftIO getCurrentTime
  -- Reversed to keep the ordering produced by prepending while folding left.
  let recent = reverse [ t | t <- now : previous, diffUTCTime now t <= window ]
  return $
    if length recent > maxAttempts
      then Nothing
      else Just ((restarts ^= recent) state)
  where
    window      = state ^. restartPeriod
    previous    = state ^. restarts
    limits      = state ^. strategy .> restartIntensity
    maxAttempts = maxNumberOfRestarts (maxR limits)
-- | Start a child that is already present in the specs. On success its entry
-- is replaced with the new ref, the active counters are incremented and the
-- pid is marked active. A spec that cannot be found in the state is a fatal
-- internal error.
doStartChild :: ChildSpec
             -> State
             -> Process (Either StartFailure (ChildRef, State))
doStartChild spec st =
  tryStartChild spec >>= either (return . Left) started
  where
    started ref =
      case updateChild (childKey spec) (nowRunning ref) st of
        Nothing -> die $ (supErrId ".doStartChild.InternalError:") ++ show spec
        Just s' -> return $ Right (ref, markActive s' ref spec)

    nowRunning :: ChildRef -> Child -> Prefix -> Suffix -> State -> Maybe State
    nowRunning newRef (_, chSpec) before after st' =
      Just $ ( (specs ^= before >< ((newRef, chSpec) <| after))
             $ bumpStats Active (childType spec) (+1) st'
             )
-- | Resolve the child's start closure and launch it. A closure that cannot be
-- resolved is reported as a start-failure event and returned as
-- 'StartFailureBadClosure'.
tryStartChild :: ChildSpec
              -> Process (Either StartFailure ChildRef)
tryStartChild ChildSpec{..} =
  case childStart of
    RunClosure proc ->
      resolveClosure proc >>= either badClosure
        (\p -> wrapClosure childKey childRegName p >>= return . Right)
    CreateHandle fn ->
      resolveClosure fn >>= either badClosure
        (\fn' -> wrapHandle childKey childRegName fn' >>= return . Right)
  where
    -- Resolve a closure, turning any exception into its shown form.
    resolveClosure :: Typeable a => Closure a -> Process (Either String a)
    resolveClosure clj =
      catch (unClosure clj >>= return . Right)
            (\(e :: SomeException) -> return $ Left (show e))

    badClosure err = do
      let sf = StartFailureBadClosure err
      sup <- getSelfPid
      report $ SupervisedChildStartFailure sup sf childKey
      return $ Left sf
-- | Spawn a child from a plain 'Process' action and return a 'ChildRunning'
-- ref for it.
--
-- The spawned process links to the supervisor, performs the optional
-- registration and then blocks on 'expect' until the supervisor, having
-- called 'monitor', sends it @()@. While running the child action, an
-- 'ExitShutdown' exit is swallowed, an 'ExitOther' exit is published via
-- 'logExit', and exceptions are passed to 'filterInitFailures' and
-- 'logFailure'.
wrapClosure :: ChildKey
            -> Maybe RegisteredName
            -> Process ()
            -> Process ChildRef
wrapClosure key regName proc = do
  supervisor <- getSelfPid
  childPid <- spawnLocal $ do
    self <- getSelfPid
    link supervisor
    maybeRegister regName self
    () <- expect  -- go-ahead sent by the supervisor once it monitors us
    (proc
      `catchesExit` [
          (\_ m -> handleMessageIf m (\r -> r == ExitShutdown)
                                     (\_ -> return ()))
        , (\_ m -> handleMessageIf m isExitOther
                                     (\r -> logExit supervisor self r))
        ])
      `catches` [ Handler $ filterInitFailures supervisor self
                , Handler $ logFailure supervisor self ]
  void $ monitor childPid
  send childPid ()
  let cRef = ChildRunning childPid
  report $ SupervisedChildStarted supervisor cRef key
  return cRef
  where
    -- Total predicate: a partial lambda pattern here would raise a
    -- pattern-match failure for any other 'ExitReason' (e.g. 'ExitNormal')
    -- instead of leaving the exit unhandled.
    isExitOther :: ExitReason -> Bool
    isExitOther (ExitOther _) = True
    isExitOther _             = False
-- | Start a child through a handle-creating function: run it with the
-- supervisor's pid, monitor the returned child pid, perform the optional
-- registration and return a 'ChildRunningExtra' ref carrying the extra message.
wrapHandle :: ChildKey
           -> Maybe RegisteredName
           -> (SupervisorPid -> Process (ChildPid, Message))
           -> Process ChildRef
wrapHandle key regName mkHandle = do
  sup <- getSelfPid
  (pid, extra) <- mkHandle sup
  _ <- monitor pid
  maybeRegister regName pid
  let ref = ChildRunningExtra pid extra
  report (SupervisedChildStarted sup ref key)
  return ref
-- | Register the given pid according to the optional 'RegisteredName'.
--
-- 'LocalName' uses 'register'; 'CustomRegister' resolves the closure and
-- applies it to the pid. If the closure cannot be resolved the calling
-- process dies with 'ExitOther' carrying the shown exception.
maybeRegister :: Maybe RegisteredName -> ChildPid -> Process ()
maybeRegister Nothing                     _   = return ()
maybeRegister (Just (LocalName n))        pid = register n pid
maybeRegister (Just (CustomRegister clj)) pid = do
  mProc <- catch (unClosure clj >>= return . Right)
                 (\(e :: SomeException) -> return $ Left (show e))
  case mProc of
    -- err is already the shown exception; showing it again would wrap the
    -- message in an extra layer of escaped quotes.
    Left err -> die $ ExitOther err
    Right p  -> p pid
-- | Exception handler for 'ChildInitFailure' raised inside a child.
-- 'ChildInitIgnore' is turned into an 'IgnoreChildReq' cast to the supervisor;
-- a real init failure is published as an event and then re-thrown.
filterInitFailures :: SupervisorPid
                   -> ChildPid
                   -> ChildInitFailure
                   -> Process ()
filterInitFailures sup childPid ex =
  case ex of
    ChildInitIgnore    -> Unsafe.cast sup $ IgnoreChildReq childPid
    ChildInitFailure _ -> do
      report $ SupervisedChildInitFailed sup childPid ex
      liftIO $ throwIO ex
-- | Stop every child according to the state's shutdown strategy, after
-- publishing a 'SupervisorShutdown' event with the given exit reason.
stopChildren :: State -> ExitReason -> Process ()
stopChildren state er = do
us <- getSelfPid
let strat = shutdownStrategy state
report $ SupervisorShutdown us strat er
case strat of
-- One helper process is spawned per child to stop it; each helper is
-- monitored, its exit collected, and any recorded errors are logged.
ParallelShutdown -> do
let allChildren = toList $ state ^. specs
terminatorPids <- forM allChildren $ \ch -> do
pid <- spawnLocal $ void $ syncStop ch $ (active ^= Map.empty) state
mRef <- monitor pid
return (mRef, pid)
terminationErrors <- collectExits [] $ zip terminatorPids (map snd allChildren)
case terminationErrors of
[] -> return ()
_ -> do
sup <- getSelfPid
void $ logEntry Log.error $
mkReport "Errors in stopChildren / ParallelShutdown"
sup "n/a" (show terminationErrors)
-- Children are stopped one after another in the requested order, threading
-- the state through.
SequentialShutdown ord -> do
let specs' = state ^. specs
let allChildren = case ord of
RightToLeft -> Seq.reverse specs'
LeftToRight -> specs'
void $ foldlM (flip syncStop) state (toList allChildren)
where
-- Stop a single child via 'doStopChild'.
syncStop :: Child -> State -> Process State
syncStop (cr, cs) state' = doStopChild cr cs state'
-- Wait for monitor notifications until no helper is outstanding,
-- accumulating (pid, reason) pairs that count as errors. 'DiedUnknownId'
-- and 'DiedNormal' are not errors, nor are notifications that match no
-- outstanding (ref, pid) pair, nor a 'DiedException' for a child whose stop
-- policy is 'StopImmediately'.
collectExits :: [(ProcessId, DiedReason)]
-> [((MonitorRef, ProcessId), ChildSpec)]
-> Process [(ProcessId, DiedReason)]
collectExits errors [] = return errors
collectExits errors pids = do
(ref, pid, reason) <- receiveWait [
match (\(ProcessMonitorNotification ref' pid' reason') -> do
return (ref', pid', reason'))
]
let remaining = [p | p <- pids, (snd $ fst p) /= pid]
let spec = List.lookup (ref, pid) pids
case (reason, spec) of
(DiedUnknownId, _) -> collectExits errors remaining
(DiedNormal, _) -> collectExits errors remaining
(_, Nothing) -> collectExits errors remaining
(DiedException _, Just sp') -> do
if (childStop sp') == StopImmediately
then collectExits errors remaining
else collectExits ((pid, reason):errors) remaining
_ -> collectExits ((pid, reason):errors) remaining
-- | Stop a child. If its ref resolves to a pid, the child is shut down
-- according to its stop policy, a stopped event is published, the pid is
-- removed from the active map and the child marked as stopped. A ref that
-- does not resolve leaves the state unchanged.
doStopChild :: ChildRef -> ChildSpec -> State -> Process State
doStopChild ref spec state = do
  us <- getSelfPid
  resolve ref >>= maybe (return state) (stopPid us)
  where
    stopPid us pid = do
      why <- childShutdown (childStop spec) pid state
      report $ SupervisedChildStopped us ref why
      return $ (active ^: Map.delete pid) markedStopped
    markedStopped =
      maybe state id $ updateChild (childKey spec) (setChildStopped False) state
-- | Shut a child down according to its stop policy and return the reason it
-- died. The body runs under 'mask'; only the final step of 'await' is run
-- with the restore function.
childShutdown :: ChildStopPolicy
-> ChildPid
-> State
-> Process DiedReason
childShutdown policy childPid st = mask $ \restore -> do
case policy of
-- Ask the child to exit with 'ExitShutdown' and wait for the given delay.
(StopTimeout t) -> exit childPid ExitShutdown >> await restore childPid t st
-- Kill the child, wait without a time limit for its monitor notification,
-- and report 'DiedNormal' whatever the notification says.
StopImmediately -> do
kill childPid "StoppedBySupervisor"
void $ await restore childPid Infinity st
return DiedNormal
where
-- Monitor the child and wait for its notification for at most the given
-- delay; the monitor is always removed afterwards. If no notification
-- arrived in time, fall back to the 'StopImmediately' policy.
await restore' childPid' delay state = do
mRef <- monitor childPid'
let recv = case delay of
Infinity -> receiveWait (matches mRef) >>= return . Just
NoDelay -> receiveTimeout 0 (matches mRef)
Delay t -> receiveTimeout (asTimeout t) (matches mRef)
res <- recv `finally` (unmonitor mRef)
restore' $ maybe (childShutdown StopImmediately childPid' state) return res
-- Match only the notification that belongs to the given monitor reference.
matches :: MonitorRef -> [Match DiedReason]
matches m = [
matchIf (\(ProcessMonitorNotification m' _ _) -> m == m')
(\(ProcessMonitorNotification _ _ r) -> return r)
]
-- | The exit reason used when the restart limit has been exceeded.
errorMaxIntensityReached :: ExitReason
errorMaxIntensityReached = ExitOther "ReachedMaxRestartIntensity"
-- | Publish a supervisor event as an 'MxUser' management notification.
report :: MxSupervisor -> Process ()
report event = mxNotify (MxUser (unsafeWrapMessage event))
-- | Publish a 'SupervisedChildDied' event for the given child and exit reason.
logExit :: SupervisorPid -> ChildPid -> ExitReason -> Process ()
logExit sup pid er = report (SupervisedChildDied sup pid er)
-- | Exception handler that logs a notice about the child's exit and then
-- re-throws the exception.
logFailure :: SupervisorPid -> ChildPid -> SomeException -> Process ()
logFailure sup childPid ex =
  logEntry Log.notice (mkReport "Detected Child Exit" sup (show childPid) (show ex))
    >> liftIO (throwIO ex)
-- | Write a message to 'Log.logChannel' using the given level function.
logEntry :: (LogChan -> LogText -> Process ()) -> String -> Process ()
logEntry level msg = Log.report level Log.logChannel msg
-- | Build a log line from a description, the supervisor's pid, a child label
-- and a reason. Each item is wrapped in square brackets and followed by a
-- single space (so the result ends with a space).
mkReport :: String -> SupervisorPid -> String -> String -> String
mkReport b s c r = foldr (\item rest -> item ++ " " ++ rest) "" bracketed
  where
    bracketed :: [String]
    bracketed = map (\item -> "[" ++ item ++ "]")
                    [ b
                    , "supervisor: " ++ show s
                    , "child: " ++ c
                    , "reason: " ++ r
                    ]
-- | Whether a child is being stopped because its start was ignored.
type Ignored = Bool

-- | Update function for 'updateChild': a temporary child is dropped from the
-- specs; any other child is kept with its ref set to 'ChildStartIgnored'
-- (when ignored) or 'ChildStopped'.
setChildStopped :: Ignored -> Child -> Prefix -> Suffix -> State -> Maybe State
setChildStopped ignored (_, spec) before after st
  | isTemporary (childRestart spec) =
      Just $ (specs ^= before >< after) st
  | otherwise =
      Just $ (specs ^= before >< ((stoppedRef, spec) <| after)) st
  where
    stoppedRef = if ignored then ChildStartIgnored else ChildStopped
-- | Update function for 'updateChild': keep the child's spec but set its ref
-- to 'ChildRestarting' with the given (old) pid.
setChildRestarting :: ChildPid -> Child -> Prefix -> Suffix -> State -> Maybe State
setChildRestarting oldPid child before after =
  Just . (specs ^= before >< ((ChildRestarting oldPid, snd child) <| after))
-- | Check whether a child can be added. An existing key yields 'Exists'.
-- Otherwise 'Added' carries either the state extended with the (stopped)
-- child and bumped specified counters (when the flag is set) or the
-- unchanged state.
doAddChild :: AddChildReq -> Bool -> State -> AddChildRes
doAddChild (AddChild _ spec) update st =
  case findChild (childKey spec) st of
    Just (ref, _) -> Exists ref
    Nothing
      | update    -> Added $ ( (specs ^: (|> (ChildStopped, spec)))
                             $ bumpStats Specified (childType spec) (+1) st
                             )
      | otherwise -> Added st
-- | Locate the first child with the given key and apply the update function
-- to it, passing the children before and after it. Returns 'Nothing' when no
-- child has that key.
updateChild :: ChildKey
            -> (Child -> Prefix -> Suffix -> State -> Maybe State)
            -> State
            -> Maybe State
updateChild key updateFn state =
  case Seq.viewl fromMatch of
    EmptyL         -> Nothing
    child :< after -> updateFn child before after state
  where
    (before, fromMatch) =
      Seq.breakl ((== key) . childKey . snd) (state ^. specs)
-- | Remove every child whose key equals the given spec's key.
removeChild :: ChildSpec -> State -> State
removeChild spec = specs ^: filter keep
  where
    keep entry = childKey (snd entry) /= childKey spec
-- | Record the pid of a running child ref in the active map under the spec's
-- key. Any ref other than 'ChildRunning' / 'ChildRunningExtra' is an internal
-- error.
markActive :: State -> ChildRef -> ChildSpec -> State
markActive state ref spec =
  case ref of
    ChildRunning (pid :: ChildPid) -> track pid
    ChildRunningExtra pid _        -> track pid
    _                              -> error $ "InternalError"
  where
    track pid' = (active ^: Map.insert pid' (childKey spec)) state
-- | Subtract one from the given number.
decrement :: Int -> Int
decrement = subtract 1
-- | Find the first child whose spec has the given key.
findChild :: ChildKey -> State -> Maybe (ChildRef, ChildSpec)
findChild key st = find hasKey (st ^. specs)
  where
    hasKey entry = childKey (snd entry) == key
-- | Apply a function to the counters selected by the stats type and child
-- type: 'Specified' adjusts the total children plus the workers/supervisors
-- count; 'Active' adjusts the running count plus the active
-- workers/supervisors count.
bumpStats :: StatsType -> ChildType -> (Int -> Int) -> State -> State
bumpStats kind chType fn =
  case (kind, chType) of
    (Specified, Supervisor) -> bump fn . (stats .> supervisors ^: fn)
    (Specified, Worker)     -> bump fn . (stats .> workers ^: fn)
    (Active, Worker)        -> (stats .> running ^: fn) . (stats .> activeWorkers ^: fn)
    (Active, Supervisor)    -> (stats .> running ^: fn) . (stats .> activeSupervisors ^: fn)
-- | Apply a function to the total-children counter.
bump :: (Int -> Int) -> State -> State
bump with' st = (stats .> children ^: with') st
-- | Is the restart policy 'Temporary'?
isTemporary :: RestartPolicy -> Bool
isTemporary policy = policy == Temporary

-- | Is the restart policy 'Transient'?
isTransient :: RestartPolicy -> Bool
isTransient policy = policy == Transient

-- | Is the restart policy 'Intrinsic'?
isIntrinsic :: RestartPolicy -> Bool
isIntrinsic policy = policy == Intrinsic
-- | Accessor for the map of running pids to child keys.
active :: Accessor State (Map ChildPid ChildKey)
active = accessor _active setter
  where setter v st = st { _active = v }

-- | Accessor for the restart strategy.
strategy :: Accessor State RestartStrategy
strategy = accessor _strategy setter
  where setter v st = st { _strategy = v }

-- | Accessor for the restart limit inside a restart strategy.
restartIntensity :: Accessor RestartStrategy RestartLimit
restartIntensity = accessor intensity setter
  where setter v strat = strat { intensity = v }

-- | Read the restart limit of a restart strategy.
getRestartIntensity :: RestartStrategy -> RestartLimit
getRestartIntensity strat = strat ^. restartIntensity

-- | Accessor for the restart period.
restartPeriod :: Accessor State NominalDiffTime
restartPeriod = accessor _restartPeriod setter
  where setter v st = st { _restartPeriod = v }

-- | Accessor for the recorded restart timestamps.
restarts :: Accessor State [UTCTime]
restarts = accessor _restarts setter
  where setter v st = st { _restarts = v }

-- | Accessor for the sequence of children.
specs :: Accessor State ChildSpecs
specs = accessor _specs setter
  where setter v st = st { _specs = v }

-- | Accessor for the supervisor statistics.
stats :: Accessor State SupervisorStats
stats = accessor _stats setter
  where setter v st = st { _stats = v }

-- | Accessor for the log sink.
logger :: Accessor State LogSink
logger = accessor _logger setter
  where setter v st = st { _logger = v }
-- | Accessor for the total number of defined children.
children :: Accessor SupervisorStats Int
children = accessor _children setter
  where setter n st = st { _children = n }

-- | The total number of defined children.
definedChildren :: SupervisorStats -> Int
definedChildren st = st ^. children

-- | Accessor for the number of defined workers.
workers :: Accessor SupervisorStats Int
workers = accessor _workers setter
  where setter n st = st { _workers = n }

-- | The number of defined workers.
definedWorkers :: SupervisorStats -> Int
definedWorkers st = st ^. workers

-- | Accessor for the number of defined supervisors.
supervisors :: Accessor SupervisorStats Int
supervisors = accessor _supervisors setter
  where setter n st = st { _supervisors = n }

-- | The number of defined supervisors.
definedSupervisors :: SupervisorStats -> Int
definedSupervisors st = st ^. supervisors

-- | Accessor for the number of running children.
running :: Accessor SupervisorStats Int
running = accessor _running setter
  where setter n st = st { _running = n }

-- | The number of running children.
runningChildren :: SupervisorStats -> Int
runningChildren st = st ^. running

-- | Accessor for the number of running workers.
activeWorkers :: Accessor SupervisorStats Int
activeWorkers = accessor _activeWorkers setter
  where setter n st = st { _activeWorkers = n }

-- | The number of running workers.
runningWorkers :: SupervisorStats -> Int
runningWorkers st = st ^. activeWorkers

-- | Accessor for the number of running supervisors.
activeSupervisors :: Accessor SupervisorStats Int
activeSupervisors = accessor _activeSupervisors setter
  where setter n st = st { _activeSupervisors = n }

-- | The number of running supervisors.
runningSupervisors :: SupervisorStats -> Int
runningSupervisors st = st ^. activeSupervisors