module System.Restricted.Worker.Pool
(
WorkersPool
, mkPool
, withWorker
, takeWorker
, putWorker
, destroyWorker
) where
import Control.Applicative ((<$>), (<*>))
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.STM
import Control.Exception (mask, onException)
import Control.Monad (forM, forever, join, when)
import Control.Monad.Base (MonadBase(..))
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Control.Monad.IO.Class (MonadIO, liftIO)
import System.Mem.Weak (addFinalizer)
import System.Restricted.Worker.Internal
import System.Restricted.Worker.Types
-- | A bounded pool of workers of kind @w@.
data WorkersPool w = Pool
    { -- | Action used to create a worker; 'takeWorker' passes it the
      -- 1-based number of the worker being created.
      newWorker     :: Int -> WMonad w (Worker w, RestartWorker IO w)
      -- | Upper bound on 'activeWorkers'; 'takeWorker' blocks instead of
      -- creating a worker once this bound is reached.
    , maxWorkers    :: Int
      -- | Number of workers created and not yet destroyed (bumped by
      -- 'takeWorker', decremented by 'destroyWorker').
    , activeWorkers :: TVar Int
      -- | Idle workers, each paired with the action that restarts it.
      -- 'putWorker' pushes onto the front, 'takeWorker' pops from it.
    , workers       :: TVar [(Worker w, RestartWorker IO w)]
      -- | Delay, in seconds, between two passes of the background thread
      -- that restarts the idle workers.
    , restartRate   :: Int
    }
-- | Build an empty pool and start its background restart thread.
--
-- Arguments, in order: the action that creates a worker given its number,
-- the maximum number of workers, and the delay in seconds between two
-- restart passes over the idle workers.
--
-- The restart thread is killed by a finalizer attached to the returned
-- pool value.
mkPool :: (MonadIO (WMonad a))
       => (Int -> WMonad a (Worker a, RestartWorker IO a))
       -> Int
       -> Int
       -> WMonad a (WorkersPool a)
mkPool spawn limit rate = liftIO $ do
    idleVar  <- newTVarIO []
    countVar <- newTVarIO (0 :: Int)
    -- The reaper only needs the idle list, so it can start before the
    -- pool record itself exists.
    reaperThread <- forkIO (reaper idleVar rate)
    let pool = Pool
            { newWorker     = spawn
            , maxWorkers    = limit
            , activeWorkers = countVar
            , workers       = idleVar
            , restartRate   = rate
            }
    addFinalizer pool (killThread reaperThread)
    return pool
-- | Background loop that periodically restarts every idle worker.
--
-- The first argument is the pool's list of idle workers, the second the
-- pause between two passes in seconds.
--
-- Each pass claims the whole idle list in a single transaction and hands
-- workers back one at a time as they are restarted. Reading the list and
-- later overwriting it in a separate transaction would clobber every
-- 'takeWorker' / 'putWorker' that ran during the (slow) restarts: taken
-- workers would reappear as duplicates and returned workers would be lost.
reaper :: TVar [(Worker a, RestartWorker IO a)] -> Int -> IO ()
reaper wrkrs t' = forever $ do
    -- 'threadDelay' counts microseconds.
    threadDelay (t' * 1000000)
    claimed <- atomically $ swapTVar wrkrs []
    restartAll claimed
  where
    restartAll [] = return ()
    restartAll (entry@(w, rw) : rest) = do
        -- If a restart throws, return the workers not yet processed so
        -- they are not dropped from the pool; the exception then
        -- propagates and ends the loop.
        w' <- rw w `onException` giveBack (entry : rest)
        giveBack [(w', rw)]
        restartAll rest

    -- Prepend to whatever was returned to the pool in the meantime.
    giveBack ws = atomically $ modifyTVar' wrkrs (ws ++)
-- | Check a worker out of the pool.
--
-- * If an idle worker exists it is removed from the idle list; when
--   'workerAlive' reports it dead it is restarted before being returned.
-- * Otherwise, if fewer than 'maxWorkers' workers are active, a new one
--   is created with 'newWorker'.
-- * Otherwise the call blocks until a worker is returned with
--   'putWorker' or a slot is freed with 'destroyWorker'.
--
-- The decision is made in one STM transaction that reads both the idle
-- list and the active counter. This guarantees that two callers can never
-- pop the same idle worker, and that a blocked caller is woken by changes
-- to either variable.
--
-- If creating a worker or restarting a dead one throws, the slot it
-- occupied in 'activeWorkers' is released before the exception propagates.
takeWorker :: (MonadIO (WMonad a), MonadBaseControl IO (WMonad a))
           => WorkersPool a -> WMonad a (Worker a, RestartWorker IO a)
takeWorker Pool{..} = join . liftIO . atomically $ do
    idle <- readTVar workers
    case idle of
        ((w, restartW) : rest) -> do
            writeTVar workers rest
            -- The liveness check and restart are IO, so they run after
            -- the transaction has committed.
            return . liftIO $ do
                alive <- workerAlive w
                wrk <- if alive
                    then return w
                    else restartW w `onException` releaseSlot
                return (wrk, restartW)
        [] -> do
            active <- readTVar activeWorkers
            when (active >= maxWorkers) retry
            -- Reserve the slot before the worker exists so concurrent
            -- callers cannot exceed the limit.
            writeTVar activeWorkers $! active + 1
            return $ control $ \runIO ->
                runIO (newWorker (active + 1)) `onException` releaseSlot
  where
    releaseSlot = atomically $ modifyTVar' activeWorkers (subtract 1)
-- | Return a worker, together with its restart action, to the front of
-- the pool's idle list.
putWorker :: WorkersPool a -> (Worker a, RestartWorker IO a) -> IO ()
putWorker pool entry = atomically $ do
    idle <- readTVar idleVar
    -- Force the new list to weak head normal form before storing it.
    writeTVar idleVar $! entry : idle
  where
    idleVar = workers pool
-- | Kill a worker and give its slot back to the pool by decrementing the
-- active-worker counter. The result of 'killWorker' is discarded.
destroyWorker :: WorkersPool a -> Worker a -> IO ()
destroyWorker pool w =
    killWorker w >> atomically (modifyTVar' (activeWorkers pool) (subtract 1))
-- | Run a callback with a worker checked out of the pool.
--
-- The worker is obtained with 'takeWorker'. The callback runs with
-- asynchronous exceptions restored to the caller's masking state; the
-- surrounding bookkeeping runs masked. If the callback throws, the worker
-- is destroyed with 'destroyWorker' and the exception propagates;
-- otherwise the worker is handed back with 'putWorker' and the callback's
-- result is returned.
withWorker :: (MonadBaseControl IO m, MonadBase (WMonad a) m,
               MonadBaseControl IO (WMonad a), MonadIO (WMonad a))
           => WorkersPool a
           -> ((Worker a, RestartWorker IO a) -> m b)
           -> m b
withWorker pool cb =
    liftBase (takeWorker pool) >>= \res ->
        control $ \runInBase -> mask $ \unmask -> do
            let discard  = destroyWorker pool (fst res)
                giveBack = putWorker pool res
            outcome <- unmask (runInBase (cb res)) `onException` discard
            giveBack
            return outcome