{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Control.Concurrent.NQE.Supervisor
( SupervisorMessage
, Strategy(..)
, supervisor
, addChild
, removeChild
, stopSupervisor
) where
import Control.Applicative
import Control.Concurrent.NQE.Process
import Control.Monad
import Control.Monad.STM (catchSTM)
import UnliftIO
data SupervisorMessage n
= MonadUnliftIO n =>
AddChild (n ())
(Reply (Async ()))
| RemoveChild (Async ())
| StopSupervisor
data Strategy
= Notify ((Async (), Either SomeException ()) -> STM ())
| KillAll
| IgnoreGraceful
| IgnoreAll
supervisor ::
(MonadUnliftIO m, Mailbox mbox (SupervisorMessage n), m ~ n)
=> Strategy
-> mbox (SupervisorMessage n)
-> [n ()]
-> m ()
supervisor strat mbox children = do
state <- newTVarIO []
finally (go state) (down state)
where
go state = do
mapM_ (startChild state) children
loop state
loop state = do
e <-
atomically $
Right <$> receiveSTM mbox <|> Left <$> waitForChild state
again <-
case e of
Right m -> processMessage state m
Left x -> processDead state strat x
when again $ loop state
down state = do
as <- readTVarIO state
mapM_ cancel as
waitForChild :: TVar [Async ()] -> STM (Async (), Either SomeException ())
waitForChild state = do
as <- readTVar state
waitAnyCatchSTM as
processMessage ::
(MonadUnliftIO m)
=> TVar [Async ()]
-> SupervisorMessage m
-> m Bool
processMessage state (AddChild ch r) = do
a <- async ch
atomically $ do
modifyTVar' state (a:)
r a
return True
processMessage state (RemoveChild a) = do
atomically (modifyTVar' state (filter (/= a)))
cancel a
return True
processMessage state StopSupervisor = do
as <- readTVarIO state
forM_ as (stopChild state)
return False
processDead ::
(MonadIO m)
=> TVar [Async ()]
-> Strategy
-> (Async (), Either SomeException ())
-> m Bool
processDead state IgnoreAll (a, _) = do
atomically (modifyTVar' state (filter (/= a)))
return True
processDead state KillAll (a, e) = do
as <- atomically $ do
modifyTVar' state (filter (/= a))
readTVar state
mapM_ (stopChild state) as
case e of
Left x -> throwIO x
Right () -> return False
processDead state IgnoreGraceful (a, Right ()) = do
atomically (modifyTVar' state (filter (/= a)))
return True
processDead state IgnoreGraceful (a, Left e) = do
as <- atomically $ do
modifyTVar' state (filter (/= a))
readTVar state
mapM_ (stopChild state) as
throwIO e
processDead state (Notify notif) (a, e) = do
x <-
atomically $ do
modifyTVar' state (filter (/= a))
catchSTM (notif (a, e) >> return Nothing) $ \x ->
return $ Just (x :: SomeException)
case x of
Nothing -> return True
Just ex -> do
as <- readTVarIO state
forM_ as (stopChild state)
throwIO ex
startChild ::
(MonadUnliftIO m)
=> TVar [Async ()]
-> m ()
-> m (Async ())
startChild state run = do
a <- async run
atomically (modifyTVar' state (a:))
return a
stopChild :: MonadIO m => TVar [Async ()] -> Async () -> m ()
stopChild state a = do
isChild <-
atomically $ do
cur <- readTVar state
let new = filter (/= a) cur
writeTVar state new
return (cur /= new)
when isChild (cancel a)
addChild ::
(MonadUnliftIO n, MonadIO m, Mailbox mbox (SupervisorMessage n))
=> mbox (SupervisorMessage n)
-> n ()
-> m (Async ())
addChild mbox action = AddChild action `query` mbox
removeChild ::
(MonadUnliftIO n, MonadIO m, Mailbox mbox (SupervisorMessage n))
=> mbox (SupervisorMessage n)
-> Async ()
-> m ()
removeChild mbox child = RemoveChild child `send` mbox
stopSupervisor ::
(MonadUnliftIO n, MonadIO m, Mailbox mbox (SupervisorMessage n))
=> mbox (SupervisorMessage n)
-> m ()
stopSupervisor mbox = StopSupervisor `send` mbox