parallel-io-0.3.5: Combinators for executing IO actions in parallel on a thread pool.
Safe Haskell: Safe-Inferred
Language: Haskell98

Control.Concurrent.ParallelIO.Local

Description

Parallelism combinators with explicit thread-pool creation and passing.

The most basic example of usage is:

main = withPool 2 $ \pool -> parallel_ pool [putStrLn "Echo", putStrLn " in parallel"]

Make sure that you compile with -threaded and supply +RTS -N2 -RTS to the generated Haskell executable, or you won't get any parallelism.
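
For example, if the program above lives in Main.hs (the file name is illustrative), a typical build-and-run sequence looks roughly like this; on most GHC versions you also need -rtsopts for the executable to accept RTS flags at run time:

ghc -threaded -rtsopts Main.hs
./Main +RTS -N2 -RTS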

If you plan to allow your worker items to block, then you should read the documentation for extraWorkerWhileBlocked.

The Control.Concurrent.ParallelIO.Global module is implemented on top of this one by maintaining a shared global thread pool with one thread per capability.


Executing actions

parallel_ :: Pool -> [IO a] -> IO ()

Run the list of computations in parallel.

Has the following properties:

  1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallel_. This should minimize contention and hence pre-emption, while also preventing starvation.
  2. On return all actions have been performed.
  3. The function returns in a timely manner as soon as all actions have been performed.
  4. The above properties are true even if parallel_ is used by an action which is itself being executed by one of the parallel combinators.
  5. If any of the IO actions throws an exception this does not prevent any of the other actions from being performed.
  6. If any of the IO actions throws an exception, the exception thrown by the first failing action in the input list will be thrown by parallel_. Importantly, at the time the exception is thrown there is no guarantee that the other parallel actions have completed.

The motivation for this choice is that waiting for all of the threads to either return or throw before throwing the first exception will almost always cause GHC to show the "Blocked indefinitely in MVar operation" exception rather than the exception you care about.

The reason for this behaviour can be seen by considering this machine state:

  1. The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2. It is blocked on both of them waiting for them to return either a result or an exception via an MVar.
  2. Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle, while thread 1 will eventually put into the handle.

Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to see was the original exception thrown in thread 1!

By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception comes in, we give this exception a chance to actually be displayed.
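
As an illustrative sketch (not part of the package's own documentation), the program below runs three actions on a two-thread pool; the userError is re-thrown by parallel_ as the first failing action, and the other actions may or may not have finished by the time it propagates:

import Control.Concurrent.ParallelIO.Local
import Control.Exception (throwIO)

main :: IO ()
main = withPool 2 $ \pool ->
  parallel_ pool
    [ putStrLn "task 1"
    , throwIO (userError "task 2 failed")  -- re-thrown by parallel_ (property 6)
    , putStrLn "task 3"                    -- still attempted despite the failure (property 5)
    ]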

parallelE_ :: Pool -> [IO a] -> IO [Maybe SomeException]

As parallel_, but instead of throwing exceptions that are thrown by subcomputations, they are returned in a data structure.

As a result, property 6 of parallel_ is not preserved, and therefore if your IO actions can depend on each other and may throw exceptions your program may die with "blocked indefinitely" exceptions rather than informative messages.
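
As a sketch (assuming the returned list lines up with the input actions):

import Control.Concurrent.ParallelIO.Local
import Control.Exception (throwIO)

main :: IO ()
main = withPool 2 $ \pool -> do
  outcomes <- parallelE_ pool [putStrLn "ok", throwIO (userError "boom")]
  -- One Maybe SomeException per action; e.g. [Nothing, Just user error (boom)]
  mapM_ print outcomes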

parallel :: Pool -> [IO a] -> IO [a]

Run the list of computations in parallel, returning the results in the same order as the corresponding actions.

Has the following properties:

  1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallel. This should minimize contention and hence pre-emption, while also preventing starvation.
  2. On return all actions have been performed.
  3. The function returns in a timely manner as soon as all actions have been performed.
  4. The above properties are true even if parallel is used by an action which is itself being executed by one of the parallel combinators.
  5. If any of the IO actions throws an exception this does not prevent any of the other actions from being performed.
  6. If any of the IO actions throws an exception, the exception thrown by the first failing action in the input list will be thrown by parallel. Importantly, at the time the exception is thrown there is no guarantee that the other parallel actions have completed.

The motivation for this choice is that waiting for all of the threads to either return or throw before throwing the first exception will almost always cause GHC to show the "Blocked indefinitely in MVar operation" exception rather than the exception you care about.

The reason for this behaviour can be seen by considering this machine state:

  1. The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2. It is blocked on both of them waiting for them to return either a result or an exception via an MVar.
  2. Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle, while thread 1 will eventually put into the handle.

Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to see was the original exception thrown in thread 1!

By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception comes in, we give this exception a chance to actually be displayed.
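
A small sketch (the file names are placeholders) relying on the results coming back in the same order as the input actions:

import Control.Concurrent.ParallelIO.Local

main :: IO ()
main = withPool 4 $ \pool -> do
  let files = ["a.txt", "b.txt", "c.txt"]  -- hypothetical inputs
  -- Results are in the same order as the actions, so zipping is safe
  sizes <- parallel pool (map (fmap length . readFile) files)
  print (zip files sizes)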

parallelE :: Pool -> [IO a] -> IO [Either SomeException a]

As parallel, but instead of throwing exceptions that are thrown by subcomputations, they are returned in a data structure.

As a result, property 6 of parallel is not preserved, and therefore if your IO actions can depend on each other and may throw exceptions your program may die with "blocked indefinitely" exceptions rather than informative messages.
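
For example, a sketch that separates failures from successes with partitionEithers:

import Control.Concurrent.ParallelIO.Local
import Control.Exception (throwIO)
import Data.Either (partitionEithers)

main :: IO ()
main = withPool 2 $ \pool -> do
  results <- parallelE pool [return 1, throwIO (userError "bad input"), return 3]
  let (failures, successes) = partitionEithers results
  putStrLn (show (length successes) ++ " succeeded, " ++ show (length failures) ++ " failed")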

parallelInterleaved :: Pool -> [IO a] -> IO [a]

Run the list of computations in parallel, returning the results in the approximate order of completion.

Has the following properties:

  1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallelInterleaved. This should minimize contention and hence pre-emption, while also preventing starvation.
  2. On return all actions have been performed.
  3. The results of the actions appear in the list in an undefined order, but one which is likely to be very similar to the order of completion.
  4. The above properties are true even if parallelInterleaved is used by an action which is itself being executed by one of the parallel combinators.
  5. If any of the IO actions throws an exception this does not prevent any of the other actions from being performed.
  6. If any of the IO actions throws an exception, the exception thrown by the first failing action in the input list will be thrown by parallelInterleaved. Importantly, at the time the exception is thrown there is no guarantee that the other parallel actions have completed.

The motivation for this choice is that waiting for all of the threads to either return or throw before throwing the first exception will almost always cause GHC to show the "Blocked indefinitely in MVar operation" exception rather than the exception you care about.

The reason for this behaviour can be seen by considering this machine state:

  1. The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2. It is blocked on both of them waiting for them to return either a result or an exception via an MVar.
  2. Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle, while thread 1 will eventually put into the handle.

Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to see was the original exception thrown in thread 1!

By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception comes in, we give this exception a chance to actually be displayed.
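
A sketch of why the completion-order behaviour is useful: the quick action's result is likely (though not guaranteed) to appear earlier in the returned list than the slow one's, even though it comes later in the input list (the delays are arbitrary):

import Control.Concurrent (threadDelay)
import Control.Concurrent.ParallelIO.Local

main :: IO ()
main = withPool 2 $ \pool -> do
  rs <- parallelInterleaved pool
          [ threadDelay 200000 >> return "slow"  -- ~0.2s
          , return "fast"
          ]
  -- Likely output: ["fast","slow"], but the order is not guaranteed
  print rs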

parallelInterleavedE :: Pool -> [IO a] -> IO [Either SomeException a]

As parallelInterleaved, but instead of throwing exceptions that are thrown by subcomputations, they are returned in a data structure.

As a result, property 6 of parallelInterleaved is not preserved, and therefore if your IO actions can depend on each other and may throw exceptions your program may die with "blocked indefinitely" exceptions rather than informative messages.
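
A sketch combining this with partitionEithers, with the entries arriving in approximate completion order rather than input order:

import Control.Concurrent.ParallelIO.Local
import Control.Exception (throwIO)
import Data.Either (partitionEithers)

main :: IO ()
main = withPool 2 $ \pool -> do
  rs <- parallelInterleavedE pool [return "a", throwIO (userError "oops"), return "b"]
  let (errs, oks) = partitionEithers rs
  putStrLn (show (length oks) ++ " ok, " ++ show (length errs) ++ " failed")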

parallelFirst :: Pool -> [IO (Maybe a)] -> IO (Maybe a)

Run the list of computations in parallel, returning the result of the first thread that completes with (Just x), if any.

Has the following properties:

  1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallelFirst. This should minimize contention and hence pre-emption, while also preventing starvation.
  2. On return all actions have either been performed or cancelled (with ThreadKilled exceptions).
  3. The above properties are true even if parallelFirst is used by an action which is itself being executed by one of the parallel combinators.
  4. If any of the IO actions throws an exception, the exception thrown by the first throwing action in the input list will be thrown by parallelFirst. Importantly, at the time the exception is thrown there is no guarantee that the other parallel actions have been completed or cancelled.

The motivation for this choice is that waiting for all of the threads to either return or throw before throwing the first exception will almost always cause GHC to show the "Blocked indefinitely in MVar operation" exception rather than the exception you care about.

The reason for this behaviour can be seen by considering this machine state:

  1. The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2. It is blocked on both of them waiting for them to return either a result or an exception via an MVar.
  2. Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle, while thread 1 will eventually put into the handle.

Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to see was the original exception thrown in thread 1!

By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception comes in, we give this exception a chance to actually be displayed.
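
A sketch of a parallel search: each action reports Just a hit or Nothing, and parallelFirst returns the first hit to arrive (the predicate is a stand-in for some expensive test):

import Control.Concurrent.ParallelIO.Local

-- Hypothetical expensive test: report the number if it passes.
check :: Int -> IO (Maybe Int)
check n = return (if n `mod` 7 == 0 then Just n else Nothing)

main :: IO ()
main = withPool 4 $ \pool -> do
  hit <- parallelFirst pool (map check [1..100])
  print hit  -- Just some multiple of 7, or Nothing if no action produced a hit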

parallelFirstE :: Pool -> [IO (Maybe a)] -> IO (Maybe (Either SomeException a))

As parallelFirst, but instead of throwing exceptions that are thrown by subcomputations, they are returned in a data structure.

As a result, property 4 of parallelFirst is not preserved, and therefore if your IO actions can depend on each other and may throw exceptions your program may die with "blocked indefinitely" exceptions rather than informative messages.
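
A sketch of consuming the result, distinguishing "no action produced a hit" from "the first notable outcome was an exception":

import Control.Concurrent.ParallelIO.Local
import Control.Exception (throwIO)

main :: IO ()
main = withPool 2 $ \pool -> do
  r <- parallelFirstE pool [return Nothing, return (Just "found"), throwIO (userError "broke")]
  case r of
    Nothing          -> putStrLn "no action produced a hit"
    Just (Left err)  -> putStrLn ("failed first: " ++ show err)
    Just (Right hit) -> putStrLn ("first hit: " ++ hit)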

Pool management

data Pool

A thread pool, containing a maximum number of threads. The best way to construct one of these is using withPool.

withPool :: Int -> (Pool -> IO a) -> IO a

A safe wrapper around startPool and stopPool. Executes an IO action using a newly-created pool with the specified number of threads and cleans it up at the end.

startPool :: Int -> IO Pool

A slightly unsafe way to construct a pool. Makes a pool with the maximum number of threads you wish it to run (including the main thread in the count).

If you use this variant then ensure that you insert a call to stopPool somewhere in your program after all users of that pool have finished.

A better alternative is to see if you can use the withPool variant.

stopPool :: Pool -> IO ()

Clean up a thread pool. If you don't call this then nothing keeps the pool's work queue alive: the queue gets GC'd, the worker threads find themselves blocked indefinitely, and you get "blocked indefinitely" exceptions.

Calling this cleanly shuts down the worker threads, so the queue no longer matters and you don't get those exceptions.

Only call this after all users of the pool have completed, or your program may block indefinitely.
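
If withPool doesn't fit your program structure, a sketch of the manual pattern is to pair startPool with stopPool yourself, for example via bracket so that the pool is stopped even if the body throws:

import Control.Concurrent.ParallelIO.Local
import Control.Exception (bracket)

main :: IO ()
main = bracket (startPool 4) stopPool $ \pool ->
  parallel_ pool [putStrLn "one", putStrLn "two"]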

extraWorkerWhileBlocked :: Pool -> IO a -> IO a

You should wrap any IO action used from your worker threads that may block with this method. It temporarily spawns another worker thread to make up for the loss of the old blocked worker.

This is particularly important if the unblocking is dependent on worker threads actually doing work. If you have this situation, and you don't use this method to wrap blocking actions, then you may get a deadlock if all your worker threads get blocked on work that they assume will be done by other worker threads.

An example where something goes wrong if you don't use this to wrap blocking actions is the following:

newEmptyMVar >>= \mvar -> parallel_ pool [readMVar mvar, putMVar mvar ()]

If we only have one thread, we will sometimes get a schedule where the readMVar action is run before the putMVar. Unless we wrap the read with extraWorkerWhileBlocked, a pool with a single thread will cause our program to deadlock, because the worker becomes blocked and no other thread is available to execute the putMVar.

The correct code is:

newEmptyMVar >>= \mvar -> parallel_ pool [extraWorkerWhileBlocked pool (readMVar mvar), putMVar mvar ()]
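
As a complete, runnable sketch of the corrected version (pool size 1 to make the hazard concrete):

import Control.Concurrent (newEmptyMVar, putMVar, readMVar)
import Control.Concurrent.ParallelIO.Local

main :: IO ()
main = withPool 1 $ \pool -> do
  mvar <- newEmptyMVar
  parallel_ pool
    [ extraWorkerWhileBlocked pool (readMVar mvar)  -- temporarily adds a worker while blocked
    , putMVar mvar ()                               -- can now run on the extra worker
    ]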

Advanced pool management

spawnPoolWorkerFor :: Pool -> IO ()

Internal method for adding extra unblocked threads to a pool if one of the current worker threads is going to be temporarily blocked. Unrestricted use of this is unsafe, so we recommend that you use the extraWorkerWhileBlocked function instead if possible.

killPoolWorkerFor :: Pool -> IO ()

Internal method for removing threads from a pool after one of the threads on the pool becomes newly unblocked. Unrestricted use of this is unsafe, so we recommend that you use the extraWorkerWhileBlocked function instead if possible.
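
For illustration only, extraWorkerWhileBlocked can be thought of as bracketing a blocking action with these two calls, roughly along these lines (a sketch, not necessarily the package's actual definition):

import Control.Concurrent.ParallelIO.Local
import Control.Exception (bracket_)

-- Hypothetical reimplementation sketch: add a replacement worker before
-- blocking and retire it afterwards, even if the blocking action throws.
myExtraWorkerWhileBlocked :: Pool -> IO a -> IO a
myExtraWorkerWhileBlocked pool =
  bracket_ (spawnPoolWorkerFor pool) (killPoolWorkerFor pool)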