{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE Rank2Types #-}
module Control.Concurrent.NQE.Supervisor
( ChildAction
, Child
, SupervisorMessage
, Supervisor
, Strategy(..)
, withSupervisor
, supervisor
, supervisorProcess
, addChild
, removeChild
) where
import Control.Applicative
import Control.Concurrent.NQE.Process
import Control.Monad
import Data.List
import UnliftIO
-- | Action that a supervisor runs in its own thread as a child.
type ChildAction = IO ()
-- | Handle to a running child: the 'Async' created for its 'ChildAction'.
type Child = Async ()
-- | Requests understood by a supervisor. Each carries a listener that is
-- invoked once the request has been handled.
data SupervisorMessage
    = AddChild !ChildAction !(Listen Child)
      -- ^ Start the action as a new child and hand its 'Child' handle to the
      -- listener.
    | RemoveChild !Child !(Listen ())
      -- ^ Stop the given child (if it is currently supervised) and then
      -- signal the listener.
-- | A supervisor is a 'Process' that accepts 'SupervisorMessage's.
type Supervisor = Process SupervisorMessage
-- | What a supervisor does when one of its children terminates.
data Strategy
    = Notify (Listen (Child, Maybe SomeException))
      -- ^ Report the finished child to the listener, together with the
      -- exception it died with (if any), and keep running.
    | KillAll
      -- ^ Stop every remaining child. Re-throw the child's exception if it
      -- had one; otherwise end the supervisor loop.
    | IgnoreGraceful
      -- ^ Keep running when a child finishes without an exception. If it
      -- died with an exception, stop every remaining child and re-throw.
    | IgnoreAll
      -- ^ Drop the finished child and keep running, whatever its outcome.
-- | Run a supervisor with the given strategy for the duration of the
-- supplied action, passing the action a handle to the supervisor.
-- Implemented as 'withProcess' applied to 'supervisorProcess'.
withSupervisor ::
       MonadUnliftIO m
    => Strategy
    -> (Supervisor -> m a)
    -> m a
withSupervisor strat action = withProcess (supervisorProcess strat) action
-- | Start a supervisor with the given strategy and return its handle.
-- Implemented as 'process' applied to 'supervisorProcess'.
supervisor :: MonadUnliftIO m => Strategy -> m Supervisor
supervisor = process . supervisorProcess
-- | Body of a supervisor: keeps the list of live children, and repeatedly
-- handles either the next message from the inbox or the termination of a
-- child until a handler asks to stop (or throws). Whatever way the loop
-- ends, all children still in the list are cancelled.
supervisorProcess ::
       MonadUnliftIO m
    => Strategy
    -> Inbox SupervisorMessage
    -> m ()
supervisorProcess strat i = do
    children <- newTVarIO []
    run children `finally` stopAll children
  where
    run children = do
        -- Left-biased choice: an inbox message is taken in preference to a
        -- finished child when both are available.
        event <-
            atomically $
            (Right <$> receiveSTM i) <|> (Left <$> waitForChild children)
        continue <-
            either
                (processDead children strat)
                (processMessage children)
                event
        when continue (run children)
-- | Ask the supervisor to start a new child running the given action, and
-- return the handle the supervisor replies with.
addChild :: MonadIO m => Supervisor -> ChildAction -> m Child
addChild sup action = query (AddChild action) sup
-- | Ask the supervisor to stop the given child, returning once the
-- supervisor has replied.
removeChild :: MonadIO m => Supervisor -> Child -> m ()
removeChild sup c = query (RemoveChild c) sup
-- | Cancel every child currently in the list. The list itself is left
-- unchanged. Runs with asynchronous exceptions masked ('mask_').
stopAll :: MonadUnliftIO m => TVar [Child] -> m ()
stopAll state = mask_ (readTVarIO state >>= mapM_ cancel)
-- | STM action yielding the first child in the list that has finished,
-- paired with its outcome ('Left' for an exception, 'Right' for a normal
-- return), as reported by 'waitAnyCatchSTM'.
waitForChild :: TVar [Child] -> STM (Child, Either SomeException ())
waitForChild state = readTVar state >>= waitAnyCatchSTM
-- | Handle one request sent to the supervisor and reply through the
-- listener carried by the message. Always returns 'True', i.e. the
-- supervisor loop keeps running.
processMessage ::
       MonadUnliftIO m => TVar [Child] -> SupervisorMessage -> m Bool
processMessage state msg = do
    case msg of
        -- Start the child, then hand its handle to the requester.
        AddChild ch reply -> startChild state ch >>= atomically . reply
        -- Stop the child, then acknowledge.
        RemoveChild a reply -> stopChild state a >> atomically (reply ())
    return True
-- | React to the termination of a child according to the strategy.
--
-- The dead child is always removed from the list of live children. The
-- result says whether the supervisor loop should keep running:
--
-- * 'IgnoreAll': keep running.
-- * 'KillAll': cancel the remaining children, then re-throw the child's
--   exception, or return 'False' if it ended normally.
-- * 'IgnoreGraceful': keep running after a normal end; after an exception
--   cancel the remaining children and re-throw it.
-- * 'Notify': pass the child and its exception (if any) to the listener in
--   the same transaction that removes it, then keep running.
processDead ::
       MonadUnliftIO m
    => TVar [Child]
    -> Strategy
    -> (Child, Either SomeException ())
    -> m Bool
processDead state strat (a, outcome) =
    case (strat, outcome) of
        (IgnoreAll, _) -> forget >> return True
        (KillAll, Left e) -> forget >> stopAll state >> throwIO e
        (KillAll, Right ()) -> forget >> stopAll state >> return False
        (IgnoreGraceful, Right ()) -> forget >> return True
        (IgnoreGraceful, Left e) -> forget >> stopAll state >> throwIO e
        (Notify notif, _) -> do
            atomically $ do
                as <- readTVar state
                -- Only report children that are still being supervised.
                when (a `elem` as) $
                    notif (a, either Just (const Nothing) outcome)
                forgetSTM
            return True
  where
    -- Strict update for every strategy, so no chain of 'filter' thunks is
    -- left inside the TVar.
    forgetSTM = modifyTVar' state (filter (/= a))
    forget = atomically forgetSTM
-- | Fork the action as a new child and record it in the list of live
-- children, returning its handle.
--
-- Forking and registering happen with asynchronous exceptions masked, so
-- the supervisor cannot be interrupted between the two and leave a running
-- child it does not know about. A forked thread inherits the masking state
-- of its parent, so the child's action is wrapped in @restore@: it runs in
-- the masking state the caller had, not under this function's mask.
-- Otherwise 'cancel' could only reach the child at interruptible points.
startChild :: MonadUnliftIO m => TVar [Child] -> ChildAction -> m Child
startChild state ch =
    mask $ \restore -> do
        a <- async (restore (liftIO ch))
        atomically $ modifyTVar' state (a :)
        return a
-- | Remove the given child from the list of live children and, if it was
-- actually in the list, cancel it. Does nothing for an unknown child.
-- Runs with asynchronous exceptions masked ('mask_').
stopChild :: MonadUnliftIO m => TVar [Child] -> Child -> m ()
stopChild state a =
    mask_ $ do
        wasChild <-
            atomically $ do
                known <- readTVar state
                writeTVar state (filter (/= a) known)
                return (a `elem` known)
        when wasChild (cancel a)