immortal-queue: Build a pool of queue-processing worker threads.

Safe Haskell: None




This module uses the immortal library to build a pool of worker threads that process a queue of tasks asynchronously.

First build an ImmortalQueue for your task type and queue backend. Then you can launch the pool using processImmortalQueue and stop the pool with closeImmortalQueue.

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue
import Control.Exception (Exception)
import Control.Immortal.Queue

data Task
    = Print String
    deriving (Show)

queueConfig :: TQueue Task -> ImmortalQueue Task
queueConfig queue =
    ImmortalQueue
        { qThreadCount = 2
        , qPollWorkerTime = 1000
        , qPop = atomically $ readTQueue queue
        , qPush = atomically . writeTQueue queue
        , qHandler = performTask
        , qFailure = printError
        }
  where
    -- Run a single task.
    performTask :: Task -> IO ()
    performTask t = case t of
        Print str ->
            putStrLn str
    -- Report a task whose handler threw an exception.
    printError :: Exception e => Task -> e -> IO ()
    printError t err =
        -- A short description of the task for the error message.
        let description = case t of
                Print str ->
                    "print " ++ show str
        in  putStrLn $ "Task `" ++ description ++ "` failed with: " ++ show err

main :: IO ()
main = do
    queue <- newTQueueIO
    workers <- processImmortalQueue $ queueConfig queue
    atomically $ mapM_ (writeTQueue queue . Print) ["hello", "world"]
    closeImmortalQueue workers


data ImmortalQueue a

The configuration data required for initializing a worker pool.




  • qThreadCount :: Natural

    Number of worker threads to run.

  • qPollWorkerTime :: Int

    Time to wait, in milliseconds, between polls for a free worker.

  • qPop :: IO a

    A blocking action to pop the next item off of the queue.

  • qPush :: a -> IO ()

    An action to enqueue a task. Used during shutdown if we've popped an item but haven't assigned it to a worker yet.

  • qHandler :: a -> IO ()

    The handler to perform a queued task.

  • qFailure :: forall e. Exception e => a -> e -> IO ()

    An error handler for when a thread encounters an unhandled exception.
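
The fields above do not tie the pool to any particular queue type: only qPop and qPush touch the backend. As a minimal sketch of that idea, the configuration below uses a bounded TBQueue from the stm package instead of a TQueue. The Job type, boundedConfig, reportFailure, and the chosen thread count and poll time are hypothetical names and values picked for illustration, not part of this library.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue
import Control.Exception (Exception, displayException)
import Control.Immortal.Queue (ImmortalQueue(..))

-- Hypothetical task type used only for this sketch.
newtype Job = Job Int
    deriving (Show)

-- Build a pool configuration on top of a bounded STM queue.
boundedConfig :: TBQueue Job -> ImmortalQueue Job
boundedConfig queue =
    ImmortalQueue
        { qThreadCount = 4
        , qPollWorkerTime = 500
        -- readTBQueue blocks until an item is available.
        , qPop = atomically $ readTBQueue queue
        -- writeTBQueue blocks while the queue is full.
        , qPush = atomically . writeTBQueue queue
        , qHandler = \(Job n) -> print (n * 2)
        , qFailure = reportFailure
        }
  where
    -- The handler must work for any exception type, so it carries an
    -- explicit polymorphic signature.
    reportFailure :: Exception e => Job -> e -> IO ()
    reportFailure job err =
        putStrLn $ show job ++ " failed with: " ++ displayException err
```

A queue for this configuration could be created with, for example, newTBQueueIO 16. One point to consider with a bounded backend: qPush is used during shutdown to return an unassigned item, so it may block if the queue is full at that moment.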


processImmortalQueue :: forall a. ImmortalQueue a -> IO QueueId

Start a management thread that creates the queue-processing worker threads, and return a QueueId that can be used to stop the workers.

data QueueId

An identifier created by a queue manager that can be used to stop the worker threads.


closeImmortalQueue :: QueueId -> IO ()

Cleanly close the worker pool, allowing the workers to complete their current actions.

killImmortalQueue :: QueueId -> IO ()

Forcibly close the worker pool, aborting any actions currently in progress.
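
Because a pool is started with one call and stopped with another, it can be convenient to pair the two so that the pool is closed even when the surrounding code throws. The sketch below does this with bracket from Control.Exception. The names withImmortalQueue, printerConfig, reportFailure, and demo are hypothetical helpers written for this example; they are not exported by this module.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue
import Control.Exception (Exception, bracket)
import Control.Immortal.Queue

-- Hypothetical helper: run an action while a worker pool is active and
-- close the pool cleanly afterwards, even if the action throws.
withImmortalQueue :: ImmortalQueue a -> IO b -> IO b
withImmortalQueue config action =
    bracket (processImmortalQueue config) closeImmortalQueue (const action)

-- Hypothetical configuration whose tasks are strings to print.
printerConfig :: TQueue String -> ImmortalQueue String
printerConfig queue =
    ImmortalQueue
        { qThreadCount = 1
        , qPollWorkerTime = 1000
        , qPop = atomically $ readTQueue queue
        , qPush = atomically . writeTQueue queue
        , qHandler = putStrLn
        , qFailure = reportFailure
        }
  where
    reportFailure :: Exception e => String -> e -> IO ()
    reportFailure task err =
        putStrLn $ "Task `" ++ task ++ "` failed with: " ++ show err

-- Enqueue two tasks while the pool is running, then let the helper
-- close the pool.
demo :: IO ()
demo = do
    queue <- newTQueueIO
    withImmortalQueue (printerConfig queue) $
        atomically $ mapM_ (writeTQueue queue) ["hello", "world"]
```

Substituting killImmortalQueue for closeImmortalQueue in the helper would give the abrupt variant, at the cost of aborting whatever the workers are doing at that point.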