{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Control.Concurrent.Capataz.Internal.Supervisor where
import RIO
import qualified RIO.HashMap as HashMap
import RIO.Time (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import qualified Data.UUID.V4 as UUID
import Control.Concurrent.Capataz.Internal.Types
import qualified Control.Concurrent.Capataz.Internal.Process as Process
import qualified Control.Concurrent.Capataz.Internal.Util as Util
import qualified Control.Concurrent.Capataz.Internal.Worker as Worker
forkSupervisor
:: (MonadUnliftIO m)
=> ParentSupervisorEnv m
-> SupervisorOptions m
-> Maybe (ProcessId, RestartCount)
-> m (Supervisor m)
forkSupervisor parentEnv supervisorOptions mRestartInfo = do
(supervisorId, restartCount) <- case mRestartInfo of
Just (supervisorId, restartCount) -> pure (supervisorId, restartCount)
Nothing -> (,) <$> liftIO UUID.nextRandom <*> pure 0
supervisor <- supervisorMain parentEnv
supervisorOptions
supervisorId
restartCount
Process.notifyProcessStarted mRestartInfo
parentEnv
(SupervisorProcess supervisor)
return supervisor
buildSupervisorEnv
:: MonadIO m
=> (CapatazEvent -> m ())
-> (SupervisorMessage m -> m ())
-> STM (SupervisorMessage m)
-> SupervisorId
-> SupervisorOptions m
-> m (SupervisorEnv m)
buildSupervisorEnv notifyEvent supervisorNotify supervisorGetNotification supervisorId supervisorOptions@SupervisorOptions {..}
= do
supervisorProcessMap <- newIORef mempty
supervisorStatusVar <- newTVarIO Initializing
return SupervisorEnv {..}
handleMonitorEvent
:: (MonadUnliftIO m) => SupervisorEnv m -> MonitorEvent -> m Bool
handleMonitorEvent env monitorEv = do
case monitorEv of
ProcessForcedRestart{} ->
return ()
ProcessCompleted' { processId, monitorEventTime } ->
handleProcessCompleted env processId monitorEventTime
ProcessFailed' { processId, processError, processRestartCount } ->
handleProcessFailed env processId processError processRestartCount
ProcessTerminated' { processId, processRestartCount, processTerminationReason }
-> handleProcessTerminated env
processId
processTerminationReason
processRestartCount
return True
handleControlAction
:: (MonadUnliftIO m) => SupervisorEnv m -> ControlAction m -> m Bool
handleControlAction env controlAction = case controlAction of
ForkWorker { workerOptions, returnWorkerId } -> do
worker@Worker { workerId } <- Worker.forkWorker
(Util.toParentSupervisorEnv env)
workerOptions
Nothing
Util.appendProcessToMap env (WorkerProcess worker)
returnWorkerId workerId
return True
ForkSupervisor { supervisorOptions, returnSupervisor } -> do
supervisor <- forkSupervisor (Util.toParentSupervisorEnv env)
supervisorOptions
Nothing
Util.appendProcessToMap env (SupervisorProcess supervisor)
returnSupervisor supervisor
return True
TerminateProcess { processId, processTerminationReason, notifyProcessTermination }
-> do
mProcess <- Util.fetchProcess env processId
case mProcess of
Just process -> do
Process.terminateProcess processTerminationReason env process
notifyProcessTermination True
return True
_ -> do
notifyProcessTermination False
return True
haltSupervisor :: (MonadUnliftIO m) => Text -> SupervisorEnv m -> m ()
haltSupervisor reason env = do
Util.writeSupervisorStatus env Halting
Process.terminateProcessMap reason env
Util.resetProcessMap env (const HashMap.empty)
Util.writeSupervisorStatus env Halted
handleSupervisorMessage
:: (MonadUnliftIO m) => SupervisorEnv m -> SupervisorMessage m -> m Bool
handleSupervisorMessage env message = case message of
ControlAction controlAction -> handleControlAction env controlAction
MonitorEvent monitorEvent -> handleMonitorEvent env monitorEvent
supervisorLoop
:: (MonadUnliftIO m)
=> (forall b . m b -> m b)
-> ParentSupervisorEnv m
-> SupervisorEnv m
-> RestartCount
-> m ()
supervisorLoop unmask parentEnv@ParentSupervisorEnv { supervisorId, supervisorName, supervisorNotify = notifyParentSupervisor } env@SupervisorEnv { supervisorId = processId, supervisorName = processName, supervisorOptions, supervisorStatusVar, supervisorGetNotification, notifyEvent } restartCount
= do
processThreadId <- PTID <$> myThreadId
loopResult <-
unsafeTry
$ unmask
$ atomically
$ (,)
<$> Util.readSupervisorStatusSTM supervisorStatusVar
<*> supervisorGetNotification
case loopResult of
Left supervisorError -> handleSupervisorLoopError supervisorError
Right (status, message) -> case status of
Initializing -> do
eventTime <- getCurrentTime
notifyEvent InvalidSupervisorStatusReached
{ supervisorId
, supervisorName
, eventTime
}
supervisorLoop unmask parentEnv env restartCount
Running -> do
eContinueLoop <- unsafeTry $ unmask $ handleSupervisorMessage
env
message
case eContinueLoop of
Left supervisorError -> handleSupervisorLoopError supervisorError
Right continueLoop
| continueLoop -> supervisorLoop unmask parentEnv env restartCount
| otherwise -> do
eventTime <- getCurrentTime
notifyEvent ProcessTerminated
{ supervisorId
, supervisorName
, eventTime
, processId
, processName
, processThreadId
, processType = SupervisorType
, terminationReason = "Supervisor normal termination"
}
Halting ->
return ()
Halted ->
return ()
where
handleSupervisorLoopError err = do
haltSupervisor (tshow err) env
result <- Process.handleProcessException
unmask
parentEnv
(SupervisorSpec supervisorOptions)
processId
restartCount
err
notifyParentSupervisor (MonitorEvent result)
supervisorMain
:: (MonadUnliftIO m)
=> ParentSupervisorEnv m
-> SupervisorOptions m
-> SupervisorId
-> RestartCount
-> m (Supervisor m)
supervisorMain parentEnv@ParentSupervisorEnv { notifyEvent } supervisorOptions@SupervisorOptions { supervisorName, supervisorProcessSpecList } supervisorId restartCount
= do
supervisorCreationTime <- getCurrentTime
supervisorQueue <- newTQueueIO
let supervisorNotify = atomically . writeTQueue supervisorQueue
supervisorGetNotification = readTQueue supervisorQueue
supervisorEnv@SupervisorEnv{} <- buildSupervisorEnv
notifyEvent
supervisorNotify
supervisorGetNotification
supervisorId
supervisorOptions
supervisorAsync <- asyncWithUnmask $ \unmask -> do
Util.setProcessThreadName supervisorId supervisorName
supervisorLoop unmask parentEnv supervisorEnv restartCount
forM_
supervisorProcessSpecList
(\processSpec -> case processSpec of
WorkerSpec workerOptions -> do
worker <- Worker.forkWorker
(Util.toParentSupervisorEnv supervisorEnv)
workerOptions
Nothing
Util.appendProcessToMap supervisorEnv (WorkerProcess worker)
SupervisorSpec childSupervisorOptions -> do
supervisor <- forkSupervisor
(Util.toParentSupervisorEnv supervisorEnv)
childSupervisorOptions
Nothing
Util.appendProcessToMap supervisorEnv (SupervisorProcess supervisor)
)
Util.writeSupervisorStatus supervisorEnv Running
return Supervisor
{ supervisorId
, supervisorName
, supervisorAsync
, supervisorOptions
, supervisorEnv
, supervisorNotify
, supervisorCreationTime
}
calcDiffSeconds :: MonadIO m => UTCTime -> m NominalDiffTime
calcDiffSeconds creationTime = do
currentTime <- getCurrentTime
return $ diffUTCTime currentTime creationTime
calcRestartAction
:: SupervisorEnv m -> Int -> NominalDiffTime -> ProcessRestartAction
calcRestartAction SupervisorEnv { supervisorIntensity, supervisorPeriodSeconds } restartCount diffSeconds
= case () of
_
| diffSeconds
< supervisorPeriodSeconds
&& restartCount
>= supervisorIntensity
-> HaltSupervisor
| diffSeconds > supervisorPeriodSeconds
-> ResetRestartCount
| otherwise
-> IncreaseRestartCount
execCapatazRestartStrategy
:: (MonadUnliftIO m)
=> SupervisorEnv m
-> ProcessId
-> ProcessSpec m
-> Int
-> m ()
execCapatazRestartStrategy supervisorEnv@SupervisorEnv { supervisorRestartStrategy } processId processSpec processRestartCount
= case supervisorRestartStrategy of
AllForOne -> do
newProcessList <- restartProcessList supervisorEnv
processId
processRestartCount
let newProcessMap =
newProcessList
& fmap (\process -> (Util.getProcessId process, process))
& HashMap.fromList
Util.resetProcessMap supervisorEnv (const newProcessMap)
OneForOne -> do
Util.removeProcessFromMap supervisorEnv processId
newProcess <- case processSpec of
WorkerSpec workerOptions -> restartWorker supervisorEnv
workerOptions
processId
processRestartCount
SupervisorSpec supervisorOptions -> restartSupervisor
(Util.toParentSupervisorEnv supervisorEnv)
supervisorOptions
processId
processRestartCount
Util.appendProcessToMap supervisorEnv newProcess
execRestartAction
:: (MonadUnliftIO m)
=> SupervisorEnv m
-> ProcessId
-> ProcessSpec m
-> Text
-> UTCTime
-> Int
-> m ()
execRestartAction supervisorEnv@SupervisorEnv { supervisorOnIntensityReached } processId processSpec processName processCreationTime processRestartCount
= do
restartAction <- calcRestartAction supervisorEnv processRestartCount
<$> calcDiffSeconds processCreationTime
case restartAction of
HaltSupervisor -> do
(_ :: Either SomeException ()) <- unsafeTry supervisorOnIntensityReached
throwIO SupervisorIntensityReached
{ processId
, processName
, processRestartCount
}
ResetRestartCount ->
execCapatazRestartStrategy supervisorEnv processId processSpec 0
IncreaseRestartCount -> execCapatazRestartStrategy
supervisorEnv
processId
processSpec
(processRestartCount + 1)
restartProcessList
:: (MonadUnliftIO m)
=> SupervisorEnv m
-> WorkerId
-> RestartCount
-> m [Process m]
restartProcessList supervisorEnv@SupervisorEnv { supervisorProcessTerminationOrder } failingProcessId restartCount
= do
processMap <- Util.readProcessMap supervisorEnv
let processList = Util.sortProcessesByTerminationOrder
supervisorProcessTerminationOrder
processMap
newProcessList <- forM processList $ \process -> do
unless (failingProcessId == Process.getProcessId process)
$ forceRestartProcess supervisorEnv process
case process of
WorkerProcess Worker { workerId, workerOptions } -> do
let WorkerOptions { workerRestartStrategy } = workerOptions
case workerRestartStrategy of
Temporary -> return Nothing
_ ->
Just
<$> restartWorker supervisorEnv
workerOptions
workerId
restartCount
SupervisorProcess Supervisor { supervisorId, supervisorOptions } ->
Just
<$> restartSupervisor (Util.toParentSupervisorEnv supervisorEnv)
supervisorOptions
supervisorId
restartCount
return $ catMaybes newProcessList
forceRestartProcess :: MonadIO m => SupervisorEnv m -> Process m -> m ()
forceRestartProcess env process = do
Process.notifyProcessTerminated env process "forced restart"
cancelWith (Process.getProcessAsync process) RestartProcessException
restartWorker
:: (MonadUnliftIO m)
=> SupervisorEnv m
-> WorkerOptions m
-> WorkerId
-> RestartCount
-> m (Process m)
restartWorker supervisorEnv workerOptions workerId restartCount =
WorkerProcess <$> Worker.forkWorker
(Util.toParentSupervisorEnv supervisorEnv)
workerOptions
(Just (workerId, restartCount))
restartSupervisor
:: (MonadUnliftIO m)
=> ParentSupervisorEnv m
-> SupervisorOptions m
-> ProcessId
-> RestartCount
-> m (Process m)
restartSupervisor parentEnv supervisorOptions processId restartCount =
SupervisorProcess <$> forkSupervisor parentEnv
supervisorOptions
(Just (processId, restartCount))
handleWorkerCompleted
:: (MonadUnliftIO m) => SupervisorEnv m -> Worker m -> m ()
handleWorkerCompleted env worker = do
let Worker { workerId, workerOptions, workerCreationTime } = worker
WorkerOptions { workerName, workerRestartStrategy } = workerOptions
case workerRestartStrategy of
Permanent -> do
let restartCount = 0
execRestartAction env
workerId
(WorkerSpec workerOptions)
workerName
workerCreationTime
restartCount
_ -> Util.removeProcessFromMap env workerId
handleProcessCompleted
:: (MonadUnliftIO m) => SupervisorEnv m -> ProcessId -> UTCTime -> m ()
handleProcessCompleted env processId completionTime = do
mProcess <- Util.fetchProcess env processId
case mProcess of
Nothing -> return ()
Just process -> do
Process.notifyProcessCompleted env process completionTime
case process of
WorkerProcess worker -> handleWorkerCompleted env worker
_ ->
error
$ "ERROR: Supervisor ("
<> show (Process.getProcessId process)
<> ") should never complete"
handleWorkerFailed
:: (MonadUnliftIO m) => SupervisorEnv m -> Worker m -> Int -> m ()
handleWorkerFailed env worker restartCount = do
let Worker { workerId, workerCreationTime, workerOptions } = worker
WorkerOptions { workerName, workerRestartStrategy } = workerOptions
case workerRestartStrategy of
Temporary -> Util.removeProcessFromMap env workerId
_ -> execRestartAction env
workerId
(WorkerSpec workerOptions)
workerName
workerCreationTime
restartCount
handleSupervisorFailed
:: (MonadUnliftIO m) => SupervisorEnv m -> Supervisor m -> Int -> m ()
handleSupervisorFailed env supervisor restartCount = do
let Supervisor { supervisorId, supervisorCreationTime, supervisorOptions } =
supervisor
SupervisorOptions { supervisorName } = supervisorOptions
execRestartAction env
supervisorId
(SupervisorSpec supervisorOptions)
supervisorName
supervisorCreationTime
restartCount
handleProcessFailed
:: (MonadUnliftIO m)
=> SupervisorEnv m
-> WorkerId
-> SomeException
-> Int
-> m ()
handleProcessFailed env processId processError restartCount = do
mProcess <- Util.fetchProcess env processId
case mProcess of
Nothing -> return ()
Just process -> do
Process.notifyProcessFailed env process processError
case process of
WorkerProcess worker -> handleWorkerFailed env worker restartCount
SupervisorProcess supervisor ->
handleSupervisorFailed env supervisor restartCount
handleWorkerTerminated
:: (MonadUnliftIO m) => SupervisorEnv m -> Worker m -> Int -> m ()
handleWorkerTerminated env worker restartCount = do
let Worker { workerId, workerCreationTime, workerOptions } = worker
WorkerOptions { workerName, workerRestartStrategy } = workerOptions
case workerRestartStrategy of
Permanent -> execRestartAction env
workerId
(WorkerSpec workerOptions)
workerName
workerCreationTime
restartCount
_ -> Util.removeProcessFromMap env workerId
handleSupervisorTerminated
:: (MonadUnliftIO m) => SupervisorEnv m -> Supervisor m -> Int -> m ()
handleSupervisorTerminated env supervisor restartCount = do
let Supervisor { supervisorId, supervisorCreationTime, supervisorOptions } =
supervisor
SupervisorOptions { supervisorName } = supervisorOptions
execRestartAction env
supervisorId
(SupervisorSpec supervisorOptions)
supervisorName
supervisorCreationTime
restartCount
handleProcessTerminated
:: (MonadUnliftIO m) => SupervisorEnv m -> ProcessId -> Text -> Int -> m ()
handleProcessTerminated env processId terminationReason restartCount = do
mProcess <- Util.fetchProcess env processId
case mProcess of
Nothing -> return ()
Just process -> do
Process.notifyProcessTerminated env process terminationReason
case process of
WorkerProcess worker -> handleWorkerTerminated env worker restartCount
SupervisorProcess supervisor ->
handleSupervisorTerminated env supervisor restartCount