{-
  Humble module inspired by Erlang supervisors,
  with minimal dependencies.
-}

{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Control.Concurrent.Supervisor.Types
  ( SupervisionCtx
  , Supervisor
  , QueueLike(..)
  , Child_
  , DeadLetter
  , RestartAction
  , SupervisionEvent(..)
  , RestartStrategy(..)
  , RestartResult(..)
  -- * Creating a new supervisor
  -- $new
  , newSupervisor
  -- * Restart Policies
  , fibonacciRetryPolicy
  -- * Stopping a supervisor
  -- $shutdown
  , shutdownSupervisor
  -- * Accessing Supervisor event log
  -- $log
  , eventStream
  , activeChildren
  -- * Supervise a forked thread
  -- $fork
  , forkSupervised
  -- * Monitor another supervisor
  -- $monitor
  , monitorWith
  ) where

import           Control.Concurrent
import           Control.Concurrent.STM
import           Control.Exception
import           Control.Monad
import           Control.Monad.IO.Class
import           Control.Retry
import qualified Data.HashMap.Strict as Map
import           Data.IORef
import           Data.Time
import           Numeric.Natural
import           System.Clock (Clock(Monotonic), TimeSpec, getTime)

--------------------------------------------------------------------------------
-- | Channel on which 'DeadLetter's are delivered.
type Mailbox = TChan DeadLetter

--------------------------------------------------------------------------------
-- | State and configuration of a supervisor, parameterised over the queue
-- type @q@ used for its event stream.
data SupervisionCtx q = SupervisionCtx
  { _sc_mailbox         :: Mailbox
    -- ^ Mailbox on which supervised workers post a 'DeadLetter' when they
    -- terminate with an exception.
  , _sc_parent_mailbox  :: !(IORef (Maybe Mailbox))
    -- ^ The mailbox of the parent process (which is monitoring this one), if any.
  , _sc_children        :: !(IORef (Map.HashMap ThreadId (Child_ q)))
    -- ^ The children currently registered, keyed by their 'ThreadId'.
  , _sc_eventStream     :: q SupervisionEvent
    -- ^ Queue on which 'SupervisionEvent's are published.
  , _sc_eventStreamSize :: !Natural
    -- ^ The size the event stream was requested with.
  , _sc_strategy        :: !RestartStrategy
    -- ^ The restart strategy the supervisor was created with.
  }

--------------------------------------------------------------------------------
-- | Handle to a supervisor created by 'newSupervisor'.
data Supervisor q = Supervisor
  { _sp_myTid :: !ThreadId
    -- ^ The thread 'newSupervisor' forked to run @handleEvents@.
  , _sp_ctx   :: !(SupervisionCtx q)
    -- ^ The context that thread operates on.
  }

-- | Minimal queue interface required of a supervisor's event stream.
class QueueLike q where
  -- | Create a new queue. The 'Natural' is the requested size, which an
  -- instance is free to ignore.
  newQueueIO :: Natural -> IO (q a)
  -- | Read an element from the queue.
  readQueue  :: q a -> STM a
  -- | Write an element to the queue.
  writeQueue :: q a -> a -> STM ()

-- | Unbounded queue: the requested size is ignored.
instance QueueLike TQueue where
  newQueueIO _size = newTQueueIO
  readQueue queue = readTQueue queue
  writeQueue queue item = writeTQueue queue item

-- | Bounded queue: writing to a full queue drops the element instead of
-- blocking the writer.
instance QueueLike TBQueue where
  newQueueIO size = newTBQueueIO size
  readQueue queue = readTBQueue queue
  writeQueue queue item =
    isFullTBQueue queue >>= \full ->
      if full
        then return ()
        else writeTBQueue queue item

--------------------------------------------------------------------------------
-- | Notification that a thread terminated with an exception: the epoch at
-- which the letter was created, the 'ThreadId' of the dead thread and the
-- exception it died with.
data DeadLetter = DeadLetter !LetterEpoch !ThreadId !SomeException

--------------------------------------------------------------------------------
-- | A point in time, as read from the monotonic clock by @getEpoch@.
type Epoch = TimeSpec
-- | The 'Epoch' at which a 'DeadLetter' was created.
newtype LetterEpoch = LetterEpoch Epoch
  deriving Show
-- | The 'Epoch' at which a 'Child_' was registered or last restarted.
newtype ChildEpoch = ChildEpoch Epoch
  deriving Show

--------------------------------------------------------------------------------
-- | Outcome of an attempt to restart a dead child.
data RestartResult
  = Restarted !ThreadId !ThreadId !RetryStatus !UTCTime
    -- ^ The supervised 'Child_' was restarted successfully. Carries the
    -- 'ThreadId' of the dead thread, the 'ThreadId' of its replacement,
    -- the updated 'RetryStatus' and a timestamp.
  | StaleDeadLetter !ThreadId !LetterEpoch !ChildEpoch !UTCTime
    -- ^ A stale 'DeadLetter' was received.
  | RestartFailed SupervisionEvent
    -- ^ The restart failed for a reason described by a 'SupervisionEvent'.
  deriving Show

--------------------------------------------------------------------------------
-- | A supervised entity: either a plain worker thread, which carries the
-- 'RestartAction' used to respawn it, or another 'Supervisor'. Both record
-- the 'ChildEpoch' they were created at, their current 'RetryStatus' and
-- the retry policy consulted when they die.
data Child_ q = Worker !ChildEpoch !RetryStatus (RetryPolicyM IO) RestartAction
              | Supvsr !ChildEpoch !RetryStatus (RetryPolicyM IO) !(Supervisor q)

--------------------------------------------------------------------------------
-- | Action which respawns a worker: it is given the 'ThreadId' of the
-- thread that died and returns the 'ThreadId' of its replacement.
type RestartAction = ThreadId -> IO ThreadId

--------------------------------------------------------------------------------
-- | Events describing what happens to supervised children; they are
-- published on the supervisor's event stream (see 'eventStream').
data SupervisionEvent
  = ChildBorn !ThreadId !UTCTime
    -- ^ A child was forked and registered by 'forkSupervised'.
  | ChildDied !ThreadId !SomeException !UTCTime
  | ChildRestarted !ThreadId !ThreadId !RetryStatus !UTCTime
  | ChildNotFound !ThreadId !UTCTime
    -- ^ A restart was requested for a 'ThreadId' which is not registered.
  | StaleDeadLetterReceived !ThreadId !LetterEpoch !ChildEpoch !UTCTime
  | ChildRestartLimitReached !ThreadId !RetryStatus !UTCTime
    -- ^ The child's retry policy did not allow a further restart.
  | ChildFinished !ThreadId !UTCTime
    -- ^ A child terminated without an exception.
  deriving Show

--------------------------------------------------------------------------------
-- | Erlang-inspired restart strategies. At the moment only 'OneForOne' is
-- implemented.
data RestartStrategy
  = OneForOne
  deriving Show

--------------------------------------------------------------------------------
-- | Default throttling policy: a Fibonacci backoff ('fibonacciBackoff')
-- with a base delay of 100.
fibonacciRetryPolicy :: RetryPolicyM IO
fibonacciRetryPolicy = fibonacciBackoff baseDelay
  where
    baseDelay :: Int
    baseDelay = 100

--------------------------------------------------------------------------------
-- | Read the current 'Epoch' from the 'Monotonic' clock.
getEpoch :: MonadIO m => m Epoch
getEpoch = liftIO (getTime Monotonic)

--------------------------------------------------------------------------------
-- | Post a 'DeadLetter' for the given thread and exception to the parent's
-- mailbox, stamped with the current epoch. Does nothing when no parent
-- mailbox is currently set.
tryNotifyParent :: IORef (Maybe Mailbox) -> ThreadId -> SomeException -> IO ()
tryNotifyParent mbPMbox myId ex = do
  parent <- readIORef mbPMbox
  forM_ parent $ \mailbox -> do
    stamp <- getEpoch
    atomically (writeTChan mailbox (DeadLetter (LetterEpoch stamp) myId ex))

-- $new
-- In order to create a new supervisor, call `newSupervisor` with the
-- `RestartStrategy` to use and the size of the event queue:

-- $supervise

--------------------------------------------------------------------------------
-- | Create a new 'Supervisor' with the given 'RestartStrategy' and event
-- stream size.
--
-- A fresh mailbox, event queue and (empty) children map are allocated and a
-- thread running @handleEvents@ is forked. If that thread terminates with an
-- exception, the parent mailbox (if one has been set) is notified through
-- 'tryNotifyParent'.
newSupervisor :: QueueLike q
              => RestartStrategy
              -> Natural
              -> IO (Supervisor q)
newSupervisor strategy size = do
  parentMbx <- newIORef Nothing
  mbx       <- newTChanIO
  es        <- newQueueIO size
  cld       <- newIORef Map.empty
  let ctx = SupervisionCtx
        { _sc_mailbox         = mbx
        , _sc_parent_mailbox  = parentMbx
        , _sc_children        = cld
        , _sc_eventStream     = es
        , _sc_eventStreamSize = size
        , _sc_strategy        = strategy
        }
      -- If we have a parent supervisor watching us, notify it we died.
      notifyParent ex = myThreadId >>= \myId -> tryNotifyParent parentMbx myId ex
  tid <- forkFinally (handleEvents ctx) (either notifyParent return)
  return Supervisor { _sp_myTid = tid, _sp_ctx = ctx }

-- $log

--------------------------------------------------------------------------------
-- | Gives you access to the queue of 'SupervisionEvent's this supervisor is
-- generating, allowing you to react to them. With the 'TBQueue' instance of
-- 'QueueLike' the queue is bounded and events written to a full queue are
-- dropped, so a stream nobody drains does not grow without bound.
eventStream :: QueueLike q => Supervisor q -> q SupervisionEvent
eventStream = _sc_eventStream . _sp_ctx

--------------------------------------------------------------------------------
-- | Returns the number of children registered with the supervisor at the
-- moment of the call.
activeChildren :: QueueLike q => Supervisor q -> IO Int
activeChildren sup = do
  children <- readIORef (_sc_children (_sp_ctx sup))
  return (Map.size children)

-- $shutdown

--------------------------------------------------------------------------------
-- | Shutdown the given supervisor, taking its children down with it.
--
-- The children map is read once; every worker in it is killed and every
-- monitored 'Supervisor' is shut down recursively. Only then is the
-- supervisor's own thread killed.
shutdownSupervisor :: QueueLike q => Supervisor q -> IO ()
shutdownSupervisor (Supervisor tid ctx) = do
  chMap <- readIORef (_sc_children ctx)
  forM_ (Map.toList chMap) $ \(childTid, child) ->
    case child of
      Worker{}         -> killThread childTid
      Supvsr _ _ _ sub -> shutdownSupervisor sub
  killThread tid

-- $fork

--------------------------------------------------------------------------------
-- | Fork a thread in a supervised mode.
--
-- The computation is forked via 'supervised', registered in the supervisor's
-- children map as a 'Worker' (with a fresh 'ChildEpoch', the default retry
-- status and the given policy) and a 'ChildBorn' event is published. The
-- 'ThreadId' of the new thread is returned.
forkSupervised :: QueueLike q
               => Supervisor q
               -- ^ The 'Supervisor'
               -> RetryPolicyM IO
               -- ^ The retry policy to use
               -> IO ()
               -- ^ The computation to run
               -> IO ThreadId
forkSupervised sup policy act = do
  -- The fork is deliberately NOT wrapped in 'bracket'. A thread forked from
  -- bracket's acquire action inherits the masked state, so the worker would
  -- run with asynchronous exceptions masked and 'killThread' could block
  -- forever on a worker that never reaches an interruptible operation. The
  -- release action was 'return', so no cleanup guarantee is lost.
  newChild <- supervised sup act
  e <- getEpoch
  let ctx = _sp_ctx sup
      -- Restarting ignores the dead thread's id and simply forks the
      -- computation again under the same supervisor.
      ch  = Worker (ChildEpoch e) defaultRetryStatus policy (const (supervised sup act))
  atomicModifyIORef' (_sc_children ctx) $ \chMap -> (Map.insert newChild ch chMap, ())
  now <- getCurrentTime
  atomically $ writeQueue (_sc_eventStream ctx) (ChildBorn newChild now)
  return newChild

--------------------------------------------------------------------------------
-- | Fork the computation so that its termination is reported to the
-- supervisor.
--
-- * If it terminates with an exception, a 'DeadLetter' stamped with the
--   current epoch is written to the supervisor's mailbox.
-- * If it terminates normally, the thread removes itself from the children
--   map and a 'ChildFinished' event is published.
supervised :: QueueLike q => Supervisor q -> IO () -> IO ThreadId
supervised sup act = forkFinally act onExit
  where
    ctx = _sp_ctx sup

    -- Runs in the forked thread, so 'myThreadId' is the child's own id.
    onExit outcome = do
      myId <- myThreadId
      case outcome of
        Left ex -> do
          e <- getEpoch
          atomically $
            writeTChan (_sc_mailbox ctx) (DeadLetter (LetterEpoch e) myId ex)
        Right _ -> do
          now <- getCurrentTime
          atomicModifyIORef' (_sc_children ctx) $ \chMap ->
            (Map.delete myId chMap, ())
          atomically $
            writeQueue (_sc_eventStream ctx) (ChildFinished myId now)

--------------------------------------------------------------------------------
-- | Run the given restart action unless the 'DeadLetter' is stale, i.e. its
-- epoch is strictly smaller than the one stored in the 'Child_' to restart.
-- For a stale letter the action is not run and 'StaleDeadLetter' is
-- returned instead.
ignoringStaleLetters :: ThreadId
                     -> LetterEpoch
                     -> ChildEpoch
                     -> IO RestartResult
                     -> IO RestartResult
ignoringStaleLetters tid letterEpoch childEpoch act = do
  now <- getCurrentTime
  case (letterEpoch, childEpoch) of
    (LetterEpoch sent, ChildEpoch born)
      | sent < born -> return (StaleDeadLetter tid letterEpoch childEpoch now)
      | otherwise   -> act

--------------------------------------------------------------------------------
-- | Try to restart the child whose death was announced by a 'DeadLetter'.
--
-- Arguments: the supervisor's context, the epoch of the dead letter, the
-- timestamp to attach to the result, and the 'ThreadId' of the dead child.
--
-- * If the 'ThreadId' is not registered, the result is
--   @'RestartFailed' ('ChildNotFound' ..)@.
-- * If the letter is older than the registered child, the result is
--   'StaleDeadLetter' (see 'ignoringStaleLetters').
-- * Otherwise the child's retry policy is consulted. If it yields no delay
--   the result is @'RestartFailed' ('ChildRestartLimitReached' ..)@. If it
--   yields a delay, this thread sleeps for that long and then respawns the
--   child: a 'Worker' through its 'RestartAction', a 'Supvsr' through
--   'newSupervisor'. The new entry replaces the old one in the children map
--   and the result is 'Restarted'.
restartChild :: QueueLike q
             => SupervisionCtx q
             -> LetterEpoch
             -> UTCTime
             -> ThreadId
             -> IO RestartResult
restartChild ctx deadLetterEpoch now newDeath = do
  chMap <- readIORef (_sc_children ctx)
  case Map.lookup newDeath chMap of
    Nothing -> return $ RestartFailed (ChildNotFound newDeath now)
    Just (Worker workerEpoch rState rPolicy act) ->
      ignoringStaleLetters newDeath deadLetterEpoch workerEpoch $
        runRetryPolicy rState rPolicy emitEventChildRestartLimitReached $ \newRState -> do
          e <- getEpoch
          let ch = Worker (ChildEpoch e) newRState rPolicy act
          newThreadId <- act newDeath
          replaceChild newDeath newThreadId ch
          emitEventChildRestarted newThreadId newRState
    Just (Supvsr supervisorEpoch rState rPolicy (Supervisor deathSup ctx')) ->
      ignoringStaleLetters newDeath deadLetterEpoch supervisorEpoch $
        runRetryPolicy rState rPolicy emitEventChildRestartLimitReached $ \newRState -> do
          e <- getEpoch
          -- NOTE(review): the strategy is taken from the parent's context
          -- and the size from the dead supervisor's; indistinguishable
          -- while 'OneForOne' is the only strategy.
          restartedSup <- newSupervisor (_sc_strategy ctx) (_sc_eventStreamSize ctx')
          let ch = Supvsr (ChildEpoch e) newRState rPolicy restartedSup
          -- TODO: shutdown children?
          let newThreadId = _sp_myTid restartedSup
          replaceChild deathSup newThreadId ch
          emitEventChildRestarted newThreadId newRState
  where
    -- Swap the dead child's entry for its replacement, atomically and
    -- against the CURRENT map. The snapshot read at the top may be stale
    -- by now because the retry policy sleeps before restarting; writing it
    -- back would discard children registered or removed in the meantime.
    replaceChild oldTid newTid ch =
      atomicModifyIORef' (_sc_children ctx) $ \current ->
        (Map.insert newTid ch (Map.delete oldTid current), ())

    emitEventChildRestarted :: ThreadId -> RetryStatus -> IO RestartResult
    emitEventChildRestarted newThreadId newRState =
      return $ Restarted newDeath newThreadId newRState now

    emitEventChildRestartLimitReached :: RetryStatus -> IO RestartResult
    emitEventChildRestartLimitReached newRState =
      return $ RestartFailed (ChildRestartLimitReached newDeath newRState now)

    -- Ask the policy whether another attempt is allowed. 'Nothing' means
    -- give up; @'Just' delay@ means sleep for @delay@ and continue with
    -- the updated 'RetryStatus'.
    runRetryPolicy :: RetryStatus
                   -> RetryPolicyM IO
                   -> (RetryStatus -> IO RestartResult)
                   -> (RetryStatus -> IO RestartResult)
                   -> IO RestartResult
    runRetryPolicy rState rPolicy ifAbort ifThrottle = do
      maybeDelay <- getRetryPolicyM rPolicy rState
      case maybeDelay of
        Nothing -> ifAbort rState
        Just delay ->
          let newRState = rState
                { rsIterNumber      = rsIterNumber rState + 1
                , rsCumulativeDelay = rsCumulativeDelay rState + delay
                , rsPreviousDelay   = Just (maybe 0 (const delay) (rsPreviousDelay rState))
                }
          in threadDelay delay >> ifThrottle newRState

--------------------------------------------------------------------------------
-- | Restart handler for the 'OneForOne' strategy. It forwards all of its
-- arguments, unchanged, to 'restartChild'.
restartOneForOne :: QueueLike q
                 => SupervisionCtx q
                 -> LetterEpoch
                 -> UTCTime
                 -> ThreadId
                 -> IO RestartResult
restartOneForOne ctx epoch now deadTid =
  restartChild ctx epoch now deadTid

--------------------------------------------------------------------------------
-- | The supervisor's event loop. It never returns on its own: each
-- iteration blocks until a 'DeadLetter' arrives in the supervisor mailbox,
-- records a 'ChildDied' event, and then decides whether to restart.
--
--   * If the exception carried by the letter is an 'AsyncException', the
--     child is not restarted; its entry is deleted from the children map.
--   * Otherwise the configured 'RestartStrategy' is run and the outcome is
--     published on the event stream.
handleEvents :: QueueLike q => SupervisionCtx q -> IO ()
handleEvents ctx@SupervisionCtx{..} = forever $ do
  DeadLetter epoch deadTid cause <- atomically (readTChan _sc_mailbox)
  now <- getCurrentTime
  let childDied = ChildDied deadTid cause now
  publish childDied
  -- If we catch an `AsyncException`, we have nothing but good
  -- reasons NOT to restart the thread.
  case (asyncExceptionFromException cause :: Maybe AsyncException) of
    Just _ -> do
      -- Forget the child, then log what happened.
      atomicModifyIORef' _sc_children $ \kids -> (Map.delete deadTid kids, ())
      -- NOTE(review): this is the second 'ChildDied' event for the same
      -- death (one was already published above); confirm it is intended.
      publish childDied
    Nothing -> do
      outcome <- case _sc_strategy of
        OneForOne -> restartOneForOne ctx epoch now deadTid
      -- TODO: shutdown supervisor?
      publish (describe outcome)
  where
    -- Append a single event to the supervisor's event stream, in its own
    -- STM transaction.
    publish :: SupervisionEvent -> IO ()
    publish = atomically . writeQueue _sc_eventStream

    -- Translate the result of a restart attempt into the event to log.
    describe :: RestartResult -> SupervisionEvent
    describe (StaleDeadLetter tid le we tm)   = StaleDeadLetterReceived tid le we tm
    describe (RestartFailed reason)           = reason
    describe (Restarted oldId newId rStatus tm) = ChildRestarted oldId newId rStatus tm

-- $monitor

--------------------------------------------------------------------------------
-- | Monitor another supervisor. To achieve this, the monitored supervisor
-- is registered as a child ('Supvsr') of the monitoring one, and is handed
-- a duplicate of the monitoring supervisor's mailbox as its parent mailbox.
-- Thanks to the fact that for the supervisor the restart means we just copy
-- over its internal state, it should be perfectly fine to do so.
--
-- If the monitored supervisor already has a parent mailbox, nothing is
-- changed. In both cases the `ThreadId` of the monitored supervisor is
-- returned.
monitorWith :: QueueLike q
            => RetryPolicyM IO
            -- ^ The retry policy to use
            -> Supervisor q
            -- ^ The supervisor
            -> Supervisor q
            -- ^ The 'supervised' supervisor
            -> IO ThreadId
monitorWith policy sup1 sup2 = do
  currentParent <- readIORef parentSlot
  case currentParent of
    -- Do nothing, this supervisor is already being monitored.
    Just _  -> pure ()
    Nothing -> do
      e <- getEpoch
      let entry = Supvsr (ChildEpoch e) defaultRetryStatus policy sup2
      -- Register the monitored supervisor among the monitor's children.
      atomicModifyIORef' (_sc_children watcherCtx) $ \chMap ->
        (Map.insert monitoredTid entry chMap, ())
      -- Hand the monitored supervisor a duplicate of the monitor's mailbox.
      duped <- atomically (dupTChan (_sc_mailbox watcherCtx))
      atomicModifyIORef' parentSlot (\_ -> (Just duped, ()))
  pure monitoredTid
  where
    watcherCtx   = _sp_ctx sup1
    monitoredTid = _sp_myTid sup2
    parentSlot   = _sc_parent_mailbox (_sp_ctx sup2)