{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NoImplicitPrelude #-}
module Control.Concurrent.Capataz.Internal.Process where
import RIO
import Control.Concurrent.Capataz.Internal.Types
import Control.Concurrent.Capataz.Internal.Util
(readProcessMap, sortProcessesByTerminationOrder)
import RIO.Time (UTCTime, getCurrentTime)
-- | Return the 'Async' handle stored in a process, for both the worker and
-- the supervisor variants.
getProcessAsync :: Process m -> Async ()
getProcessAsync (WorkerProcess Worker { workerAsync }) = workerAsync
getProcessAsync (SupervisorProcess Supervisor { supervisorAsync }) =
  supervisorAsync
-- | Return the thread id of the process's 'Async', wrapped in the @PTID@
-- constructor.
getProcessThreadId :: Process m -> ProcessThreadId
getProcessThreadId process = PTID (asyncThreadId (getProcessAsync process))
-- | Return the 'ProcessId' of a process: the worker id for a worker, the
-- supervisor id for a supervisor.
getProcessId :: Process m -> ProcessId
getProcessId (WorkerProcess Worker { workerId }) = workerId
getProcessId (SupervisorProcess Supervisor { supervisorId }) = supervisorId
-- | Return the name configured in a process specification, taken from the
-- worker options or the supervisor options respectively.
getProcessName :: ProcessSpec m -> ProcessName
getProcessName (WorkerSpec WorkerOptions { workerName }) = workerName
getProcessName (SupervisorSpec SupervisorOptions { supervisorName }) =
  supervisorName
-- | Classify a process specification as 'WorkerType' or 'SupervisorType'
-- based solely on its constructor.
getProcessType :: ProcessSpec m -> ProcessType
getProcessType WorkerSpec{}     = WorkerType
getProcessType SupervisorSpec{} = SupervisorType
-- | Rebuild the 'ProcessSpec' a running process was created from, by
-- re-wrapping the options record stored inside the process.
getProcessSpec :: Process m -> ProcessSpec m
getProcessSpec (WorkerProcess Worker { workerOptions }) =
  WorkerSpec workerOptions
getProcessSpec (SupervisorProcess Supervisor { supervisorOptions }) =
  SupervisorSpec supervisorOptions
-- | Publish a @ProcessFailed@ event through the supervisor's @notifyEvent@
-- callback.
--
-- The event carries the supervisor's id and name, the identity of the failed
-- process (id, name, type, thread id), the given error, and the current time
-- as read when this function runs.
notifyProcessFailed
  :: MonadIO m => SupervisorEnv m -> Process m -> SomeException -> m ()
notifyProcessFailed SupervisorEnv { supervisorId, supervisorName, notifyEvent } process processError
  = do
    let procSpec = getProcessSpec process
    now <- getCurrentTime
    notifyEvent ProcessFailed
      { supervisorId
      , supervisorName
      , processId       = getProcessId process
      , processName     = getProcessName procSpec
      , processType     = getProcessType procSpec
      , processThreadId = getProcessThreadId process
      , processError
      , eventTime       = now
      }
-- | Publish a @ProcessTerminated@ event through the supervisor's
-- @notifyEvent@ callback.
--
-- The event carries the supervisor's id and name, the identity of the
-- terminated process (id, name, type, thread id), the textual termination
-- reason, and the current time as read when this function runs.
notifyProcessTerminated
  :: MonadIO m => SupervisorEnv m -> Process m -> Text -> m ()
notifyProcessTerminated SupervisorEnv { supervisorId, supervisorName, notifyEvent } process terminationReason
  = do
    let procSpec = getProcessSpec process
    now <- getCurrentTime
    notifyEvent ProcessTerminated
      { supervisorId
      , supervisorName
      , processId       = getProcessId process
      , processName     = getProcessName procSpec
      , processType     = getProcessType procSpec
      , processThreadId = getProcessThreadId process
      , terminationReason
      , eventTime       = now
      }
-- | Publish either a @ProcessStarted@ or a @ProcessRestarted@ event through
-- the parent supervisor's @notifyEvent@ callback.
--
-- When restart information is supplied, a @ProcessRestarted@ event carrying
-- the restart count is emitted (the 'ProcessId' in the tuple is ignored; the
-- id is always read from the process itself). Otherwise a @ProcessStarted@
-- event is emitted. Both events are stamped with the current time.
notifyProcessStarted
  :: MonadIO m
  => Maybe (ProcessId, RestartCount)
  -> ParentSupervisorEnv m
  -> Process m
  -> m ()
notifyProcessStarted mRestartInfo ParentSupervisorEnv { supervisorId, supervisorName, notifyEvent } process
  = do
    eventTime <- getCurrentTime
    notifyEvent $ case mRestartInfo of
      Nothing -> ProcessStarted
        { supervisorId
        , supervisorName
        , processId
        , processName
        , processType
        , processThreadId
        , eventTime
        }
      Just (_, processRestartCount) -> ProcessRestarted
        { supervisorId
        , supervisorName
        , processId
        , processName
        , processType
        , processThreadId
        , processRestartCount
        , eventTime
        }
 where
  -- Fields shared by both event shapes, all derived from the process.
  procSpec        = getProcessSpec process
  processId       = getProcessId process
  processName     = getProcessName procSpec
  processType     = getProcessType procSpec
  processThreadId = getProcessThreadId process
-- | Publish a @ProcessCompleted@ event through the supervisor's
-- @notifyEvent@ callback.
--
-- Unlike the other notifiers in this module, the event time is supplied by
-- the caller rather than read from the clock here.
notifyProcessCompleted :: SupervisorEnv m -> Process m -> UTCTime -> m ()
notifyProcessCompleted SupervisorEnv { supervisorId, supervisorName, notifyEvent } process eventTime
  = notifyEvent ProcessCompleted
    { supervisorId
    , supervisorName
    , processId       = getProcessId process
    , processName     = getProcessName procSpec
    , processType     = getProcessType procSpec
    , processThreadId = getProcessThreadId process
    , eventTime
    }
 where
  procSpec = getProcessSpec process
-- | Run the completion callback of a process specification.
--
-- Only worker specs carry a completion callback here; any other spec is a
-- no-op.
callProcessOnCompletion :: Monad m => ProcessSpec m -> m ()
callProcessOnCompletion (WorkerSpec WorkerOptions { workerOnCompletion }) =
  workerOnCompletion
callProcessOnCompletion _ = return ()
-- | Run the failure callback of a process specification, handing it the
-- exception that caused the failure. Both workers and supervisors provide
-- such a callback.
callProcessOnFailure :: ProcessSpec m -> SomeException -> m ()
callProcessOnFailure (WorkerSpec WorkerOptions { workerOnFailure }) err =
  workerOnFailure err
callProcessOnFailure (SupervisorSpec SupervisorOptions { supervisorOnFailure }) err
  = supervisorOnFailure err
-- | Run the termination callback of a process specification.
--
-- Only worker specs carry a termination callback here; any other spec is a
-- no-op.
callProcessOnTermination :: Monad m => ProcessSpec m -> m ()
callProcessOnTermination (WorkerSpec WorkerOptions { workerOnTermination }) =
  workerOnTermination
callProcessOnTermination _ = return ()
-- | Turn an exception observed in a supervised process into the
-- 'MonitorEvent' that describes what happened to it.
--
-- The thread id reported in callback events is that of the calling thread,
-- and a single timestamp (read before any callback runs) is used for every
-- event produced by one invocation.
--
-- Dispatch on the exception:
--
-- * @RestartProcessException@: yields @ProcessForcedRestart@; no callback
--   is run.
-- * @TerminateProcessException@: runs the termination callback under
--   @unmask@, publishes @ProcessCallbackExecuted@, then yields
--   @ProcessTerminated'@ or, if the callback threw, @ProcessFailed'@
--   wrapping a @ProcessCallbackFailed@.
-- * @BrutallyTerminateProcessException@: yields @ProcessTerminated'@
--   without running any callback.
-- * anything else: runs the failure callback under @unmask@, publishes
--   @ProcessCallbackExecuted@, then yields @ProcessFailed'@ carrying either
--   the original error or, if the callback threw, a @ProcessCallbackFailed@.
handleProcessException
  :: (MonadUnliftIO m)
  => (m () -> m a)
  -> ParentSupervisorEnv m
  -> ProcessSpec m
  -> ProcessId
  -> RestartCount
  -> SomeException
  -> m MonitorEvent
handleProcessException unmask ParentSupervisorEnv { supervisorId, supervisorName, notifyEvent } procSpec processId restartCount err
  = do
    processThreadId  <- PTID <$> myThreadId
    monitorEventTime <- getCurrentTime
    let
      processName = getProcessName procSpec

      -- Run a user callback unmasked, publish its outcome, and hand the
      -- outcome back so the caller can pick the resulting monitor event.
      runCallback callbackType callback = do
        outcome <- unsafeTry (unmask callback)
        notifyEvent ProcessCallbackExecuted
          { supervisorId
          , supervisorName
          , processId
          , processName
          , processType          = getProcessType procSpec
          , processThreadId
          , processCallbackError = either Just (const Nothing) outcome
          , processCallbackType  = callbackType
          , eventTime            = monitorEventTime
          }
        return outcome

      -- Monitor event for a callback that itself threw; the original
      -- exception is preserved inside the wrapper.
      callbackFailed callbackType callbackErr = ProcessFailed'
        { processId
        , processName
        , processError         = toException ProcessCallbackFailed
          { processId
          , processCallbackError = callbackErr
          , processCallbackType  = callbackType
          , processError         = Just err
          }
        , processRestartCount  = restartCount
        , monitorEventTime
        }

      terminated processTerminationReason = ProcessTerminated'
        { processId
        , processName
        , monitorEventTime
        , processTerminationReason
        , processRestartCount = restartCount
        }

    case fromAnyException err of
      Just RestartProcessException -> return ProcessForcedRestart
        { processId
        , processName
        , monitorEventTime
        }

      Just TerminateProcessException { processTerminationReason } -> do
        outcome <- runCallback OnTermination
                               (callProcessOnTermination procSpec)
        return $ case outcome of
          Left  callbackErr -> callbackFailed OnTermination callbackErr
          Right _           -> terminated processTerminationReason

      Just BrutallyTerminateProcessException { processTerminationReason } ->
        return (terminated processTerminationReason)

      _ -> do
        outcome <- runCallback OnFailure (callProcessOnFailure procSpec err)
        return $ case outcome of
          Left  callbackErr -> callbackFailed OnFailure callbackErr
          Right _           -> ProcessFailed'
            { processId
            , processName
            , processError        = err
            , processRestartCount = restartCount
            , monitorEventTime
            }
-- | Handle the normal completion of a supervised process.
--
-- Runs the completion callback under @unmask@, publishes a
-- @ProcessCallbackExecuted@ event describing whether the callback threw,
-- and returns @ProcessCompleted'@ on success or @ProcessFailed'@ (wrapping a
-- @ProcessCallbackFailed@ with no original error) when the callback threw.
--
-- The thread id reported is that of the calling thread, and the timestamp
-- is read before the callback runs.
handleProcessCompletion
  :: (MonadUnliftIO m)
  => (m () -> m a)
  -> ParentSupervisorEnv m
  -> ProcessSpec m
  -> ProcessId
  -> RestartCount
  -> m MonitorEvent
handleProcessCompletion unmask ParentSupervisorEnv { supervisorId, supervisorName, notifyEvent } procSpec processId restartCount
  = do
    processThreadId  <- PTID <$> myThreadId
    monitorEventTime <- getCurrentTime
    let processName = getProcessName procSpec
    outcome <- unsafeTry (unmask (callProcessOnCompletion procSpec))
    let callbackError = case outcome of
          Left  callbackErr -> Just callbackErr
          Right _           -> Nothing
    notifyEvent ProcessCallbackExecuted
      { supervisorId
      , supervisorName
      , processId
      , processName
      , processType          = getProcessType procSpec
      , processThreadId
      , processCallbackError = callbackError
      , processCallbackType  = OnCompletion
      , eventTime            = monitorEventTime
      }
    return $ case outcome of
      Right _ -> ProcessCompleted'
        { processName
        , processId
        , monitorEventTime
        }
      Left callbackErr -> ProcessFailed'
        { processId
        , processName
        , processError        = toException ProcessCallbackFailed
          { processId
          , processCallbackError = callbackErr
          , processError         = Nothing
          , processCallbackType  = OnCompletion
          }
        , processRestartCount = restartCount
        , monitorEventTime
        }
-- | Terminate a single process and then publish a @ProcessTerminated@ event
-- for it.
--
-- Workers are stopped with 'terminateWorker' and supervisors with
-- 'terminateSupervisor'; the notification is sent only after that call
-- returns.
terminateProcess
  :: (MonadUnliftIO m)
  => Text
  -> SupervisorEnv m
  -> Process m
  -> m ()
terminateProcess processTerminationReason env process = do
  stopProcess
  notifyProcessTerminated env process processTerminationReason
 where
  stopProcess = case process of
    WorkerProcess worker ->
      terminateWorker processTerminationReason worker
    SupervisorProcess supervisor ->
      terminateSupervisor processTerminationReason supervisor
-- | Stop a worker according to the termination policy in its options.
--
-- * @Infinity@: deliver @TerminateProcessException@ to the worker's async
--   via 'cancelWith'.
-- * @BrutalTermination@: deliver @BrutallyTerminateProcessException@
--   instead.
-- * @TimeoutMillis millis@: race a delayed brutal termination (after
--   @millis * 1000@ microseconds) against a regular termination request.
--   The race runs unmasked in a helper async, which is then waited on.
terminateWorker :: (MonadUnliftIO m) => Text -> Worker m -> m ()
terminateWorker processTerminationReason Worker { workerId, workerOptions, workerAsync }
  = case workerTerminationPolicy of
    Infinity          -> cancelWith workerAsync gracefulStop
    BrutalTermination -> cancelWith workerAsync brutalStop
    TimeoutMillis millis ->
      asyncWithUnmask
          (\unmask -> unmask $ race_
            (threadDelay (millis * 1000) >> cancelWith workerAsync brutalStop)
            (cancelWith workerAsync gracefulStop)
          )
        >>= wait
 where
  WorkerOptions { workerTerminationPolicy } = workerOptions

  processId = workerId

  -- Exception asking the worker to shut down.
  gracefulStop =
    TerminateProcessException { processId, processTerminationReason }

  -- Exception used when the policy calls for brutal termination.
  brutalStop =
    BrutallyTerminateProcessException { processId, processTerminationReason }
-- | Stop a supervisor by delivering a @TerminateProcessException@, tagged
-- with the supervisor's id and the given reason, to its async via
-- 'cancelWith'.
terminateSupervisor :: MonadIO m => Text -> Supervisor m -> m ()
terminateSupervisor processTerminationReason Supervisor { supervisorId, supervisorAsync }
  = cancelWith supervisorAsync stopRequest
 where
  stopRequest = TerminateProcessException
    { processId = supervisorId
    , processTerminationReason
    }
-- | Terminate every process currently registered in the supervisor's
-- process map.
--
-- Processes are ordered with 'sortProcessesByTerminationOrder' according to
-- the supervisor's configured termination order and terminated one by one
-- with 'terminateProcess'. A @ProcessTerminationStarted@ event is published
-- before the first termination and a @ProcessTerminationFinished@ event
-- after the last one.
--
-- Each of the two events carries its own timestamp, so the finished event is
-- never stamped earlier than the per-process @ProcessTerminated@ events
-- emitted in between, and the two times can be compared.
terminateProcessMap :: (MonadUnliftIO m) => Text -> SupervisorEnv m -> m ()
terminateProcessMap terminationReason env@SupervisorEnv { supervisorId, supervisorName, supervisorProcessTerminationOrder, notifyEvent }
  = do
    startTime  <- getCurrentTime
    processMap <- readProcessMap env
    let processList = sortProcessesByTerminationOrder
          supervisorProcessTerminationOrder
          processMap
    notifyEvent ProcessTerminationStarted
      { supervisorId
      , supervisorName
      , terminationReason
      , eventTime = startTime
      }
    forM_ processList (terminateProcess terminationReason env)
    -- Read the clock again: reusing startTime would stamp the finished event
    -- with the moment termination began.
    finishTime <- getCurrentTime
    notifyEvent ProcessTerminationFinished
      { supervisorId
      , supervisorName
      , terminationReason
      , eventTime = finishTime
      }