{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Control.Concurrent.Supervisor.Types
( SupervisionCtx
, Supervisor
, QueueLike(..)
, Child_
, DeadLetter
, RestartAction
, SupervisionEvent(..)
, RestartStrategy(..)
, RestartResult(..)
, newSupervisor
, fibonacciRetryPolicy
, shutdownSupervisor
, eventStream
, activeChildren
, forkSupervised
, monitorWith
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Control.Retry
import qualified Data.HashMap.Strict as Map
import Data.IORef
import Data.Time
import Numeric.Natural
import System.Clock (Clock(Monotonic), TimeSpec, getTime)
-- | Channel carrying 'DeadLetter' notifications.
type Mailbox = TChan DeadLetter
-- | State shared between a supervisor's event loop and the threads it
-- supervises. The parameter @q@ is the queue type used for the event stream.
data SupervisionCtx q = SupervisionCtx
  { _sc_mailbox         :: Mailbox
    -- ^ Channel on which 'DeadLetter's for this supervisor are posted.
  , _sc_parent_mailbox  :: !(IORef (Maybe Mailbox))
    -- ^ Mailbox to notify if the supervisor thread itself dies with an
    -- exception; 'Nothing' when no such mailbox has been installed.
  , _sc_children        :: !(IORef (Map.HashMap ThreadId (Child_ q)))
    -- ^ Registered children, keyed by their 'ThreadId'.
  , _sc_eventStream     :: q SupervisionEvent
    -- ^ Queue to which 'SupervisionEvent's are written.
  , _sc_eventStreamSize :: !Natural
    -- ^ The size that was passed to 'newQueueIO' for the event stream.
  , _sc_strategy        :: !RestartStrategy
    -- ^ Restart strategy chosen when the supervisor was created.
  }
-- | Handle to a running supervisor: the thread running its event loop
-- together with its shared context.
data Supervisor q = Supervisor
  { _sp_myTid :: !ThreadId
    -- ^ Thread running the supervisor's event loop.
  , _sp_ctx   :: !(SupervisionCtx q)
    -- ^ Context shared with that thread.
  }
-- | Minimal queue interface needed for a supervisor's event stream.
class QueueLike q where
  -- | Create an empty queue. The 'Natural' is a requested size; an instance
  -- is free to ignore it.
  newQueueIO :: Natural -> IO (q a)

  -- | Take the next element from the queue.
  readQueue :: q a -> STM a

  -- | Offer an element to the queue.
  writeQueue :: q a -> a -> STM ()
-- | 'TQueue' backed event stream. The requested size is ignored.
instance QueueLike TQueue where
  newQueueIO _ = newTQueueIO
  readQueue    = readTQueue
  writeQueue   = writeTQueue
-- | 'TBQueue' backed event stream, created with the requested size.
instance QueueLike TBQueue where
  newQueueIO = newTBQueueIO
  readQueue  = readTBQueue

  -- Writing never blocks: when the queue is already full the element is
  -- dropped instead of being enqueued.
  writeQueue q e = do
    isFull <- isFullTBQueue q
    unless isFull $ writeTBQueue q e
-- | Notification that a thread terminated with an exception: the epoch at
-- which the letter was created, the thread that died, and the exception.
data DeadLetter = DeadLetter !LetterEpoch !ThreadId !SomeException
-- | Timestamp type wrapped by 'LetterEpoch' and 'ChildEpoch'.
type Epoch = TimeSpec
-- | The 'Epoch' at which a 'DeadLetter' was created.
newtype LetterEpoch = LetterEpoch Epoch
  deriving Show
-- | The 'Epoch' recorded when a child was registered or last restarted.
newtype ChildEpoch = ChildEpoch Epoch
  deriving Show
-- | Outcome of trying to restart a child in response to a 'DeadLetter'.
data RestartResult
  = Restarted !ThreadId !ThreadId !RetryStatus !UTCTime
    -- ^ The child was restarted: the dead thread, the replacement thread, the
    -- updated retry status, and a timestamp.
  | StaleDeadLetter !ThreadId !LetterEpoch !ChildEpoch !UTCTime
    -- ^ The letter is older than the child's current epoch, so it was ignored.
  | RestartFailed SupervisionEvent
    -- ^ No restart took place; the wrapped event says why.
  deriving Show
-- | An entry in a supervisor's children map. Both variants carry the epoch of
-- the child's registration or last restart, its retry status, and its retry
-- policy.
data Child_ q
  = Worker !ChildEpoch !RetryStatus (RetryPolicyM IO) RestartAction
    -- ^ A plain supervised thread, with the action used to start a
    -- replacement.
  | Supvsr !ChildEpoch !RetryStatus (RetryPolicyM IO) !(Supervisor q)
    -- ^ A supervised supervisor.
-- | Action that starts a replacement thread. It receives the 'ThreadId' of
-- the thread that died and returns the 'ThreadId' of the new thread.
type RestartAction = ThreadId -> IO ThreadId
-- | Events describing what happens to supervised threads. Every event carries
-- a 'UTCTime' timestamp as its last field.
data SupervisionEvent
  = ChildBorn !ThreadId !UTCTime
    -- ^ A worker was forked and registered with the supervisor.
  | ChildDied !ThreadId !SomeException !UTCTime
    -- ^ Carries a thread id and an exception.
    -- NOTE(review): presumably reported when a dead letter is received;
    -- confirm against the event loop.
  | ChildRestarted !ThreadId !ThreadId !RetryStatus !UTCTime
    -- ^ Carries two thread ids and a retry status.
    -- NOTE(review): presumably old thread, new thread, updated status,
    -- mirroring 'Restarted'; confirm against the event loop.
  | ChildNotFound !ThreadId !UTCTime
    -- ^ A restart was requested for a thread that is not in the children map.
  | StaleDeadLetterReceived !ThreadId !LetterEpoch !ChildEpoch !UTCTime
    -- ^ Carries a thread id together with a letter epoch and a child epoch.
    -- NOTE(review): presumably the event counterpart of 'StaleDeadLetter';
    -- confirm against the event loop.
  | ChildRestartLimitReached !ThreadId !RetryStatus !UTCTime
    -- ^ The child's retry policy yielded no further delay, so the child was
    -- not restarted.
  | ChildFinished !ThreadId !UTCTime
    -- ^ A supervised action returned normally and the child was unregistered.
  deriving Show
-- | How a supervisor reacts to the death of a child. 'OneForOne' is the only
-- strategy defined here.
data RestartStrategy = OneForOne
  deriving Show
-- | Retry policy built with 'fibonacciBackoff', using 100 as the base delay
-- (microseconds, per the @Control.Retry@ documentation).
fibonacciRetryPolicy :: RetryPolicyM IO
fibonacciRetryPolicy = fibonacciBackoff baseDelay
  where
    baseDelay = 100
-- | Read the current 'Epoch' from the 'Monotonic' clock.
getEpoch :: MonadIO m => m Epoch
getEpoch = liftIO (getTime Monotonic)
-- | Post a 'DeadLetter' for thread @myId@ and exception @ex@ to the mailbox
-- currently stored in the given reference. Does nothing when the reference
-- holds 'Nothing'.
tryNotifyParent :: IORef (Maybe Mailbox) -> ThreadId -> SomeException -> IO ()
tryNotifyParent mbPMbox myId ex = do
  parent <- readIORef mbPMbox
  forM_ parent $ \mailbox -> do
    -- The letter is stamped at the moment of notification.
    e <- getEpoch
    atomically $ writeTChan mailbox (DeadLetter (LetterEpoch e) myId ex)
-- | Create a supervisor and fork the thread that runs its event loop.
--
-- The first argument is stored as the supervisor's restart strategy. The
-- second is handed to 'newQueueIO' to create the event stream and is also
-- remembered in the context.
--
-- The supervisor starts with an empty children map and no parent mailbox. If
-- the event loop ('handleEvents') terminates with an exception, the exception
-- is forwarded through 'tryNotifyParent'; a normal return is ignored.
newSupervisor :: QueueLike q
              => RestartStrategy
              -> Natural
              -> IO (Supervisor q)
newSupervisor strategy size = do
  parentMbx <- newIORef Nothing
  mbx       <- newTChanIO
  es        <- newQueueIO size
  cld       <- newIORef Map.empty
  let ctx = SupervisionCtx
        { _sc_mailbox         = mbx
        , _sc_parent_mailbox  = parentMbx
        , _sc_eventStream     = es
        , _sc_children        = cld
        , _sc_strategy        = strategy
        , _sc_eventStreamSize = size
        }
      -- The finaliser given to 'forkFinally' already runs with asynchronous
      -- exceptions masked, so 'myThreadId' needs no extra protection.
      onExit (Left ex)  = myThreadId >>= \myId -> tryNotifyParent parentMbx myId ex
      onExit (Right ()) = return ()
  tid <- forkFinally (handleEvents ctx) onExit
  return Supervisor
    { _sp_myTid = tid
    , _sp_ctx   = ctx
    }
-- | The queue to which the supervisor writes its 'SupervisionEvent's.
eventStream :: QueueLike q => Supervisor q -> q SupervisionEvent
eventStream = _sc_eventStream . _sp_ctx
-- | Number of entries currently in the supervisor's children map.
activeChildren :: QueueLike q => Supervisor q -> IO Int
activeChildren sup = Map.size <$> readIORef (_sc_children (_sp_ctx sup))
-- | Stop a supervisor and everything it supervises.
--
-- Every registered 'Worker' is killed with 'killThread', every registered
-- 'Supvsr' is shut down recursively, and finally the supervisor's own thread
-- is killed. The children map itself is only read, never modified.
shutdownSupervisor :: QueueLike q => Supervisor q -> IO ()
shutdownSupervisor (Supervisor tid ctx) = do
  chMap <- readIORef (_sc_children ctx)
  forM_ (Map.toList chMap) $ \(childTid, child) ->
    case child of
      Worker{}         -> killThread childTid
      Supvsr _ _ _ sup -> shutdownSupervisor sup
  killThread tid
-- | Fork an action as a supervised worker and return its 'ThreadId'.
--
-- The worker is registered in the supervisor's children map with a fresh
-- 'ChildEpoch', 'defaultRetryStatus' and the supplied retry policy. Its
-- restart action forks the same action again under the same supervisor. Once
-- registered, a 'ChildBorn' event is written to the event stream.
forkSupervised :: QueueLike q
               => Supervisor q
               -> RetryPolicyM IO
               -> IO ()
               -> IO ThreadId
forkSupervised sup policy act =
  -- 'bracket' has nothing to release here; it only makes the fork itself run
  -- with asynchronous exceptions masked.
  bracket spawn return $ \newChild -> do
    e <- getEpoch
    let ch = Worker (ChildEpoch e) defaultRetryStatus policy (const spawn)
    -- NOTE(review): the thread is already running before it is registered. If
    -- it finishes first, its own removal from the map is a no-op and the
    -- insert below leaves a stale entry.
    atomicModifyIORef' (_sc_children ctx) $ \chMap ->
      (Map.insert newChild ch chMap, ())
    now <- getCurrentTime
    atomically $ writeQueue (_sc_eventStream ctx) (ChildBorn newChild now)
    return newChild
  where
    ctx   = _sp_ctx sup
    spawn = supervised sup act
-- | Fork an action whose termination is reported to the given supervisor.
--
-- * If the action ends with an exception, a 'DeadLetter' stamped with the
--   current epoch is posted to the supervisor's mailbox.
-- * If it returns normally, the thread removes itself from the children map
--   and a 'ChildFinished' event is written to the event stream.
--
-- The thread is not added to the children map here.
supervised :: QueueLike q => Supervisor q -> IO () -> IO ThreadId
supervised sup act = forkFinally act $ \res -> do
  -- The finaliser given to 'forkFinally' already runs with asynchronous
  -- exceptions masked, so 'myThreadId' needs no extra protection.
  myId <- myThreadId
  case res of
    Left ex -> do
      e <- getEpoch
      atomically $
        writeTChan (_sc_mailbox ctx) (DeadLetter (LetterEpoch e) myId ex)
    Right () -> do
      now <- getCurrentTime
      atomicModifyIORef' (_sc_children ctx) $ \chMap ->
        (Map.delete myId chMap, ())
      atomically $ writeQueue (_sc_eventStream ctx) (ChildFinished myId now)
  where
    ctx = _sp_ctx sup
-- | Run the given restart action unless the dead letter is stale.
--
-- A letter is stale when its epoch is strictly earlier than the child's
-- epoch. In that case the action is skipped and a 'StaleDeadLetter' result,
-- stamped with the current time, is returned instead.
ignoringStaleLetters :: ThreadId
                     -> LetterEpoch
                     -> ChildEpoch
                     -> IO RestartResult
                     -> IO RestartResult
ignoringStaleLetters tid deadLetterEpoch@(LetterEpoch l) childEpoch@(ChildEpoch c) act
  -- The wall-clock time is only needed for the stale result.
  | l < c     = StaleDeadLetter tid deadLetterEpoch childEpoch <$> getCurrentTime
  | otherwise = act
-- | Try to restart the child whose thread died.
--
-- Arguments: the supervisor's context, the epoch of the dead letter, the
-- timestamp to put in the result, and the 'ThreadId' of the dead thread.
--
-- * If the thread is not in the children map, the result is
--   @'RestartFailed' ('ChildNotFound' ..)@.
-- * If the letter is older than the child's epoch, the result is
--   'StaleDeadLetter' (see 'ignoringStaleLetters').
-- * Otherwise the child's retry policy is consulted via 'runRetryPolicy'.
--   When it gives up, the result is
--   @'RestartFailed' ('ChildRestartLimitReached' ..)@. When it allows a
--   retry, a replacement is started after the policy's delay, the children
--   map entry is moved from the old thread id to the new one, and
--   'Restarted' is returned.
--
-- A 'Worker' is replaced by running its restart action. A 'Supvsr' is
-- replaced by a brand-new supervisor from 'newSupervisor'.
restartChild :: QueueLike q
             => SupervisionCtx q
             -> LetterEpoch
             -> UTCTime
             -> ThreadId
             -> IO RestartResult
restartChild ctx deadLetterEpoch now newDeath = do
  chMap <- readIORef (_sc_children ctx)
  case Map.lookup newDeath chMap of
    Nothing ->
      return $ RestartFailed (ChildNotFound newDeath now)

    Just (Worker workerEpoch rState rPolicy act) ->
      ignoringStaleLetters newDeath deadLetterEpoch workerEpoch $
        runRetryPolicy rState rPolicy limitReached $ \newRState -> do
          e <- getEpoch
          let ch = Worker (ChildEpoch e) newRState rPolicy act
          newThreadId <- act newDeath
          replaceChild newDeath newThreadId ch
          restarted newThreadId newRState

    Just (Supvsr supervisorEpoch rState rPolicy (Supervisor deathSup ctx')) ->
      ignoringStaleLetters newDeath deadLetterEpoch supervisorEpoch $
        runRetryPolicy rState rPolicy limitReached $ \newRState -> do
          e <- getEpoch
          -- NOTE(review): the strategy comes from the restarting supervisor
          -- (ctx) while the stream size comes from the dead one (ctx').
          -- Confirm this asymmetry is intended.
          restartedSup <- newSupervisor (_sc_strategy ctx) (_sc_eventStreamSize ctx')
          let ch          = Supvsr (ChildEpoch e) newRState rPolicy restartedSup
              newThreadId = _sp_myTid restartedSup
          replaceChild deathSup newThreadId ch
          restarted newThreadId newRState
  where
    -- Move the map entry from the dead thread to its replacement. This must
    -- operate on the current contents of the reference, not on the snapshot
    -- read above: the retry delay may have elapsed in between, and other
    -- threads register and unregister children concurrently.
    replaceChild oldTid newTid ch =
      atomicModifyIORef' (_sc_children ctx) $ \current ->
        (Map.insert newTid ch (Map.delete oldTid current), ())

    restarted newThreadId newRState =
      return $ Restarted newDeath newThreadId newRState now

    limitReached lastRState =
      return $ RestartFailed (ChildRestartLimitReached newDeath lastRState now)
-- | Consult a retry policy once and act on its decision.
--
-- Arguments: the current retry status, the policy, the continuation to run
-- when the policy gives up, and the continuation to run after the delay when
-- the policy allows a retry.
--
-- When the policy yields no delay, the first continuation is called with the
-- unchanged status. Otherwise this thread sleeps for the delay and the second
-- continuation is called with an updated status: iteration number
-- incremented, delay added to the cumulative delay, and the delay recorded as
-- the previous delay.
runRetryPolicy :: RetryStatus
               -> RetryPolicyM IO
               -> (RetryStatus -> IO RestartResult)
               -> (RetryStatus -> IO RestartResult)
               -> IO RestartResult
runRetryPolicy rState rPolicy ifAbort ifThrottle = do
  maybeDelay <- getRetryPolicyM rPolicy rState
  case maybeDelay of
    Nothing    -> ifAbort rState
    Just delay -> do
      let newRState = rState
            { rsIterNumber      = rsIterNumber rState + 1
            , rsCumulativeDelay = rsCumulativeDelay rState + delay
              -- Always the delay just applied, including on the first retry
              -- when there was no previous delay.
            , rsPreviousDelay   = Just delay
            }
      threadDelay delay
      ifThrottle newRState
-- | Restart handler used for the 'OneForOne' strategy: it simply forwards
-- all of its arguments to 'restartChild'.
restartOneForOne :: QueueLike q
                 => SupervisionCtx q
                 -> LetterEpoch
                 -> UTCTime
                 -> ThreadId
                 -> IO RestartResult
restartOneForOne ctx letterEpoch diedAt deadTid =
  restartChild ctx letterEpoch diedAt deadTid
-- | The supervisor's event loop.
--
-- Each iteration blocks on the supervisor mailbox for the next 'DeadLetter',
-- publishes a single 'ChildDied' event for it on the event stream, and then
-- deals with the dead child:
--
-- * If the exception is an 'AsyncException', the child is removed from the
--   children map and no restart is attempted.
-- * Otherwise the configured restart strategy is run and its outcome is
--   published on the event stream.
--
-- The loop does not terminate on its own.
handleEvents :: QueueLike q => SupervisionCtx q -> IO ()
handleEvents ctx@SupervisionCtx{_sc_mailbox, _sc_children, _sc_eventStream, _sc_strategy} =
  forever $ do
    DeadLetter epoch newDeath ex <- atomically (readTChan _sc_mailbox)
    now <- getCurrentTime
    -- 'ChildDied' is published exactly once per dead letter, here; the
    -- branches below must not emit it again.
    publish (ChildDied newDeath ex now)
    case asyncExceptionFromException ex of
      Just (_ :: AsyncException) ->
        -- Forget the child instead of restarting it.
        atomicModifyIORef' _sc_children $ \chMap ->
          (Map.delete newDeath chMap, ())
      Nothing -> do
        restartResult <- case _sc_strategy of
          OneForOne -> restartOneForOne ctx epoch now newDeath
        publish $! outcomeEvent restartResult
  where
    -- Write one event to the supervisor's event stream.
    publish :: SupervisionEvent -> IO ()
    publish ev = atomically (writeQueue _sc_eventStream ev)

    -- Translate the result of a restart attempt into the event to publish.
    outcomeEvent :: RestartResult -> SupervisionEvent
    outcomeEvent (StaleDeadLetter tid le we tm)     = StaleDeadLetterReceived tid le we tm
    outcomeEvent (RestartFailed reason)             = reason
    outcomeEvent (Restarted oldId newId rStatus tm) = ChildRestarted oldId newId rStatus tm
-- | Make the first supervisor (@sup1@) monitor the second one (@sup2@),
-- using the given retry policy for the registered child entry.
--
-- If @sup2@ already has a parent mailbox recorded, nothing is changed.
-- Otherwise @sup2@ is inserted into @sup1@'s children map as a 'Supvsr'
-- child with a fresh epoch and 'defaultRetryStatus', and a duplicate of
-- @sup1@'s mailbox is stored as @sup2@'s parent mailbox.
--
-- In both cases the result is @sup2@'s own 'ThreadId'.
monitorWith :: QueueLike q
            => RetryPolicyM IO
            -> Supervisor q
            -> Supervisor q
            -> IO ThreadId
monitorWith policy sup1 sup2 = do
  currentParent <- readIORef childParentRef
  case currentParent of
    -- A parent mailbox is already recorded: leave everything untouched.
    Just _ -> pure childTid
    Nothing -> do
      e <- getEpoch
      let entry = Supvsr (ChildEpoch e) defaultRetryStatus policy sup2
      atomicModifyIORef' (_sc_children parentCtx) $ \chMap ->
        (Map.insert childTid entry chMap, ())
      -- A duplicated TChan starts empty and from then on shares writes with
      -- the channel it was duplicated from.
      duped <- atomically (dupTChan (_sc_mailbox parentCtx))
      atomicModifyIORef' childParentRef (\_ -> (Just duped, ()))
      pure childTid
  where
    parentCtx      = _sp_ctx sup1
    childTid       = _sp_myTid sup2
    childParentRef = _sc_parent_mailbox (_sp_ctx sup2)