{-# LANGUAGE RankNTypes #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE ScopedTypeVariables #-} {- | This module uses the library to build a pool of worker threads that process a queue of tasks asynchronously. First build an 'ImmortalQueue' for your task type and queue backend. Then you can launch the pool using 'processImmortalQueue' and stop the pool with 'closeImmortalQueue'. > import Control.Concurrent.STM (atomically) > import Control.Concurrent.STM.TQueue > import Control.Exception (Exception) > import Control.Immortal.Queue > > data Task > = Print String > deriving (Show) > > queueConfig :: TQueue Task -> ImmortalQueue Task > queueConfig queue = > ImmortalQueue > { qThreadCount = 2 > , qPollWorkerTime = 1000 > , qPop = atomically $ readTQueue queue > , qPush = atomically . writeTQueue queue > , qHandler = performTask > , qFailure = printError > } > where > performTask :: Task -> IO () > performTask t = case t of > Print str -> > putStrLn str > printError :: Exception e => Task -> e -> IO () > printError t err = > let description = case t of > Print str -> > "print" > in putStrLn $ "Task `" ++ description ++ "` failed with: " ++ show err > > main :: IO () > main = do > queue <- newTQueueIO > workers <- processImmortalQueue $ queueConfig queue > atomically $ mapM_ (writeTQueue queue . Print) ["hello", "world"] > closeImmortalQueue workers -} module Control.Immortal.Queue ( -- * Config ImmortalQueue(..) -- * Run , processImmortalQueue , QueueId -- * Stop , closeImmortalQueue , killImmortalQueue ) where import Control.Concurrent ( MVar , newEmptyMVar , takeMVar , putMVar , threadDelay , readMVar , tryPutMVar , tryTakeMVar ) import Control.Concurrent.Async ( Async , async , wait , race , cancel ) import Control.Exception ( Exception ) import Control.Immortal ( Thread ) import Control.Monad ( (>=>) , forever , void ) import Numeric.Natural ( Natural ) import qualified Control.Immortal as Immortal -- | The configuration data required for initializing a worker pool. data ImmortalQueue a = ImmortalQueue { qThreadCount :: Natural -- ^ Number of worker threads to run. , qPollWorkerTime :: Int -- ^ Wait time in milliseconds for polling for a free worker. , qPop :: IO a -- ^ A blocking action to pop the next item off of the queue. , qPush :: a -> IO () -- ^ An action to enqueue a task. Used during shutdown if we've -- popped an item but haven't assigned it to a worker yet. , qHandler :: a -> IO () -- ^ The handler to perform a queued task. , qFailure :: forall e. Exception e => a -> e -> IO () -- ^ An error handler for when a thread encounters an unhandled -- exception. } -- | Start a management thread that creates the queue-processing worker -- threads & return a QueueId that can be used to stop the workers. processImmortalQueue :: forall a . ImmortalQueue a -> IO QueueId processImmortalQueue queue = do shutdown <- newEmptyMVar asyncQueue <- async $ do threads <- mapM (const makeWorker) [1 .. qThreadCount queue] nextAction <- newEmptyMVar asyncQueuePopper <- async $ popQueue nextAction threads cleanClose <- takeMVar shutdown cancel asyncQueuePopper if cleanClose then do mapM_ (Immortal.mortalize . wdThread) threads mapM_ (flip putMVar () . wdCloseMVar) threads else mapM_ (Immortal.stop . wdThread) threads mapM_ (Immortal.wait . wdThread) threads tryTakeMVar nextAction >>= \case Nothing -> return () Just action -> qPush queue action return QueueId { qiCloseCleanly = shutdown, qiAsyncQueue = asyncQueue } where -- Create the communication MVars for a worker & then start the -- Immortal thread. makeWorker :: IO (WorkerData a) makeWorker = do closeMVar <- newEmptyMVar inputMVar <- newEmptyMVar let makeData thread = WorkerData { wdCloseMVar = closeMVar , wdInputMVar = inputMVar , wdThread = thread } makeData <$> Immortal.create (processAction . makeData) -- Wait for an input action or the close signal, then process the input -- or close by returning. processAction :: WorkerData a -> IO () processAction workerData = do actionOrClose <- race (takeMVar $ wdCloseMVar workerData) (readMVar $ wdInputMVar workerData) case actionOrClose of Left () -> return () Right action -> let flushInput = void $ takeMVar $ wdInputMVar workerData finalize = errorHandler action >=> const flushInput in Immortal.onFinish finalize $ qHandler queue action -- Ignore successful results & run the error handler on any exceptions. errorHandler :: Exception e => a -> Either e () -> IO () errorHandler action = either (qFailure queue action) (const $ return ()) -- Pop an item from the queue, put it in the enqueued action MVar, -- assign it to a worker, & then remove it as the currently enqueued -- action. popQueue :: MVar a -> [WorkerData a] -> IO () popQueue actionMVar workers = forever $ do nextAction <- qPop queue putMVar actionMVar nextAction assignWork workers nextAction void $ takeMVar actionMVar -- Assign the action to the first free worker, retrying until a worker -- is available. assignWork :: [WorkerData a] -> a -> IO () assignWork workers action = do assigned <- assignToFreeWorker action workers if assigned then return () else threadDelay (1000 * qPollWorkerTime queue) >> assignWork workers action -- Iterate through the workers, trying to assign the action & returning -- whether assigning was successful or not. assignToFreeWorker :: a -> [WorkerData a] -> IO Bool assignToFreeWorker action = \case [] -> return False worker : ws -> do successfulPut <- tryPutMVar (wdInputMVar worker) action if successfulPut then return True else assignToFreeWorker action ws data WorkerData a = WorkerData { wdCloseMVar :: MVar () -- ^ Signal to shut down the worker , wdInputMVar :: MVar a -- ^ The action to process , wdThread :: Thread -- ^ The "Immortal" thread for the worker } -- | An identifier created by a queue manager that can be used to stop the -- worker processes. data QueueId = QueueId { qiCloseCleanly :: MVar Bool -- ^ Signal to close the queue cleanly or immediately. , qiAsyncQueue :: Async () -- ^ The pool's management thread. } -- | Cleanly close the worker pool, allowing them to complete their -- actions. closeImmortalQueue :: QueueId -> IO () closeImmortalQueue queueId = putMVar (qiCloseCleanly queueId) True >> wait (qiAsyncQueue queueId) -- | Uncleanly close the worker pool, aborting current actions. killImmortalQueue :: QueueId -> IO () killImmortalQueue queueId = putMVar (qiCloseCleanly queueId) False >> wait (qiAsyncQueue queueId)