immortal-queue-0.1.0.0: Build a pool of queue-processing worker threads.

Safe Haskell: None
Language: Haskell2010

Control.Immortal.Queue

Description

This module uses the immortal library to build a pool of worker threads that process a queue of tasks asynchronously.

First build an ImmortalQueue for your task type and queue backend. Then you can launch the pool using processImmortalQueue and stop the pool with closeImmortalQueue.

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue
import Control.Exception (Exception)
import Control.Immortal.Queue

data Task
    = Print String
    deriving (Show)

queueConfig :: TQueue Task -> ImmortalQueue Task
queueConfig queue =
    ImmortalQueue
        { qThreadCount = 2
        , qPollWorkerTime = 1000
        , qPop = atomically $ readTQueue queue
        , qPush = atomically . writeTQueue queue
        , qHandler = performTask
        , qFailure = printError
        }
  where
    performTask :: Task -> IO ()
    performTask t = case t of
        Print str ->
            putStrLn str
    printError :: Exception e => Task -> e -> IO ()
    printError t err =
        let description = case t of
                Print _ ->
                    "print"
        in  putStrLn $ "Task `" ++ description ++ "` failed with: " ++ show err

main :: IO ()
main = do
    queue <- newTQueueIO
    workers <- processImmortalQueue $ queueConfig queue
    atomically $ mapM_ (writeTQueue queue . Print) ["hello", "world"]
    closeImmortalQueue workers

Config

data ImmortalQueue a

The configuration data required for initializing a worker pool.

Constructors

ImmortalQueue 

Fields

  • qThreadCount :: Natural

    Number of worker threads to run.

  • qPollWorkerTime :: Int

    Time to wait, in milliseconds, between polls for a free worker.

  • qPop :: IO a

    A blocking action to pop the next item off of the queue.

  • qPush :: a -> IO ()

    An action to enqueue a task. Used during shutdown to return an item that has been popped but not yet assigned to a worker.

  • qHandler :: a -> IO ()

    The handler to perform a queued task.

  • qFailure :: forall e. Exception e => a -> e -> IO ()

    An error handler for when a thread encounters an unhandled exception.
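The fields above do not tie the pool to any particular queue implementation: any blocking pop and any push action will do. As a minimal sketch, assuming a Chan from base as the backend, a configuration might look like the following. The Job type, jobLabel, and chanConfig are hypothetical names used only for illustration, and the thread count and poll time are arbitrary.

```haskell
import Control.Concurrent.Chan (Chan, readChan, writeChan)
import Control.Exception (Exception, displayException)
import Control.Immortal.Queue (ImmortalQueue(..))
import System.IO (hPutStrLn, stderr)

-- Hypothetical task type, used only for this illustration.
data Job
    = SendEmail String
    | Resize FilePath
    deriving (Show)

-- Short label for a job, used in failure messages.
jobLabel :: Job -> String
jobLabel job = case job of
    SendEmail _ -> "send-email"
    Resize _ -> "resize"

chanConfig :: Chan Job -> ImmortalQueue Job
chanConfig chan =
    ImmortalQueue
        { qThreadCount = 4
        , qPollWorkerTime = 500
        -- readChan blocks until an item is available.
        , qPop = readChan chan
        -- writeChan appends to the end of the channel, so a job
        -- returned during shutdown goes to the back of the queue.
        , qPush = writeChan chan
        , qHandler = runJob
        , qFailure = logFailure
        }
  where
    runJob :: Job -> IO ()
    runJob job = putStrLn $ "Running " ++ show job
    logFailure :: Exception e => Job -> e -> IO ()
    logFailure job err =
        hPutStrLn stderr $ jobLabel job ++ " failed: " ++ displayException err
```

Because qFailure is polymorphic in the exception type, the handler can rely only on what the Exception class provides, such as show and displayException.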

Run

processImmortalQueue :: forall a. ImmortalQueue a -> IO QueueId

Start a management thread that creates the queue-processing worker threads and return a QueueId that can be used to stop the workers.

data QueueId

An identifier created by a queue manager that can be used to stop the worker threads.

Stop

closeImmortalQueue :: QueueId -> IO ()

Cleanly close the worker pool, allowing the workers to complete their current actions.

killImmortalQueue :: QueueId -> IO ()

Uncleanly close the worker pool, aborting current actions.
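
One way to make sure a pool is always stopped is to pair processImmortalQueue with closeImmortalQueue using bracket from Control.Exception. The sketch below assumes a TQueue backend as in the module description; withWorkerPool is a hypothetical helper and is not part of this library.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (TQueue, newTQueueIO, readTQueue, writeTQueue)
import Control.Exception (Exception, bracket)
import Control.Immortal.Queue

-- Hypothetical helper: start a pool, run an action while the pool is
-- live, then close the pool cleanly even if the action throws.
withWorkerPool :: ImmortalQueue a -> IO b -> IO b
withWorkerPool config action =
    bracket (processImmortalQueue config) closeImmortalQueue (const action)

main :: IO ()
main = do
    queue <- newTQueueIO
    withWorkerPool (config queue) $
        atomically $ mapM_ (writeTQueue queue) ["one", "two"]
  where
    config :: TQueue String -> ImmortalQueue String
    config queue =
        ImmortalQueue
            { qThreadCount = 1
            , qPollWorkerTime = 1000
            , qPop = atomically $ readTQueue queue
            , qPush = atomically . writeTQueue queue
            , qHandler = putStrLn
            , qFailure = report
            }
    report :: Exception e => String -> e -> IO ()
    report task err =
        putStrLn $ "Task " ++ show task ++ " failed with: " ++ show err
```

Substituting killImmortalQueue for closeImmortalQueue in the release position would instead abort whatever the workers are doing when the action finishes.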