parallel-io-0.3.0: Combinators for executing IO actions in parallel on a thread pool.

Control.Concurrent.ParallelIO.Local

Description

Parallelism combinators with explicit thread-pool creation and passing.

The most basic example of usage is:

 main = withPool 2 $ \pool -> parallel_ pool [putStrLn "Echo", putStrLn " in parallel"]

Make sure that you compile with -threaded and supply +RTS -N2 -RTS to the generated Haskell executable, or you won't get any parallelism.
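
As a rough illustration of the advice above, the pool can be sized from the number of capabilities the runtime was actually started with. This is only a sketch: `runSized` and the file name `SizedPool.hs` are hypothetical, and the build command in the comment is an assumption about a typical GHC setup.

```haskell
-- Assumed build:  ghc -threaded SizedPool.hs
-- Assumed run:    ./SizedPool +RTS -N2 -RTS
-- (newer GHC versions may also require linking with -rtsopts)
import Control.Concurrent (getNumCapabilities)
import Control.Concurrent.ParallelIO.Local (parallel, withPool)

-- | Hypothetical helper (not part of parallel-io): run the actions on a
-- pool that has one thread per capability and collect the results.
runSized :: [IO a] -> IO [a]
runSized actions = do
  n <- getNumCapabilities          -- reflects +RTS -N; at least 1
  withPool n $ \pool -> parallel pool actions
```

If the program is started without `+RTS -N`, `getNumCapabilities` normally reports 1, so the sketch still runs, just without parallelism.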

If you plan to allow your worker items to block, then you should read the documentation for extraWorkerWhileBlocked.

The Control.Concurrent.ParallelIO.Global module is implemented on top of this one by maintaining a shared global thread pool with one thread per capability.

Executing actions

parallel_ :: Pool -> [IO a] -> IO ()

Run the list of computations in parallel.

Has the following properties:

  1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallel_. This should minimize contention and hence pre-emption, while also preventing starvation.
  2. On return all actions have been performed.
  3. The function returns in a timely manner as soon as all actions have been performed.
  4. The above properties are true even if parallel_ is used by an action which is itself being executed by one of the parallel combinators.
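
Property 4 can be illustrated with a small nested use of parallel_ on a single pool. This is a sketch only; `nestedCount` and its counter are hypothetical names introduced for the example.

```haskell
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Control.Concurrent.ParallelIO.Local (Pool, parallel_, withPool)

-- | Hypothetical helper: run 'outer' batches on the pool, where each batch
-- itself runs 'inner' counter increments on the same pool.
nestedCount :: Pool -> Int -> Int -> IO Int
nestedCount pool outer inner = do
  counter <- newIORef 0
  let tick  = atomicModifyIORef' counter (\n -> (n + 1, ()))
      batch = parallel_ pool (replicate inner tick)  -- inner use of parallel_
  parallel_ pool (replicate outer batch)             -- outer use of parallel_
  readIORef counter                                  -- all ticks have run by now
```

A call such as `withPool 2 (\pool -> nestedCount pool 3 4)` relies on property 2 ("on return all actions have been performed") to read the counter safely after the outer call returns.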

If any of the IO actions throws an exception, undefined behaviour will result. If you want safety, wrap your actions in Control.Exception.try.
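
One possible way to follow this advice is to wrap each action in Control.Exception.try before handing it to parallel_. The sketch below is an illustration only; `countFailures` is a hypothetical helper, and catching SomeException is a deliberately broad choice made for brevity.

```haskell
import Control.Exception (SomeException, try)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Control.Concurrent.ParallelIO.Local (Pool, parallel_, withPool)

-- | Hypothetical helper: run every action on the pool, never letting an
-- exception escape a worker, and report how many actions failed.
countFailures :: Pool -> [IO a] -> IO Int
countFailures pool actions = do
  failures <- newIORef 0
  let guarded act = do
        -- Discard the result so the annotation needs no type variable.
        r <- try (act >> return ()) :: IO (Either SomeException ())
        case r of
          Left _  -> atomicModifyIORef' failures (\n -> (n + 1, ()))
          Right _ -> return ()
  parallel_ pool (map guarded actions)
  readIORef failures
```

For instance, `withPool 2 (\pool -> countFailures pool [return (), ioError (userError "boom")])` counts one failure instead of exposing the pool to the exception.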

parallel :: Pool -> [IO a] -> IO [a]

Run the list of computations in parallel, returning the results in the same order as the corresponding actions.

Has the following properties:

  1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallel. This should minimize contention and hence pre-emption, while also preventing starvation.
  2. On return all actions have been performed.
  3. The function returns in a timely manner as soon as all actions have been performed.
  4. The above properties are true even if parallel is used by an action which is itself being executed by one of the parallel combinators.

If any of the IO actions throws an exception, undefined behaviour will result. If you want safety, wrap your actions in Control.Exception.try.
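
The ordering guarantee of parallel can be sketched as follows. The helper `orderedSquares` and the artificial delays are assumptions made for the example: earlier elements sleep longer, so they tend to finish last, yet the result list follows the input order.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.ParallelIO.Local (Pool, parallel, withPool)

-- | Hypothetical helper: square each input on the pool. Results are
-- returned in input order regardless of which action completes first.
orderedSquares :: Pool -> [Int] -> IO [Int]
orderedSquares pool xs = parallel pool (zipWith job delays xs)
  where
    delays = reverse [1 .. length xs]  -- first element waits longest
    job d x = do
      threadDelay (d * 1000)           -- d milliseconds
      return (x * x)
```

Used as `withPool 2 (\pool -> orderedSquares pool [1, 2, 3])`, the results line up with the inputs position by position.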

parallelInterleaved :: Pool -> [IO a] -> IO [a]

Run the list of computations in parallel, returning the results in the approximate order of completion.

Has the following properties:

  1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallelInterleaved. This should minimize contention and hence pre-emption, while also preventing starvation.
  2. On return all actions have been performed.
  3. The results of running the actions appear in the list in an undefined order, which is likely to be very similar to the order of completion.
  4. The above properties are true even if parallelInterleaved is used by an action which is itself being executed by one of the parallel combinators.

If any of the IO actions throws an exception, undefined behaviour will result. If you want safety, wrap your actions in Control.Exception.try.
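
Because the result order of parallelInterleaved is undefined, code that needs a deterministic answer has to avoid depending on positions. A minimal sketch, with the hypothetical helper `interleavedSquares` sorting the results before returning them:

```haskell
import Data.List (sort)
import Control.Concurrent.ParallelIO.Local (Pool, parallelInterleaved, withPool)

-- | Hypothetical helper: square each input on the pool. The raw result
-- order is unspecified, so the list is sorted to make it deterministic.
interleavedSquares :: Pool -> [Int] -> IO [Int]
interleavedSquares pool xs =
  fmap sort (parallelInterleaved pool (map (\x -> return (x * x)) xs))
```

Sorting is just one option; pairing each result with its input index inside the action would be another way to recover the original positions.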

Pool management

data Pool

A thread pool, containing a maximum number of threads. The best way to construct one of these is using withPool.

withPool :: Int -> (Pool -> IO a) -> IO a

A safe wrapper around startPool and stopPool. Executes an IO action using a newly-created pool with the specified number of threads and cleans it up at the end.

startPool :: Int -> IO Pool

A slightly unsafe way to construct a pool. Make a pool from the maximum number of threads you wish it to execute (including the main thread in the count).

If you use this variant then ensure that you insert a call to stopPool somewhere in your program after all users of that pool have finished.

A better alternative is to see if you can use the withPool variant.
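
If startPool and stopPool must be used directly, pairing them with Control.Exception.bracket is one way to make sure the pool is stopped even when the body throws. The sketch below is an assumption about reasonable usage, not a statement of how withPool is implemented; `withManualPool` is a hypothetical name.

```haskell
import Control.Exception (bracket)
import Control.Concurrent.ParallelIO.Local (Pool, parallel, startPool, stopPool)

-- | Hypothetical helper: acquire a pool, run the body, and always call
-- stopPool afterwards, whether the body returns normally or throws.
withManualPool :: Int -> (Pool -> IO a) -> IO a
withManualPool n = bracket (startPool n) stopPool
```

The body must not let the pool escape (for example by returning it), since stopPool should only run after all users of the pool have finished.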

stopPool :: Pool -> IO ()

Clean up a thread pool. If you don't call this from the main thread, then no one holds the queue: the queue gets GC'd, the threads find themselves blocked indefinitely, and you get exceptions.

Calling stopPool shuts the threads down cleanly, so the queue no longer matters and you don't get exceptions.

Only call this after all users of the pool have completed, or your program may block indefinitely.

extraWorkerWhileBlocked :: Pool -> IO a -> IO a

You should wrap any IO action used from your worker threads that may block with this method. It temporarily spawns another worker thread to make up for the loss of the old blocked worker.

This is particularly important if the unblocking is dependent on worker threads actually doing work. If you have this situation, and you don't use this method to wrap blocking actions, then you may get a deadlock if all your worker threads get blocked on work that they assume will be done by other worker threads.

The following example goes wrong if you don't use this to wrap blocking actions:

 newEmptyMVar >>= \mvar -> parallel_ pool [readMVar mvar, putMVar mvar ()]

If the pool has only one thread, we will sometimes get a schedule where the readMVar action runs before the putMVar. Unless we wrap the read with extraWorkerWhileBlocked, our program will then deadlock, because the worker becomes blocked and no other thread is available to execute the putMVar.

The correct code is:

 newEmptyMVar >>= \mvar -> parallel_ pool [extraWorkerWhileBlocked pool (readMVar mvar), putMVar mvar ()]
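
For completeness, the corrected one-liner can be written out as a small self-contained definition. This is a sketch: the name `handshake` and the choice of a single-thread pool are assumptions made to mirror the scenario described above.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
import Control.Concurrent.ParallelIO.Local
  (extraWorkerWhileBlocked, parallel_, withPool)

-- | Hypothetical wrapper around the corrected example, run on a pool
-- with a single thread, the case in which the unwrapped read can deadlock.
handshake :: IO ()
handshake = withPool 1 $ \pool -> do
  mvar <- newEmptyMVar
  parallel_ pool
    [ extraWorkerWhileBlocked pool (readMVar mvar)  -- may block: lend a worker
    , putMVar mvar ()                               -- lets the reader continue
    ]
```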

Advanced pool management

spawnPoolWorkerFor :: Pool -> IO ()

Internal method for adding extra unblocked threads to a pool if one of the current worker threads is going to be temporarily blocked. Unrestricted use of this is unsafe, so we recommend that you use the extraWorkerWhileBlocked function instead if possible.

killPoolWorkerFor :: Pool -> IO ()

Internal method for removing threads from a pool after one of the threads on the pool becomes newly unblocked. Unrestricted use of this is unsafe, so we recommend that you use the extraWorkerWhileBlocked function instead if possible.
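
If the two internal methods do have to be used directly, keeping each spawn paired with exactly one kill is presumably the main thing to get right. The sketch below shows one way to enforce that pairing with bracket_. `whileBlocked` is a hypothetical name, and its resemblance to extraWorkerWhileBlocked is an assumption rather than a description of the library's implementation.

```haskell
import Control.Exception (bracket_)
import Control.Concurrent.ParallelIO.Local
  (Pool, killPoolWorkerFor, spawnPoolWorkerFor, withPool)

-- | Hypothetical helper: lend the pool one extra worker for the duration
-- of a possibly blocking action, and retire a worker again afterwards,
-- even if the action throws.
whileBlocked :: Pool -> IO a -> IO a
whileBlocked pool = bracket_ (spawnPoolWorkerFor pool) (killPoolWorkerFor pool)
```

In ordinary code extraWorkerWhileBlocked remains the recommended entry point.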