{- Humble module inspired to Erlang supervisors, with minimal dependencies. -} {-# LANGUAGE GADTs #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE DeriveDataTypeable #-} module Control.Concurrent.Supervisor ( SupervisorSpec , Supervisor , DeadLetter , RestartAction , SupervisionEvent(..) , RestartStrategy(..) -- * Creating a new supervisor spec -- $new , newSupervisorSpec -- * Creating a new supervisor -- $sup , newSupervisor -- * Restart Strategies , oneForOne -- * Stopping a supervisor -- $shutdown , shutdownSupervisor -- * Accessing Supervisor event log -- $log , eventStream , activeChildren -- * Supervise a forked thread -- $fork , forkSupervised -- * Monitor another supervisor -- $monitor , monitor ) where import qualified Data.HashMap.Strict as Map import Control.Concurrent import Control.Concurrent.STM import Data.IORef import Control.Exception import Data.Typeable import Control.Monad import Control.Retry import Data.Time -------------------------------------------------------------------------------- data Uninitialised data Initialised -------------------------------------------------------------------------------- data Supervisor_ a = Supervisor_ { _sp_myTid :: !(Maybe ThreadId) , _sp_children :: !(IORef (Map.HashMap ThreadId Child)) , _sp_mailbox :: TChan DeadLetter , _sp_eventStream :: TBQueue SupervisionEvent } type SupervisorSpec = Supervisor_ Uninitialised type Supervisor = Supervisor_ Initialised -------------------------------------------------------------------------------- data DeadLetter = DeadLetter ThreadId SomeException -------------------------------------------------------------------------------- data Child = Worker !RestartStrategy RestartAction | Supvsr !RestartStrategy !(Supervisor_ Initialised) -------------------------------------------------------------------------------- type RestartAction = ThreadId -> IO ThreadId -------------------------------------------------------------------------------- data SupervisionEvent = ChildBorn !ThreadId !UTCTime | ChildDied !ThreadId !SomeException !UTCTime | ChildRestarted !ThreadId !ThreadId !RestartStrategy !UTCTime | ChildRestartLimitReached !ThreadId !RestartStrategy !UTCTime | ChildFinished !ThreadId !UTCTime deriving Show -------------------------------------------------------------------------------- -- | Erlang inspired strategies. At the moment only the 'OneForOne' is -- implemented. data RestartStrategy = OneForOne !Int RetryPolicy instance Show RestartStrategy where show (OneForOne r _) = "OneForOne (Restarted " <> show r <> " times)" -------------------------------------------------------------------------------- -- | Smart constructor which offers a default throttling based on -- fibonacci numbers. oneForOne :: RestartStrategy oneForOne = OneForOne 0 $ fibonacciBackoff 100 -- $new -- In order to create a new supervisor, you need a `SupervisorSpec`, -- which can be acquired by a call to `newSupervisor`: -------------------------------------------------------------------------------- -- | Creates a new 'SupervisorSpec'. The reason it doesn't return a -- 'Supervisor' is to force you to call 'supervise' explicitly, in order to start the -- supervisor thread. newSupervisorSpec :: IO SupervisorSpec newSupervisorSpec = do tkn <- newTChanIO evt <- newTBQueueIO 1000 ref <- newIORef Map.empty return $ Supervisor_ Nothing ref tkn evt -- $supervise -------------------------------------------------------------------------------- newSupervisor :: SupervisorSpec -> IO Supervisor newSupervisor spec = forkIO (handleEvents spec) >>= \tid -> do mbx <- atomically $ dupTChan (_sp_mailbox spec) return Supervisor_ { _sp_myTid = Just tid , _sp_mailbox = mbx , _sp_children = _sp_children spec , _sp_eventStream = _sp_eventStream spec } -- $log -------------------------------------------------------------------------------- -- | Gives you access to the event this supervisor is generating, allowing you -- to react. It's using a bounded queue to explicitly avoid memory leaks in case -- you do not want to drain the queue to listen to incoming events. eventStream :: Supervisor -> TBQueue SupervisionEvent eventStream (Supervisor_ _ _ _ e) = e -------------------------------------------------------------------------------- -- | Returns the number of active threads at a given moment in time. activeChildren :: Supervisor -> IO Int activeChildren (Supervisor_ _ chRef _ _) = do readIORef chRef >>= return . length . Map.keys -- $shutdown -------------------------------------------------------------------------------- -- | Shutdown the given supervisor. This will cause the supervised children to -- be killed as well. To do so, we explore the children tree, killing workers as we go, -- and recursively calling `shutdownSupervisor` in case we hit a monitored `Supervisor`. shutdownSupervisor :: Supervisor -> IO () shutdownSupervisor (Supervisor_ sId chRef _ _) = do case sId of Nothing -> return () Just tid -> do chMap <- readIORef chRef processChildren (Map.toList chMap) killThread tid where processChildren [] = return () processChildren (x:xs) = do case x of (tid, Worker _ _) -> killThread tid (_, Supvsr _ s) -> shutdownSupervisor s processChildren xs -- $fork -------------------------------------------------------------------------------- -- | Fork a thread in a supervised mode. forkSupervised :: Supervisor -- ^ The 'Supervisor' -> RestartStrategy -- ^ The 'RestartStrategy' to use -> IO () -- ^ The computation to run -> IO ThreadId forkSupervised sup@Supervisor_{..} str act = bracket (supervised sup act) return $ \newChild -> do let ch = Worker str (const (supervised sup act)) atomicModifyIORef' _sp_children $ \chMap -> (Map.insert newChild ch chMap, ()) now <- getCurrentTime writeIfNotFull _sp_eventStream (ChildBorn newChild now) return newChild -------------------------------------------------------------------------------- writeIfNotFull :: TBQueue SupervisionEvent -> SupervisionEvent -> IO () writeIfNotFull q evt = atomically $ do isFull <- isFullTBQueue q unless isFull $ writeTBQueue q evt -------------------------------------------------------------------------------- supervised :: Supervisor -> IO () -> IO ThreadId supervised Supervisor_{..} act = forkFinally act $ \res -> case res of Left ex -> bracket myThreadId return $ \myId -> atomically $ writeTChan _sp_mailbox (DeadLetter myId ex) Right _ -> bracket myThreadId return $ \myId -> do now <- getCurrentTime atomicModifyIORef' _sp_children $ \chMap -> (Map.delete myId chMap, ()) writeIfNotFull _sp_eventStream (ChildFinished myId now) -------------------------------------------------------------------------------- handleEvents :: SupervisorSpec -> IO () handleEvents sp@(Supervisor_ myId myChildren myMailbox myStream) = do (DeadLetter newDeath ex) <- atomically $ readTChan myMailbox now <- getCurrentTime writeIfNotFull myStream (ChildDied newDeath ex now) -- If we catch an `AsyncException`, we have nothing but good -- reasons not to restart the thread. case typeOf ex == (typeOf (undefined :: AsyncException)) of True -> handleEvents sp False -> do chMap <- readIORef myChildren case Map.lookup newDeath chMap of Nothing -> return () Just (Worker str act) -> applyStrategy str (\newStr -> writeIfNotFull myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do let ch = Worker newStr act newThreadId <- act newDeath writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap) writeIfNotFull myStream (ChildRestarted newDeath newThreadId newStr now) Just (Supvsr str s@(Supervisor_ _ mbx cld es)) -> applyStrategy str (\newStr -> writeIfNotFull myStream (ChildRestartLimitReached newDeath newStr now)) $ \newStr -> do let node = Supervisor_ myId myChildren myMailbox myStream let ch = (Supvsr newStr s) newThreadId <- supervised node (handleEvents $ Supervisor_ Nothing mbx cld es) writeIORef myChildren (Map.insert newThreadId ch $! Map.delete newDeath chMap) writeIfNotFull myStream (ChildRestarted newDeath newThreadId newStr now) handleEvents sp where applyStrategy :: RestartStrategy -> (RestartStrategy -> IO ()) -> (RestartStrategy -> IO ()) -> IO () applyStrategy (OneForOne currentRestarts retryPol) ifAbort ifThrottle = do let newStr = OneForOne (currentRestarts + 1) retryPol case getRetryPolicy retryPol (currentRestarts + 1) of Nothing -> ifAbort newStr Just delay -> threadDelay delay >> ifThrottle newStr -- $monitor newtype MonitorRequest = MonitoredSupervision ThreadId deriving (Show, Typeable) instance Exception MonitorRequest -------------------------------------------------------------------------------- -- | Monitor another supervisor. To achieve these, we simulate a new 'DeadLetter', -- so that the first supervisor will effectively restart the monitored one. -- Thanks to the fact that for the supervisor the restart means we just copy over -- its internal state, it should be perfectly fine to do so. monitor :: Supervisor -> Supervisor -> IO () monitor (Supervisor_ _ _ mbox _) (Supervisor_ mbId _ _ _) = do case mbId of Nothing -> return () Just tid -> atomically $ writeTChan mbox (DeadLetter tid (toException $ MonitoredSupervision tid))