module Control.Concurrent.Supervisor
( SupervisorSpec
, Supervisor
, DeadLetter
, RestartAction
, SupervisionEvent(..)
, RestartStrategy(..)
, newSupervisorSpec
, newSupervisor
, oneForOne
, shutdownSupervisor
, eventStream
, activeChildren
, forkSupervised
, monitor
) where
import qualified Data.HashMap.Strict as Map
import Control.Concurrent
import Control.Concurrent.STM
import Data.IORef
import Control.Exception
import Data.Typeable
import Control.Monad
import Control.Retry
import Data.Time
data Uninitialised
data Initialised
data Supervisor_ a = Supervisor_ {
_sp_myTid :: !(Maybe ThreadId)
, _sp_children :: !(IORef (Map.HashMap ThreadId Child))
, _sp_mailbox :: TChan DeadLetter
, _sp_eventStream :: TBQueue SupervisionEvent
}
type SupervisorSpec = Supervisor_ Uninitialised
type Supervisor = Supervisor_ Initialised
data DeadLetter = DeadLetter ThreadId SomeException
data Child = Worker !RestartStrategy RestartAction
| Supvsr !RestartStrategy !(Supervisor_ Initialised)
type RestartAction = ThreadId -> IO ThreadId
data SupervisionEvent =
ChildBorn !ThreadId !UTCTime
| ChildDied !ThreadId !SomeException !UTCTime
| ChildRestarted !ThreadId !ThreadId !RestartStrategy !UTCTime
| ChildRestartLimitReached !ThreadId !RestartStrategy !UTCTime
| ChildFinished !ThreadId !UTCTime
deriving Show
data RestartStrategy =
OneForOne !Int RetryPolicy
instance Show RestartStrategy where
show (OneForOne r _) = "OneForOne (Restarted " <> show r <> " times)"
oneForOne :: RestartStrategy
oneForOne = OneForOne 0 $ fibonacciBackoff 100
newSupervisorSpec :: IO SupervisorSpec
newSupervisorSpec = do
tkn <- newTChanIO
evt <- newTBQueueIO 1000
ref <- newIORef Map.empty
return $ Supervisor_ Nothing ref tkn evt
newSupervisor :: SupervisorSpec -> IO Supervisor
newSupervisor spec = forkIO (handleEvents spec) >>= \tid -> do
mbx <- atomically $ dupTChan (_sp_mailbox spec)
return Supervisor_ {
_sp_myTid = Just tid
, _sp_mailbox = mbx
, _sp_children = _sp_children spec
, _sp_eventStream = _sp_eventStream spec
}
eventStream :: Supervisor -> TBQueue SupervisionEvent
eventStream (Supervisor_ _ _ _ e) = e
activeChildren :: Supervisor -> IO Int
activeChildren (Supervisor_ _ chRef _ _) = do
readIORef chRef >>= return . length . Map.keys
shutdownSupervisor :: Supervisor -> IO ()
shutdownSupervisor (Supervisor_ sId chRef _ _) = do
case sId of
Nothing -> return ()
Just tid -> do
chMap <- readIORef chRef
processChildren (Map.toList chMap)
killThread tid
where
processChildren [] = return ()
processChildren (x:xs) = do
case x of
(tid, Worker _ _) -> killThread tid
(_, Supvsr _ s) -> shutdownSupervisor s
processChildren xs
forkSupervised :: Supervisor
-> RestartStrategy
-> IO ()
-> IO ThreadId
forkSupervised sup@Supervisor_{..} str act =
bracket (supervised sup act) return $ \newChild -> do
let ch = Worker str (const (supervised sup act))
atomicModifyIORef' _sp_children $ \chMap -> (Map.insert newChild ch chMap, ())
now <- getCurrentTime
writeIfNotFull _sp_eventStream (ChildBorn newChild now)
return newChild
writeIfNotFull :: TBQueue SupervisionEvent -> SupervisionEvent -> IO ()
writeIfNotFull q evt = atomically $ do
isFull <- isFullTBQueue q
unless isFull $ writeTBQueue q evt
supervised :: Supervisor -> IO () -> IO ThreadId
supervised Supervisor_{..} act = forkFinally act $ \res -> case res of
Left ex -> bracket myThreadId return $ \myId -> atomically $
writeTChan _sp_mailbox (DeadLetter myId ex)
Right _ -> bracket myThreadId return $ \myId -> do
now <- getCurrentTime
atomicModifyIORef' _sp_children $ \chMap -> (Map.delete myId chMap, ())
writeIfNotFull _sp_eventStream (ChildFinished myId now)
handleEvents :: SupervisorSpec -> IO ()
handleEvents sp@(Supervisor_ myId myChildren myMailbox myStream) = do
(DeadLetter newDeath ex) <- atomically $ readTChan myMailbox
now <- getCurrentTime
writeIfNotFull myStream (ChildDied newDeath ex now)
case typeOf ex == (typeOf (undefined :: AsyncException)) of
True -> handleEvents sp
False -> do
chMap <- readIORef myChildren
case Map.lookup newDeath chMap of
Nothing -> return ()
Just (Worker str act) ->
applyStrategy str (\newStr -> writeIfNotFull myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do
let ch = Worker newStr act
newThreadId <- act newDeath
writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
writeIfNotFull myStream (ChildRestarted newDeath newThreadId newStr now)
Just (Supvsr str s@(Supervisor_ _ mbx cld es)) ->
applyStrategy str (\newStr -> writeIfNotFull myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do
let node = Supervisor_ myId myChildren myMailbox myStream
let ch = (Supvsr newStr s)
newThreadId <- supervised node (handleEvents $ Supervisor_ Nothing mbx cld es)
writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap)
writeIfNotFull myStream (ChildRestarted newDeath newThreadId newStr now)
handleEvents sp
where
applyStrategy :: RestartStrategy
-> (RestartStrategy -> IO ())
-> (RestartStrategy -> IO ())
-> IO ()
applyStrategy (OneForOne currentRestarts retryPol) ifAbort ifThrottle = do
let newStr = OneForOne (currentRestarts + 1) retryPol
case getRetryPolicy retryPol (currentRestarts + 1) of
Nothing -> ifAbort newStr
Just delay -> threadDelay delay >> ifThrottle newStr
newtype MonitorRequest = MonitoredSupervision ThreadId deriving (Show, Typeable)
instance Exception MonitorRequest
monitor :: Supervisor -> Supervisor -> IO ()
monitor (Supervisor_ _ _ mbox _) (Supervisor_ mbId _ _ _) = do
case mbId of
Nothing -> return ()
Just tid -> atomically $
writeTChan mbox (DeadLetter tid (toException $ MonitoredSupervision tid))