{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
module Control.Concurrent.Capataz.Internal.Types where
import RIO
import RIO.Time (NominalDiffTime, UTCTime)
import Control.Teardown (HasTeardown (..), Teardown)
import Data.UUID (UUID)
import qualified Control.Exception as UnsafeE
import Data.Typeable (cast)
import Data.Text.Prettyprint.Doc (Pretty (..), (<+>))
import qualified Data.Text.Prettyprint.Doc as Pretty
import Text.Show.Pretty (ppShow)
-- Identifier aliases: each of the following four is a plain 'UUID'.
type CapatazId = UUID
type WorkerId = UUID
type SupervisorId = UUID
type ProcessId = UUID
-- | Type of the 'workerAction' field of 'WorkerOptions': a function from a
-- 'WorkerId' to an effect in @m@ that returns unit.
type WorkerAction m = WorkerId -> m ()
-- Name aliases: each of the following four is a plain 'Text'.
type ProcessName = Text
type CapatazName = Text
type SupervisorName = Text
type WorkerName = Text
-- | Restart counter; used as the type of @processRestartCount@ in
-- 'MonitorEvent'.
type RestartCount = Int
-- | 'HashMap' from a 'ProcessId' to the matching 'Process' record.
type ProcessMap m = HashMap ProcessId (Process m)
-- | Synonym for the (unapplied) 'Supervisor' type constructor.
type ParentSupervisor = Supervisor
-- | Wrapper around a 'ThreadId'. The wrapper has its own 'Pretty' instance in
-- this module, which is how events render the thread of a process.
newtype ProcessThreadId = PTID ThreadId
  deriving (Generic, Eq, Show)
-- | Renders only the second word of @show tid@ (for a @"ThreadId 12"@ style
-- output that is the number). When 'show' yields fewer than two words the
-- literal @unknown@ is rendered instead.
instance Pretty ProcessThreadId where
  pretty (PTID tid)
    | (_ : threadNumber : _) <- words (show tid) = pretty threadNumber
    | otherwise = "unknown"
-- | Events passed to the @notifyEvent@ callbacks declared in this module.
--
-- Every constructor records the id and name of a supervisor together with
-- the time of the event ('eventTime'). The per-constructor notes below quote
-- the headline that the 'Pretty' instance of this type prints for it.
data CapatazEvent
  -- | \"Supervisor got into an state that should never have happened\".
  = InvalidSupervisorStatusReached
    { supervisorId   :: !SupervisorId
    , supervisorName :: !SupervisorName
    , eventTime      :: !UTCTime
    }
  -- | \"Supervisor changed state\"; carries the previous and the new
  -- 'SupervisorStatus'.
  | SupervisorStatusChanged
    { supervisorId         :: !SupervisorId
    , supervisorName       :: !SupervisorName
    , prevSupervisorStatus :: !SupervisorStatus
    , newSupervisorStatus  :: !SupervisorStatus
    , eventTime            :: !UTCTime
    }
  -- | \"Supervisor terminated process\"; carries a textual reason.
  | ProcessTerminated
    { supervisorId      :: !SupervisorId
    , supervisorName    :: !SupervisorName
    , processThreadId   :: !ProcessThreadId
    , processId         :: !ProcessId
    , processName       :: !ProcessName
    , processType       :: !ProcessType
    , terminationReason :: !Text
    , eventTime         :: !UTCTime
    }
  -- | \"Supervisor spawned new process\".
  | ProcessStarted
    { supervisorId    :: !SupervisorId
    , supervisorName  :: !SupervisorName
    , processThreadId :: !ProcessThreadId
    , processId       :: !ProcessId
    , processName     :: !ProcessName
    , processType     :: !ProcessType
    , eventTime       :: !UTCTime
    }
  -- | \"Supervisor restarted failed process\"; carries a restart count.
  | ProcessRestarted
    { supervisorId        :: !SupervisorId
    , supervisorName      :: !SupervisorName
    , processThreadId     :: !ProcessThreadId
    , processId           :: !ProcessId
    , processName         :: !ProcessName
    , processType         :: !ProcessType
    , processRestartCount :: !Int
    , eventTime           :: !UTCTime
    }
  -- | \"Process completed execution with no errors\".
  | ProcessCompleted
    { supervisorId    :: !SupervisorId
    , supervisorName  :: !SupervisorName
    , processThreadId :: !ProcessThreadId
    , processId       :: !ProcessId
    , processName     :: !ProcessName
    , processType     :: !ProcessType
    , eventTime       :: !UTCTime
    }
  -- | \"Process failed with error\"; carries the exception.
  | ProcessFailed
    { supervisorName  :: !SupervisorName
    , supervisorId    :: !SupervisorId
    , processThreadId :: !ProcessThreadId
    , processId       :: !ProcessId
    , processName     :: !ProcessName
    , processType     :: !ProcessType
    , processError    :: !SomeException
    , eventTime       :: !UTCTime
    }
  -- | \"Process executed callback\"; 'processCallbackError' is 'Just' when the
  -- callback itself failed.
  | ProcessCallbackExecuted
    { supervisorId         :: !SupervisorId
    , supervisorName       :: !SupervisorName
    , processThreadId      :: !ProcessThreadId
    , processId            :: !ProcessId
    , processName          :: !ProcessName
    , processType          :: !ProcessType
    , processCallbackError :: !(Maybe SomeException)
    , processCallbackType  :: !CallbackType
    , eventTime            :: !UTCTime
    }
  -- | \"Supervisor started termination of its children\".
  | ProcessTerminationStarted
    { supervisorName    :: !SupervisorName
    , supervisorId      :: !SupervisorId
    , terminationReason :: !Text
    , eventTime         :: !UTCTime
    }
  -- | \"Supervisor finished termination of its children\".
  | ProcessTerminationFinished
    { supervisorName    :: !SupervisorName
    , supervisorId      :: !SupervisorId
    , terminationReason :: !Text
    , eventTime         :: !UTCTime
    }
  -- | \"Root Supervisor had a fatal failure\"; carries the exception.
  | CapatazFailed
    { supervisorId    :: !SupervisorId
    , supervisorName  :: !SupervisorName
    , supervisorError :: !SomeException
    , eventTime       :: !UTCTime
    }
  -- | \"Root supervisor was terminated\".
  | CapatazTerminated
    { supervisorName :: !SupervisorName
    , supervisorId   :: !SupervisorId
    , eventTime      :: !UTCTime
    }
  deriving (Generic, Show)
-- | Renders an event as a headline followed by an indented block with one
-- @label: value@ line per attribute. 'eventTime' is never part of the output.
instance Pretty CapatazEvent where
  pretty event =
    case event of
      InvalidSupervisorStatusReached {supervisorId, supervisorName} ->
        "Supervisor got into an state that should never have happened, please"
          <+> "report a ticket to"
          <+> "https://github.com/roman/Haskell-capataz/issues/new with title:"
          <+> Pretty.dquotes "InvalidSupervisorStatusReached error"
          <> attributeBlock [supervisorAttr supervisorId supervisorName]

      SupervisorStatusChanged
        { supervisorId
        , supervisorName
        , prevSupervisorStatus
        , newSupervisorStatus
        } ->
        "Supervisor changed state" <> attributeBlock
          [ supervisorAttr supervisorId supervisorName
          , ("Previous State", Pretty.dquotes (pretty prevSupervisorStatus))
          , ("New State", Pretty.dquotes (pretty newSupervisorStatus))
          ]

      ProcessStarted
        { supervisorId
        , supervisorName
        , processId
        , processThreadId
        , processName
        , processType
        } ->
        "Supervisor spawned new process" <> attributeBlock
          [ supervisorAttr supervisorId supervisorName
          , processAttr processId processName processThreadId
          , processTypeAttr processType
          ]

      ProcessFailed
        { processId
        , processThreadId
        , processName
        , processType
        , processError
        } ->
        "Process failed with error" <> attributeBlock
          [ processAttr processId processName processThreadId
          , processTypeAttr processType
          , errorAttr processError
          ]

      ProcessRestarted
        { supervisorId
        , supervisorName
        , processId
        , processThreadId
        , processName
        , processType
        } ->
        "Supervisor restarted failed process" <> attributeBlock
          [ supervisorAttr supervisorId supervisorName
          , processAttr processId processName processThreadId
          , processTypeAttr processType
          ]

      ProcessTerminated
        { supervisorId
        , supervisorName
        , processId
        , processThreadId
        , processName
        , processType
        , terminationReason
        } ->
        "Supervisor terminated process" <> attributeBlock
          [ supervisorAttr supervisorId supervisorName
          , processAttr processId processName processThreadId
          , processTypeAttr processType
          , reasonAttr terminationReason
          ]

      ProcessCompleted
        { processId
        , processThreadId
        , processName
        , processType
        } ->
        "Process completed execution with no errors" <> attributeBlock
          [ processAttr processId processName processThreadId
          , processTypeAttr processType
          ]

      ProcessCallbackExecuted
        { processId
        , processThreadId
        , processName
        , processType
        , processCallbackError
        , processCallbackType
        } ->
        -- Both outcomes share the first three attributes; a failed callback
        -- only appends the error attribute and changes the headline.
        let sharedAttrs =
              [ processAttr processId processName processThreadId
              , processTypeAttr processType
              , ("Callback", pretty processCallbackType)
              ]
        in  case processCallbackError of
              Nothing ->
                "Process executed callback" <> attributeBlock sharedAttrs
              Just err ->
                "Process executed callback and it failed"
                  <> attributeBlock (sharedAttrs <> [errorAttr err])

      ProcessTerminationStarted
        { supervisorId
        , supervisorName
        , terminationReason
        } ->
        "Supervisor started termination of its children" <> attributeBlock
          [ supervisorAttr supervisorId supervisorName
          , reasonAttr terminationReason
          ]

      ProcessTerminationFinished {supervisorId, supervisorName} ->
        "Supervisor finished termination of its children"
          <> attributeBlock [supervisorAttr supervisorId supervisorName]

      CapatazFailed {supervisorId, supervisorName, supervisorError} ->
        "Root Supervisor had a fatal failure" <> attributeBlock
          [ supervisorAttr supervisorId supervisorName
          , errorAttr supervisorError
          ]

      CapatazTerminated {supervisorId, supervisorName} ->
        "Root supervisor was terminated"
          <> attributeBlock [supervisorAttr supervisorId supervisorName]
    where
      -- Lays the attributes out below the headline, nested by 4 columns, with
      -- every label (plus colon) padded to 20 columns.
      attributeBlock :: [(Pretty.Doc ann, Pretty.Doc ann)] -> Pretty.Doc ann
      attributeBlock attrs =
        Pretty.nest 4 (Pretty.hardline <> Pretty.vsep (map renderAttr attrs))
        where
          renderAttr (label, value) = Pretty.fill 20 (label <> ":") <+> value

      -- "Supervisor ID" attribute: <uuid/name>.
      supervisorAttr
        :: SupervisorId -> SupervisorName -> (Pretty.Doc ann, Pretty.Doc ann)
      supervisorAttr supId supName =
        ( "Supervisor ID"
        , Pretty.angles (pretty (show supId) <> "/" <> pretty supName)
        )

      -- "Process ID" attribute: <uuid/name/thread>.
      processAttr
        :: ProcessId
        -> ProcessName
        -> ProcessThreadId
        -> (Pretty.Doc ann, Pretty.Doc ann)
      processAttr procId procName procTid =
        ( "Process ID"
        , Pretty.angles
            (pretty (show procId) <> "/" <> pretty procName <> "/" <> pretty procTid)
        )

      -- "Process Type" attribute.
      processTypeAttr :: ProcessType -> (Pretty.Doc ann, Pretty.Doc ann)
      processTypeAttr procType = ("Process Type", pretty procType)

      -- "Error" attribute: the pretty-shown exception on its own line after a
      -- "|" marker.
      errorAttr :: SomeException -> (Pretty.Doc ann, Pretty.Doc ann)
      errorAttr err =
        ( "Error"
        , Pretty.nest 2 (Pretty.hardline <> "|" <+> pretty (ppShow err))
        )

      -- "Reason" attribute for termination events.
      reasonAttr :: Text -> (Pretty.Doc ann, Pretty.Doc ann)
      reasonAttr reason =
        ("Reason", Pretty.nest 2 (Pretty.fillBreak 10 (pretty reason)))
-- | Displays an event by applying 'displayShow' to the document produced by
-- its 'Pretty' instance.
instance Display CapatazEvent where
  display event = displayShow (pretty event)
-- | Policy stored in the 'workerTerminationPolicy' field of 'WorkerOptions'.
--
-- NOTE(review): the code that interprets these constructors lives outside
-- this module; the per-constructor notes are inferred from the names and
-- should be confirmed there.
data WorkerTerminationPolicy
  -- | Presumably: wait without a time limit.
  = Infinity
  -- | Presumably: terminate without waiting.
  | BrutalTermination
  -- | Carries a number of milliseconds (presumably a timeout).
  | TimeoutMillis !Int
  deriving (Generic, Show, Eq, Ord)

instance NFData WorkerTerminationPolicy

-- | Default policy: 'TimeoutMillis' with a value of 3000.
defWorkerTerminationPolicy :: WorkerTerminationPolicy
defWorkerTerminationPolicy = TimeoutMillis 3000
-- | Three-way decision about a process restart.
--
-- NOTE(review): nothing in this module consumes the type; the meaning of
-- each constructor is only suggested by its name — confirm at the use site.
data ProcessRestartAction
  = ResetRestartCount
  | IncreaseRestartCount
  | HaltSupervisor
  deriving (Generic, Show, Eq)

instance NFData ProcessRestartAction
-- | Order stored in the @supervisorProcessTerminationOrder@ fields of the
-- option and environment records of this module.
--
-- NOTE(review): presumably the order in which a supervisor terminates its
-- children — confirm against the termination code.
data ProcessTerminationOrder
  = NewestFirst
  | OldestFirst
  deriving (Generic, Show, Eq, Ord)

instance NFData ProcessTerminationOrder

-- | Default order: 'OldestFirst'.
defProcessTerminationOrder :: ProcessTerminationOrder
defProcessTerminationOrder = OldestFirst
-- | Strategy stored in the @supervisorRestartStrategy@ fields of the option
-- and environment records of this module.
--
-- NOTE(review): presumably 'AllForOne' restarts every sibling when one
-- process fails while 'OneForOne' restarts only the failed one — the code
-- that implements this is not in this module, confirm there.
data SupervisorRestartStrategy
  = AllForOne
  | OneForOne
  deriving (Generic, Show, Eq, Ord)

instance NFData SupervisorRestartStrategy

-- | Default strategy: 'OneForOne'.
--
-- The identifier is missing an \"r\" (\"Stategy\"); the spelling is kept
-- because other definitions reference it by this name.
defSupervisorRestartStategy :: SupervisorRestartStrategy
defSupervisorRestartStategy = OneForOne
-- | Options record built by 'defCapatazOptions'. It has the same fields as
-- 'SupervisorOptions' plus the 'notifyEvent' callback.
--
-- NOTE(review): how intensity and period interact is decided outside this
-- module; presumably \"at most N restarts within the period\" — confirm.
data CapatazOptions m = CapatazOptions
  { supervisorName                    :: !SupervisorName
  , supervisorIntensity               :: !Int
  , supervisorPeriodSeconds           :: !NominalDiffTime
  , supervisorRestartStrategy         :: !SupervisorRestartStrategy
  , supervisorProcessSpecList         :: ![ProcessSpec m]
    -- ^ Specifications of worker and supervisor processes.
  , supervisorProcessTerminationOrder :: !ProcessTerminationOrder
  , supervisorOnIntensityReached      :: !(m ())
    -- ^ Callback without arguments.
  , supervisorOnFailure               :: !(SomeException -> m ())
    -- ^ Callback that receives an exception.
  , notifyEvent                       :: !(CapatazEvent -> m ())
    -- ^ Callback that receives a 'CapatazEvent'.
  }
-- | Strategy stored in the 'workerRestartStrategy' field of 'WorkerOptions'
-- and the 'processRestartStrategy' field of 'ProcessEnv'.
--
-- NOTE(review): the constructor names suggest always / on-failure-only /
-- never restart semantics; the implementing code is not in this module —
-- confirm.
data WorkerRestartStrategy
  = Permanent
  | Transient
  | Temporary
  deriving (Generic, Show, Eq)

instance NFData WorkerRestartStrategy

-- | Default strategy: 'Transient'.
defWorkerRestartStrategy :: WorkerRestartStrategy
defWorkerRestartStrategy = Transient
-- | Options of a worker; values are assembled by 'buildWorkerOptions1'.
data WorkerOptions m = WorkerOptions
  { workerAction            :: WorkerAction m
    -- ^ Action of the worker; the only non-strict field of the record.
  , workerName              :: !WorkerName
  , workerOnFailure         :: !(SomeException -> m ())
    -- ^ Callback that receives an exception.
  , workerOnCompletion      :: !(m ())
  , workerOnTermination     :: !(m ())
  , workerTerminationPolicy :: !WorkerTerminationPolicy
  , workerRestartStrategy   :: !WorkerRestartStrategy
  }
  deriving (Generic)
-- | Record describing a worker: its id, its 'Async' handle, a creation
-- time, its name and the 'WorkerOptions' it carries.
data Worker m = Worker
  { workerId           :: !WorkerId
  , workerAsync        :: !(Async ())
  , workerCreationTime :: !UTCTime
  , workerName         :: !WorkerName
  , workerOptions      :: !(WorkerOptions m)
  }
-- | Record with the id, name, 'Async' handle, creation time and restart
-- strategy of a process. It is not parameterised over a monad.
data ProcessEnv = ProcessEnv
  { processId              :: !ProcessId
  , processName            :: !ProcessName
  , processAsync           :: !(Async ())
  , processCreationTime    :: !UTCTime
  , processRestartStrategy :: !WorkerRestartStrategy
  }
-- | Options of a supervisor; values are assembled by
-- 'buildSupervisorOptions'. The record has the fields of 'CapatazOptions'
-- minus @notifyEvent@.
data SupervisorOptions m = SupervisorOptions
  { supervisorName                    :: SupervisorName
    -- ^ The only non-strict field of the record.
  , supervisorIntensity               :: !Int
  , supervisorPeriodSeconds           :: !NominalDiffTime
  , supervisorRestartStrategy         :: !SupervisorRestartStrategy
  , supervisorProcessSpecList         :: ![ProcessSpec m]
  , supervisorProcessTerminationOrder :: !ProcessTerminationOrder
  , supervisorOnIntensityReached      :: !(m ())
    -- ^ Callback without arguments.
  , supervisorOnFailure               :: !(SomeException -> m ())
    -- ^ Callback that receives an exception.
  }
-- | Record describing a supervisor: identity, options, creation time, its
-- 'Async' handle, a function that accepts a 'SupervisorMessage', and its
-- 'SupervisorEnv'.
data Supervisor m = Supervisor
  { supervisorId           :: !SupervisorId
  , supervisorName         :: !SupervisorName
  , supervisorOptions      :: !(SupervisorOptions m)
  , supervisorCreationTime :: !UTCTime
  , supervisorAsync        :: !(Async ())
  , supervisorNotify       :: SupervisorMessage m -> m ()
    -- ^ The only non-strict field of the record.
  , supervisorEnv          :: !(SupervisorEnv m)
  }
-- | Requests wrapped by the 'ControlAction' constructor of
-- 'SupervisorMessage'. Each constructor pairs its input with a callback
-- that receives the outcome.
data ControlAction m
  -- | Carries worker options and a callback that receives a 'WorkerId'.
  = ForkWorker
    { workerOptions  :: !(WorkerOptions m)
    , returnWorkerId :: !(WorkerId -> m ())
    }
  -- | Carries supervisor options and a callback that receives a
  -- 'Supervisor'.
  | ForkSupervisor
    { supervisorOptions :: !(SupervisorOptions m)
    , returnSupervisor  :: !(Supervisor m -> m ())
    }
  -- | Carries a process id, a textual reason and a callback that receives a
  -- 'Bool' (NOTE(review): presumably whether the process was found and
  -- terminated — confirm at the call site).
  | TerminateProcess
    { processId                :: !ProcessId
    , processTerminationReason :: !Text
    , notifyProcessTermination :: !(Bool -> m ())
    }
  deriving (Generic)
-- | Signals that can be thrown as exceptions (the type has an 'Exception'
-- instance). The two termination constructors carry the id of the affected
-- process and a textual reason.
data CapatazSignal
  = CapatazFailure
  | RestartProcessException
  | TerminateProcessException
    { processId                :: !ProcessId
    , processTerminationReason :: !Text
    }
  | BrutallyTerminateProcessException
    { processId                :: !ProcessId
    , processTerminationReason :: !Text
    }
  deriving (Generic, Show)

instance Exception CapatazSignal

instance NFData CapatazSignal
-- | Error that can be thrown as an exception (the type has an 'Exception'
-- instance). Its single constructor records the id and name of a process
-- together with a restart count.
data CapatazError = SupervisorIntensityReached
  { processId           :: !ProcessId
  , processName         :: !ProcessName
  , processRestartCount :: !Int
  }
  deriving (Generic, Show)

instance Exception CapatazError

instance NFData CapatazError
-- | Tag naming a kind of callback; it is carried by 'ProcessCallbackExecuted'
-- events and by 'ProcessCallbackFailed' errors.
data CallbackType
  = OnCompletion
  | OnFailure
  | OnTermination
  deriving (Generic, Show, Eq)

-- | Renders the constructor name as produced by 'show'.
instance Pretty CallbackType where
  pretty callbackType = Pretty.pretty (show callbackType)
-- | Tells whether a process is a supervisor or a worker.
data ProcessType
  = SupervisorType
  | WorkerType
  deriving (Show, Eq)

-- | Renders @Supervisor@ or @Worker@ (without the @Type@ suffix of the
-- constructor name).
instance Pretty ProcessType where
  pretty SupervisorType = "Supervisor"
  pretty WorkerType     = "Worker"
-- | Error that can be thrown as an exception (the type has an 'Exception'
-- instance). Its single constructor records a failed callback.
data ProcessError = ProcessCallbackFailed
  { processId            :: !WorkerId
  , processError         :: !(Maybe SomeException)
    -- ^ Optional exception (NOTE(review): presumably the process error that
    -- triggered the callback, if any — confirm at the construction site).
  , processCallbackError :: !SomeException
    -- ^ Exception associated with the callback.
  , processCallbackType  :: !CallbackType
  }
  deriving (Generic, Show)

instance Exception ProcessError
-- | Events wrapped by the 'MonitorEvent' constructor of 'SupervisorMessage'.
--
-- Every constructor records the id and name of a process and the time of
-- the event ('monitorEventTime'). The primed names keep three constructors
-- distinct from the 'CapatazEvent' constructors with the same base name.
data MonitorEvent
  -- | Carries a restart count and a textual termination reason.
  = ProcessTerminated'
    { processId                :: !ProcessId
    , processName              :: !ProcessName
    , processRestartCount      :: !RestartCount
    , processTerminationReason :: !Text
    , monitorEventTime         :: !UTCTime
    }
  -- | Carries a restart count and the exception of the failure.
  | ProcessFailed'
    { processId           :: !WorkerId
    , processName         :: !WorkerName
    , processRestartCount :: !RestartCount
    , processError        :: !SomeException
    , monitorEventTime    :: !UTCTime
    }
  | ProcessCompleted'
    { processId        :: !ProcessId
    , processName      :: !ProcessName
    , monitorEventTime :: !UTCTime
    }
  | ProcessForcedRestart
    { processId        :: !ProcessId
    , processName      :: !ProcessName
    , monitorEventTime :: !UTCTime
    }
  deriving (Show)
-- | Status of a supervisor; held in the 'supervisorStatusVar' of
-- 'SupervisorEnv' and reported through 'SupervisorStatusChanged' events.
data SupervisorStatus
  = Initializing
  | Running
  | Halting
  | Halted
  deriving (Generic, Show, Eq)

instance NFData SupervisorStatus

-- | Renders the constructor name as produced by 'show'.
instance Pretty SupervisorStatus where
  pretty status = Pretty.pretty (show status)
-- | Message accepted by the 'supervisorNotify' functions of this module:
-- either a 'ControlAction' request or a 'MonitorEvent'.
data SupervisorMessage m
  = ControlAction !(ControlAction m)
  | MonitorEvent !MonitorEvent
  deriving (Generic)
-- | A process is either a 'Worker' or a 'Supervisor'; this is the value type
-- of a 'ProcessMap'.
data Process m
  = WorkerProcess !(Worker m)
  | SupervisorProcess !(Supervisor m)
-- | Specification of a process: the options of a worker or the options of a
-- supervisor. Both payloads are non-strict.
data ProcessSpec m
  = WorkerSpec (WorkerOptions m)
  | SupervisorSpec (SupervisorOptions m)
-- | Pairs a 'Supervisor' with a 'Teardown' value.
data Capataz m = Capataz
  { capatazSupervisor :: !(Supervisor m)
  , capatazTeardown   :: !Teardown
  }

-- | Hands back the 'Teardown' stored in the record.
instance HasTeardown (Capataz m) where
  getTeardown (Capataz _ teardown) = teardown
-- | Identity of a supervisor together with two callbacks: one that accepts a
-- 'SupervisorMessage' and one that accepts a 'CapatazEvent'.
--
-- NOTE(review): going by the type name this describes the parent of the
-- process being set up — confirm at the construction site.
data ParentSupervisorEnv m = ParentSupervisorEnv
  { supervisorId     :: !SupervisorId
  , supervisorName   :: !SupervisorName
  , supervisorNotify :: !(SupervisorMessage m -> m ())
  , notifyEvent      :: !(CapatazEvent -> m ())
  }
-- | Environment record of a supervisor. Besides the supervisor identity it
-- holds the notification plumbing, the mutable process map and status
-- variable, the 'SupervisorOptions' it was created from, and a copy of the
-- individual option fields.
data SupervisorEnv m = SupervisorEnv
  { supervisorId                      :: !SupervisorId
  , supervisorName                    :: !SupervisorName
  , supervisorNotify                  :: !(SupervisorMessage m -> m ())
    -- ^ Function that accepts a 'SupervisorMessage'.
  , supervisorGetNotification         :: !(STM (SupervisorMessage m))
    -- ^ 'STM' transaction that yields a 'SupervisorMessage'.
  , supervisorProcessMap              :: !(IORef (ProcessMap m))
  , supervisorStatusVar               :: !(TVar SupervisorStatus)
  , supervisorOptions                 :: !(SupervisorOptions m)
  , supervisorIntensity               :: !Int
  , supervisorPeriodSeconds           :: !NominalDiffTime
  , supervisorRestartStrategy         :: !SupervisorRestartStrategy
  , supervisorProcessTerminationOrder :: !ProcessTerminationOrder
  , supervisorOnIntensityReached      :: !(m ())
    -- ^ Callback without arguments.
  , supervisorOnFailure               :: !(SomeException -> m ())
    -- ^ Callback that receives an exception. The label and type mirror the
    -- field of 'SupervisorOptions'; a field label may appear only once per
    -- constructor, so it must not reuse @supervisorOnIntensityReached@.
  , notifyEvent                       :: !(CapatazEvent -> m ())
    -- ^ Callback that receives a 'CapatazEvent'.
  }
-- | Builds a 'CapatazOptions' record from default values and then applies
-- the given modifier to it.
--
-- Defaults: intensity 2, period 5, 'defSupervisorRestartStategy', an empty
-- process spec list, 'OldestFirst' termination order and callbacks that do
-- nothing.
defCapatazOptions
  :: Monad m
  => Text -- ^ Name stored in the 'supervisorName' field.
  -> (CapatazOptions m -> CapatazOptions m) -- ^ Modifier applied to the defaults.
  -> CapatazOptions m
defCapatazOptions name modFn = modFn defaults
  where
    defaults = CapatazOptions
      { supervisorName                    = name
      , supervisorIntensity               = 2
      , supervisorPeriodSeconds           = 5
      , supervisorRestartStrategy         = defSupervisorRestartStategy
      , supervisorProcessSpecList         = []
      , supervisorProcessTerminationOrder = OldestFirst
      , supervisorOnIntensityReached      = return ()
      , supervisorOnFailure               = \_ -> return ()
      , notifyEvent                       = \_ -> return ()
      }
-- | Wraps the result of 'buildSupervisorOptions' in a 'SupervisorSpec'.
supervisorSpec
  :: Monad m
  => SupervisorName -- ^ Name of the supervisor.
  -> (SupervisorOptions m -> SupervisorOptions m) -- ^ Modifier for the default options.
  -> ProcessSpec m
supervisorSpec name tweak =
  SupervisorSpec $ buildSupervisorOptions name tweak
{-# INLINE supervisorSpec #-}
-- | Same as 'supervisorSpec' with 'id' as the modifier, i.e. the default
-- options are used untouched.
supervisorSpecWithDefaults
  :: Monad m
  => SupervisorName -- ^ Name of the supervisor.
  -> ProcessSpec m
supervisorSpecWithDefaults name = supervisorSpec name keepDefaults
  where
    keepDefaults = id
{-# INLINE supervisorSpecWithDefaults #-}
-- | Wraps the result of 'buildWorkerOptions' in a 'WorkerSpec'.
workerSpec
  :: Monad m
  => WorkerName -- ^ Name of the worker.
  -> m () -- ^ Action of the worker.
  -> (WorkerOptions m -> WorkerOptions m) -- ^ Modifier for the default options.
  -> ProcessSpec m
workerSpec name action tweak =
  WorkerSpec $ buildWorkerOptions name action tweak
{-# INLINE workerSpec #-}
-- | Wraps the result of 'buildWorkerOptions1' in a 'WorkerSpec'. Unlike
-- 'workerSpec', the action receives a 'WorkerId'.
workerSpec1
  :: Monad m
  => WorkerName -- ^ Name of the worker.
  -> (WorkerId -> m ()) -- ^ Action of the worker.
  -> (WorkerOptions m -> WorkerOptions m) -- ^ Modifier for the default options.
  -> ProcessSpec m
workerSpec1 name action tweak =
  WorkerSpec $ buildWorkerOptions1 name action tweak
{-# INLINE workerSpec1 #-}
-- | Same as 'workerSpec' with 'id' as the modifier, i.e. the default options
-- are used untouched.
workerSpecWithDefaults
  :: Monad m
  => WorkerName -- ^ Name of the worker.
  -> m () -- ^ Action of the worker.
  -> ProcessSpec m
workerSpecWithDefaults name action = workerSpec name action keepDefaults
  where
    keepDefaults = id
{-# INLINE workerSpecWithDefaults #-}
-- | Builds a 'SupervisorOptions' record from default values and then applies
-- the given modifier to it.
--
-- Defaults: intensity 2, period 5, 'defSupervisorRestartStategy', an empty
-- process spec list, 'OldestFirst' termination order and callbacks that do
-- nothing.
buildSupervisorOptions
  :: Monad m
  => SupervisorName -- ^ Name stored in the 'supervisorName' field.
  -> (SupervisorOptions m -> SupervisorOptions m) -- ^ Modifier applied to the defaults.
  -> SupervisorOptions m
buildSupervisorOptions name modFn = modFn defaults
  where
    defaults = SupervisorOptions
      { supervisorName                    = name
      , supervisorIntensity               = 2
      , supervisorPeriodSeconds           = 5
      , supervisorRestartStrategy         = defSupervisorRestartStategy
      , supervisorProcessSpecList         = []
      , supervisorProcessTerminationOrder = OldestFirst
      , supervisorOnIntensityReached      = return ()
      , supervisorOnFailure               = \_ -> return ()
      }
{-# INLINE buildSupervisorOptions #-}
-- | Same as 'buildSupervisorOptions' with 'id' as the modifier, i.e. the
-- default options are returned untouched.
buildSupervisorOptionsWithDefaults
  :: Monad m
  => SupervisorName -- ^ Name of the supervisor.
  -> SupervisorOptions m
buildSupervisorOptionsWithDefaults name = buildSupervisorOptions name id
{-# INLINE buildSupervisorOptionsWithDefaults #-}
-- | Variant of 'buildWorkerOptions1' for actions that have no use for the
-- 'WorkerId': the id handed to the action is ignored.
buildWorkerOptions
  :: Monad m
  => WorkerName -- ^ Name of the worker.
  -> m () -- ^ Action of the worker.
  -> (WorkerOptions m -> WorkerOptions m) -- ^ Modifier for the default options.
  -> WorkerOptions m
buildWorkerOptions name action tweak =
  buildWorkerOptions1 name (\_workerId -> action) tweak
{-# INLINE buildWorkerOptions #-}
-- | Builds a 'WorkerOptions' record from the given name and action plus
-- default values, and then applies the given modifier to it.
--
-- Defaults: callbacks that do nothing, 'defWorkerTerminationPolicy' and
-- 'defWorkerRestartStrategy'.
buildWorkerOptions1
  :: Monad m
  => WorkerName -- ^ Name stored in the 'workerName' field.
  -> (WorkerId -> m ()) -- ^ Action stored in the 'workerAction' field.
  -> (WorkerOptions m -> WorkerOptions m) -- ^ Modifier applied to the defaults.
  -> WorkerOptions m
buildWorkerOptions1 name action tweak = tweak defaults
  where
    defaults = WorkerOptions
      { workerName              = name
      , workerAction            = action
      , workerOnFailure         = \_ -> return ()
      , workerOnCompletion      = return ()
      , workerOnTermination     = return ()
      , workerTerminationPolicy = defWorkerTerminationPolicy
      , workerRestartStrategy   = defWorkerRestartStrategy
      }
{-# INLINE buildWorkerOptions1 #-}
-- | Same as 'buildWorkerOptions' with 'id' as the modifier, i.e. the default
-- options are returned untouched.
buildWorkerOptionsWithDefaults
  :: Monad m
  => WorkerName -- ^ Name of the worker.
  -> m () -- ^ Action of the worker.
  -> WorkerOptions m
buildWorkerOptionsWithDefaults name action =
  buildWorkerOptions name action keepDefaults
  where
    keepDefaults = id
{-# INLINE buildWorkerOptionsWithDefaults #-}
-- | 'Control.Exception.getMaskingState' lifted into any 'MonadIO': reports
-- the asynchronous-exception masking state of the calling thread.
getMaskingState :: MonadIO m => m UnsafeE.MaskingState
getMaskingState = liftIO currentMaskingState
  where
    currentMaskingState = UnsafeE.getMaskingState
-- | 'Control.Exception.try' for any 'MonadUnliftIO': the action is run in
-- 'IO' through 'withRunInIO' and an exception of type @e@ is returned as a
-- 'Left'. This is the plain @base@ 'try' (imported as @UnsafeE@), so it does
-- not treat asynchronous exceptions specially.
unsafeTry :: (Exception e, MonadUnliftIO m) => m a -> m (Either e a)
unsafeTry action =
  withRunInIO (\runInIO -> UnsafeE.try $ runInIO action)
-- | Like 'fromException', but it also looks inside asynchronous wrappers.
--
-- * When the exception is not a 'UnsafeE.SomeAsyncException', plain
--   'fromException' is used.
-- * When it is one and its payload is an 'AsyncExceptionWrapper', the
--   exception inside that wrapper is 'cast' to the requested type.
-- * Otherwise the payload of the 'UnsafeE.SomeAsyncException' itself is
--   'cast' to the requested type.
fromAnyException :: Exception e => SomeException -> Maybe e
fromAnyException ex =
  case UnsafeE.fromException ex of
    Nothing -> fromException ex
    Just (UnsafeE.SomeAsyncException asyncEx) ->
      case cast asyncEx of
        Nothing -> cast asyncEx
        Just (AsyncExceptionWrapper wrappedEx) -> cast wrappedEx