gang-of-threads-3.2.1: Non-deterministic parallelism with bags

PortabilityPOSIX
Stabilityexperimental
Maintainerbastianholst@gmx.de
Safe HaskellSafe-Inferred

Control.Concurrent.Bag.Safe

Description

High level bag of tasks interface based on Control.Concurrent.Bag.Basic. Tasks can only return results and add new tasks as intended and it is not possible to add new tasks from the outside or from the action processing the results. This way it is possible to ensure that getResults returns only Nothing if it is safe to say that there will be no results anymore.

Synopsis

Documentation

data BagT r m a Source

A monad transformer for processing the results of the bag sequencially. In addition to the actions available in the base monad, which has to be an instance of MonadIO in all functions, it provides the action getResult to get a result of the bag.

Instances

MonadTrans (BagT r) 
Monad m => Monad (BagT r m) 
Monad m => Functor (BagT r m) 
Monad m => Applicative (BagT r m) 
MonadIO m => MonadIO (BagT r m) 

newTaskBagSource

Arguments

:: MonadIO m 
=> BufferType

buffer type

-> Maybe (SplitFunction r)

Possible split function

If the function is given, we will create a bag with one buffer per worker reducing the communication between the workers.

-> [TaskIO r (Maybe r)]

list of initial tasks

-> BagT r m a

action to process the results of the bag

-> m a 

Initializes a new bag of tasks and starts a gang of workers threads. The number of worker threads is equal to the number of capabilities of the Haskell runtime (see getNumCapabilities).

__WARNING__: If it may be necessary to terminate the thread pool, i.e. because the result processing function does not always request all values, you have to make sure that the task can be stopped. Terminating the tasks is done with asynchronous exceptions which can only be received at a emph{safe point}. Safe points are all points where memory allocation is requested, but there are calculations and also loops which never need any new memory. These calculations cannot be terminated and may run forever, see the documentation of throwTo.

newEvalBagSource

Arguments

:: MonadIO m 
=> BufferType

buffer type

-> Maybe (SplitFunction r)

Possible split function If the function is given, we will create a bag with one buffer per worker reducing the communication between the workers.

-> [r]

expressions to evaluate

-> BagT r m a

action to process the results of the bag

-> m a 

Like newTaskBag, but it takes a list of expressions that will be evaluated to weak head normal form using seq.

__WARNING__: This does not evaluate to normal form, but only to weak head normal form.

newInterruptibleBagSource

Arguments

:: MonadIO m 
=> BufferType

buffer type

-> Maybe (SplitFunction r)

Possible split function

If the function is given, we will create a bag with one buffer per worker reducing the communication between the workers.

-> [Interruptible r]

list of initial tasks

-> BagT r m a

action to process the results of the bag

-> m a 

Similar to newTaskBag, but taking a list of Interruptible instead of tasks.

newInterruptingBagSource

Arguments

:: MonadIO m 
=> BufferType

buffer type

-> Maybe (SplitFunction r)

Possible split function

If the function is given, we will create a bag with one buffer per worker reducing the communication between the workers.

-> [Interruptible r]

list of initial tasks

-> BagT r m a

action to process the results of the bag

-> m a 

Similar to newInterruptibleBag, but interrupts the tasks in certain intervals. Using a TChan as buffer, this ensures completeness: all tasks that have a result will get their time to evaluate it. Note, that calculations, that do no memory allocation, cannot be interrupted.

getResult :: MonadIO m => BagT r m (Maybe r)Source

Get a result of the bag if there is one. If it returns Nothing, all tasks have been processed and there are no results left. getResults blocks until a task has been evaluated to a result or all tasks are processed. Therefore it may block forever.

getAllResults :: MonadIO m => BagT a m [a]Source

Convenience function to get all results from the bag of tasks.

liftIO :: MonadIO m => forall a. IO a -> m a

Lift a computation from the IO monad.

lift :: MonadTrans t => forall m a. Monad m => m a -> t m a

Lift a computation from the argument monad to the constructed monad.

data TaskBufferSTM a Source

A buffer holding tasks.

For this type, all access functions are using the STM monad.

Note, that this is not a type class because we want to allow the user to select between multiple buffers other than on type level.

Constructors

TaskBufferSTM 

Fields

writeBufferSTM :: a -> STM ()

Function to write an item into the buffer in the normal way.

unGetBufferSTM :: a -> STM ()

Function to write an item into the buffer at the read end.

readBufferSTM :: STM a

Function to read item from the buffer. Blocks if empty.

tryReadBufferSTM :: STM (Maybe a)

Function to try to read an item from the buffer. Returns Nothing if empty.

isEmptyBufferSTM :: STM Bool

Check whether the buffer is empty.

type SplitFunction r = TaskBufferSTM (IO (Maybe r)) -> TaskBufferSTM (IO (Maybe r)) -> STM (IO (Maybe r))Source

Split functions are used to split the contents of the source buffer into two parts. One part is left in this buffer or put back later; the other part is written into the sink buffer. One element of this part is returned in the STM monad. This is why the source buffer should always have at least one item available. If it has not, the action will suspend.

takeFirst :: SplitFunction rSource

Just take the first item from the source buffer.

data BufferType Source

The type of a buffer. At this time you can only select between Queue and Stack.

Constructors

Queue

A first in first out (FIFO) buffer.

Stack

A last in first out (LIFO) buffer.