{-# LANGUAGE ScopedTypeVariables #-}
-- | Parallelism combinators with explicit thread-pool creation and
-- passing.
--
-- The most basic example of usage is:
--
-- > main = withPool 2 $ \pool -> parallel_ pool [putStrLn "Echo", putStrLn " in parallel"]
--
-- Make sure that you compile with @-threaded@ and supply @+RTS -N2 -RTS@
-- to the generated Haskell executable, or you won't get any parallelism.
--
-- If you plan to allow your worker items to block, then you should read the documentation for 'extraWorkerWhileBlocked'.
--
-- The "Control.Concurrent.ParallelIO.Global" module is implemented
-- on top of this one by maintaining a shared global thread pool
-- with one thread per capability.
module Control.Concurrent.ParallelIO.Local (
    -- * Executing actions
    parallel_, parallelE_, parallel, parallelE,
    parallelInterleaved, parallelInterleavedE,
    parallelFirst, parallelFirstE,

    -- * Pool management
    Pool, withPool, startPool, stopPool,
    extraWorkerWhileBlocked,
    
    -- * Advanced pool management
    spawnPoolWorkerFor, killPoolWorkerFor
  ) where

import Control.Concurrent.ParallelIO.Compat

import Control.Concurrent
import Control.Exception
import qualified Control.Exception as E
import Control.Monad

import System.IO


-- | Run @act@, handing any exception it raises to @handler@. The one
-- exception that is never handed over is 'ThreadKilled': it is rethrown
-- unchanged, so the handler cannot intercept it.
catchNonThreadKilled :: IO a -> (SomeException -> IO a) -> IO a
catchNonThreadKilled act handler = act `E.catch` dispatch
  where
    -- Inspect the caught exception: 'ThreadKilled' propagates as-is,
    -- everything else goes to the caller-supplied handler.
    dispatch e = case fromException e of
        Just ThreadKilled -> throwIO e
        _                 -> handler e

-- | Run @act@; if it fails with anything other than 'ThreadKilled', run
-- @handler@ for its side effects and then rethrow the original exception.
-- The result of @handler@ is discarded.
onNonThreadKilledException :: IO a -> IO b -> IO a
onNonThreadKilledException act handler =
    catchNonThreadKilled act $ \e -> handler >> throwIO e

-- | Run @act@, forwarding any exception other than 'ThreadKilled' to the
-- thread @tid@ via 'throwTo' instead of letting it escape from this thread.
reflectExceptionsTo :: ThreadId -> IO () -> IO ()
reflectExceptionsTo tid act = catchNonThreadKilled act (throwTo tid)


-- | A thread pool, containing a maximum number of threads. The best way to
-- construct one of these is using 'withPool'.
data Pool = Pool {
    -- | The thread count the pool was created with (the argument given to 'startPool').
    pool_threadcount :: Int,
    -- | Semaphore counting the free worker slots: 'killPoolWorkerFor' waits on it
    -- and 'spawnPoolWorkerFor' signals it.
    pool_sem :: QSem
  }

-- | A slightly unsafe way to construct a pool. Make a pool from the maximum
-- number of threads you wish it to execute (including the main thread
-- in the count).
--
-- If you use this variant then ensure that you insert a call to 'stopPool'
-- somewhere in your program after all users of that pool have finished.
--
-- A better alternative is to see if you can use the 'withPool' variant.
--
-- Calls 'error' if the requested thread count is less than 1.
startPool :: Int -> IO Pool
startPool threadcount
  | threadcount < 1 = error $ "startPool: thread count must be strictly positive (was " ++ show threadcount ++ ")"
    -- The semaphore starts one short of the requested count because the
    -- calling thread itself is included in that count.
  | otherwise       = Pool threadcount `fmap` newQSem (threadcount - 1)

-- | Clean up a thread pool. If you don't call this from the main thread then no one holds the queue,
-- the queue gets GC'd, the threads find themselves blocked indefinitely, and you get exceptions.
-- 
-- This cleanly shuts down the threads so the queue isn't important and you don't get
-- exceptions.
--
-- Only call this /after/ all users of the pool have completed, or your program may
-- block indefinitely.
stopPool :: Pool -> IO ()
stopPool pool =
    -- Take back every slot that 'startPool' put into the semaphore
    -- (thread count minus one). Each 'killPoolWorkerFor' waits on the
    -- semaphore, so this blocks until that many slots are free.
    replicateM_ (pool_threadcount pool - 1) (killPoolWorkerFor pool)

-- | A safe wrapper around 'startPool' and 'stopPool'. Executes an 'IO' action using a newly-created
-- pool with the specified number of threads and cleans it up at the end.
--
-- Because this is built on 'bracket', 'stopPool' also runs when the action
-- throws an exception.
withPool :: Int -> (Pool -> IO a) -> IO a
withPool threadcount = bracket (startPool threadcount) stopPool


-- | You should wrap any IO action used from your worker threads that may block with this method.
-- It temporarily spawns another worker thread to make up for the loss of the old blocked
-- worker.
--
-- This is particularly important if the unblocking is dependent on worker threads actually doing
-- work. If you have this situation, and you don't use this method to wrap blocking actions, then
-- you may get a deadlock if all your worker threads get blocked on work that they assume will be
-- done by other worker threads.
--
-- An example where something goes wrong if you don't use this to wrap blocking actions is the following example:
--
-- > newEmptyMVar >>= \mvar -> parallel_ pool [readMVar mvar, putMVar mvar ()]
--
-- If we only have one thread, we will sometimes get a schedule where the 'readMVar' action is run
-- before the 'putMVar'. Unless we wrap the read with 'extraWorkerWhileBlocked', if the pool has a
-- single thread our program will deadlock, because the worker will become blocked and no other thread
-- will be available to execute the 'putMVar'.
--
-- The correct code is:
--
-- > newEmptyMVar >>= \mvar -> parallel_ pool [extraWorkerWhileBlocked pool (readMVar mvar), putMVar mvar ()]
extraWorkerWhileBlocked :: Pool -> IO a -> IO a
extraWorkerWhileBlocked pool =
    -- Free one slot before running the (possibly blocking) action and take
    -- it back afterwards; 'bracket_' runs the re-acquisition even if the
    -- action throws.
    bracket_ (spawnPoolWorkerFor pool) (killPoolWorkerFor pool)

-- | Internal method for adding extra unblocked threads to a pool if one of the current
-- worker threads is going to be temporarily blocked. Unrestricted use of this is unsafe,
-- so we recommend that you use the 'extraWorkerWhileBlocked' function instead if possible.
--
-- Implemented by signalling the pool's semaphore, i.e. making one more slot available.
spawnPoolWorkerFor :: Pool -> IO ()
spawnPoolWorkerFor = signalQSem . pool_sem

-- | Internal method for removing threads from a pool after one of the threads on the pool
-- becomes newly unblocked. Unrestricted use of this is unsafe, so we recommend that you use
-- the 'extraWorkerWhileBlocked' function instead if possible.
--
-- Implemented by waiting on the pool's semaphore, so it blocks until a slot is available.
killPoolWorkerFor :: Pool -> IO ()
killPoolWorkerFor = waitQSem . pool_sem


-- | Run the list of computations in parallel.
--
-- Has the following properties:
--
--  1. Never creates more or less unblocked threads than are specified to
--     live in the pool. NB: this count includes the thread executing 'parallel_'.
--     This should minimize contention and hence pre-emption, while also preventing
--     starvation.
--
--  2. On return all actions have been performed.
--
--  3. The function returns in a timely manner as soon as all actions have
--     been performed.
--
--  4. The above properties are true even if 'parallel_' is used by an
--     action which is itself being executed by one of the parallel combinators.
--
--  5. If any of the IO actions throws an exception this does not prevent any of the
--     other actions from being performed.
--
--  6. If any of the IO actions throws an exception, the exception thrown by the first
--     failing action in the input list will be thrown by 'parallel_'. Importantly, at the
--     time the exception is thrown there is no guarantee that the other parallel actions
--     have completed.
--
--     The motivation for this choice is that waiting for all the threads to either return
--     or throw before throwing the first exception will almost always cause GHC to show the
--     "Blocked indefinitely in MVar operation" exception rather than the exception you care about.
--
--     The reason for this behaviour can be seen by considering this machine state:
--
--       1. The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2.
--          It is blocked on both of them waiting for them to return either a result or an exception via an MVar.
--
--       2. Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle,
--          while thread 1 will eventually put into the handle.
--     
--     Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now
--     thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result
--     of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to
--     see was the original exception thrown in thread 1!
--
--     By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception
--     comes in, we give this exception a chance to actually be displayed.
parallel_ :: Pool -> [IO a] -> IO ()
parallel_ pool xs =
    -- Delegate to 'parallel' and throw away the collected results.
    parallel pool xs >> return ()

-- | As 'parallel_', but instead of throwing exceptions that are thrown by subcomputations,
-- they are returned in a data structure.
--
-- As a result, property 6 of 'parallel_' is not preserved, and therefore if your IO actions can depend on each other
-- and may throw exceptions your program may die with "blocked indefinitely" exceptions rather than informative messages.
--
-- Each element of the result is @Just e@ if the corresponding action threw @e@, and
-- 'Nothing' if it completed normally.
parallelE_ :: Pool -> [IO a] -> IO [Maybe SomeException]
parallelE_ pool = fmap (map (either Just (const Nothing))) . parallelE pool

-- | Run the list of computations in parallel, returning the results in the
-- same order as the corresponding actions.
--
-- Has the following properties:
--
--  1. Never creates more or less unblocked threads than are specified to
--     live in the pool. NB: this count includes the thread executing 'parallel'.
--     This should minimize contention and hence pre-emption, while also preventing
--     starvation.
--
--  2. On return all actions have been performed.
--
--  3. The function returns in a timely manner as soon as all actions have
--     been performed.
--
--  4. The above properties are true even if 'parallel' is used by an
--     action which is itself being executed by one of the parallel combinators.
--
--  5. If any of the IO actions throws an exception this does not prevent any of the
--     other actions from being performed.
--
--  6. If any of the IO actions throws an exception, the exception thrown by the first
--     failing action in the input list will be thrown by 'parallel'. Importantly, at the
--     time the exception is thrown there is no guarantee that the other parallel actions
--     have completed.
--
--     The motivation for this choice is that waiting for all the threads to either return
--     or throw before throwing the first exception will almost always cause GHC to show the
--     "Blocked indefinitely in MVar operation" exception rather than the exception you care about.
--
--     The reason for this behaviour can be seen by considering this machine state:
--
--       1. The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2.
--          It is blocked on both of them waiting for them to return either a result or an exception via an MVar.
--
--       2. Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle,
--          while thread 1 will eventually put into the handle.
--     
--     Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now
--     thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result
--     of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to
--     see was the original exception thrown in thread 1!
--
--     By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception
--     comes in, we give this exception a chance to actually be displayed.
parallel :: Pool -> [IO a] -> IO [a]
parallel pool acts = mask $ \restore -> do
    main_tid <- myThreadId
    resultvars <- forM acts $ \act -> do
        resultvar <- newEmptyMVar
        -- Each worker takes a pool slot before running its action and gives
        -- it back afterwards (also on failure). Exceptions other than
        -- ThreadKilled are forwarded to the calling thread.
        _tid <- forkIO $ bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ reflectExceptionsTo main_tid $ do
            -- The action itself runs with the caller's original masking state.
            res <- restore act
            -- Use tryPutMVar instead of putMVar so we get an exception if my brain has failed
            True <- tryPutMVar resultvar res
            return ()
        return resultvar
    -- The calling thread releases its own slot while it waits, then collects
    -- the results in the same order as the input actions.
    extraWorkerWhileBlocked pool (mapM takeMVar resultvars)

-- | As 'parallel', but instead of throwing exceptions that are thrown by subcomputations,
-- they are returned in a data structure.
--
-- As a result, property 6 of 'parallel' is not preserved, and therefore if your IO actions can depend on each other
-- and may throw exceptions your program may die with "blocked indefinitely" exceptions rather than informative messages.
parallelE :: Pool -> [IO a] -> IO [Either SomeException a]
parallelE pool acts = mask $ \restore -> do
    resultvars <- forM acts $ \act -> do
        resultvar <- newEmptyMVar
        -- Each worker takes a pool slot before running its action and gives
        -- it back afterwards.
        _tid <- forkIO $ bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ do
            -- Capture the outcome (exception or value) instead of propagating it;
            -- the action runs with the caller's original masking state.
            ei_e_res <- try (restore act)
            -- Use tryPutMVar instead of putMVar so we get an exception if my brain has failed
            True <- tryPutMVar resultvar ei_e_res
            return ()
        return resultvar
    -- The calling thread releases its own slot while it waits, then collects
    -- the outcomes in the same order as the input actions.
    extraWorkerWhileBlocked pool (mapM takeMVar resultvars)

-- | Run the list of computations in parallel, returning the results in the
-- approximate order of completion.
--
-- Has the following properties:
--
--  1. Never creates more or less unblocked threads than are specified to
--     live in the pool. NB: this count includes the thread executing 'parallelInterleaved'.
--     This should minimize contention and hence pre-emption, while also preventing
--     starvation.
--
--  2. On return all actions have been performed.
--
--  3. The result of running actions appear in the list in undefined order, but which
--     is likely to be very similar to the order of completion.
--
--  4. The above properties are true even if 'parallelInterleaved' is used by an
--     action which is itself being executed by one of the parallel combinators.
--
--  5. If any of the IO actions throws an exception this does not prevent any of the
--     other actions from being performed.
--
--  6. If any of the IO actions throws an exception, the exception thrown by the first
--     failing action in the input list will be thrown by 'parallelInterleaved'. Importantly, at the
--     time the exception is thrown there is no guarantee that the other parallel actions
--     have completed.
--
--     The motivation for this choice is that waiting for all the threads to either return
--     or throw before throwing the first exception will almost always cause GHC to show the
--     "Blocked indefinitely in MVar operation" exception rather than the exception you care about.
--
--     The reason for this behaviour can be seen by considering this machine state:
--
--       1. The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2.
--          It is blocked on both of them waiting for them to return either a result or an exception via an MVar.
--
--       2. Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle,
--          while thread 1 will eventually put into the handle.
--     
--     Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now
--     thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result
--     of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to
--     see was the original exception thrown in thread 1!
--
--     By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception
--     comes in, we give this exception a chance to actually be displayed.
parallelInterleaved :: Pool -> [IO a] -> IO [a]
parallelInterleaved pool acts = mask $ \restore -> do
    main_tid <- myThreadId
    resultchan <- newChan
    -- Fork one worker per action. Each takes a pool slot before running and
    -- gives it back afterwards; exceptions other than ThreadKilled are
    -- forwarded to the calling thread.
    forM_ acts $ \act ->
        forkIO $ bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ reflectExceptionsTo main_tid $ do
            -- The action itself runs with the caller's original masking state.
            res <- restore act
            writeChan resultchan res
    -- The calling thread releases its own slot while it waits, and reads
    -- exactly one result per submitted action, in whatever order they arrive.
    extraWorkerWhileBlocked pool (mapM (const (readChan resultchan)) acts)

-- | As 'parallelInterleaved', but instead of throwing exceptions that are thrown by subcomputations,
-- they are returned in a data structure.
--
-- As a result, property 6 of 'parallelInterleaved' is not preserved, and therefore if your IO actions can depend on each other
-- and may throw exceptions your program may die with "blocked indefinitely" exceptions rather than informative messages.
parallelInterleavedE :: Pool -> [IO a] -> IO [Either SomeException a]
parallelInterleavedE pool acts = mask $ \restore -> do
    resultchan <- newChan
    -- Fork one worker per action. Each takes a pool slot before running and
    -- gives it back afterwards.
    forM_ acts $ \act ->
        forkIO $ bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ do
            -- Capture the outcome (exception or value) instead of propagating it;
            -- the action runs with the caller's original masking state.
            ei_e_res <- try (restore act)
            writeChan resultchan ei_e_res
    -- The calling thread releases its own slot while it waits, and reads
    -- exactly one outcome per submitted action, in whatever order they arrive.
    extraWorkerWhileBlocked pool (mapM (const (readChan resultchan)) acts)

-- | Run the list of computations in parallel, returning the result of the first
-- thread that completes with (Just x), if any
--
-- Has the following properties:
--
--  1. Never creates more or less unblocked threads than are specified to
--     live in the pool. NB: this count includes the thread executing 'parallelFirst'.
--     This should minimize contention and hence pre-emption, while also preventing
--     starvation.
--
--  2. On return all actions have either been performed or cancelled (with ThreadKilled exceptions).
--
--  3. The above properties are true even if 'parallelFirst' is used by an
--     action which is itself being executed by one of the parallel combinators.
--
--  4. If any of the IO actions throws an exception, the exception thrown by the first
--     throwing action in the input list will be thrown by 'parallelFirst'. Importantly, at the
--     time the exception is thrown there is no guarantee that the other parallel actions
--     have been completed or cancelled.
--
--     The motivation for this choice is that waiting for all the threads to either return
--     or throw before throwing the first exception will almost always cause GHC to show the
--     "Blocked indefinitely in MVar operation" exception rather than the exception you care about.
--
--     The reason for this behaviour can be seen by considering this machine state:
--
--       1. The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2.
--          It is blocked on both of them waiting for them to return either a result or an exception via an MVar.
--
--       2. Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle,
--          while thread 1 will eventually put into the handle.
--     
--     Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now
--     thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result
--     of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to
--     see was the original exception thrown in thread 1!
--
--     By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception
--     comes in, we give this exception a chance to actually be displayed.
parallelFirst :: Pool -> [IO (Maybe a)] -> IO (Maybe a)
parallelFirst :: Pool -> [IO (Maybe a)] -> IO (Maybe a)
parallelFirst Pool
pool [IO (Maybe a)]
acts = ((forall a. IO a -> IO a) -> IO (Maybe a)) -> IO (Maybe a)
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO (Maybe a)) -> IO (Maybe a))
-> ((forall a. IO a -> IO a) -> IO (Maybe a)) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
    ThreadId
main_tid <- IO ThreadId
myThreadId
    MVar (Maybe a)
resultvar <- IO (MVar (Maybe a))
forall a. IO (MVar a)
newEmptyMVar
    ([ThreadId]
tids, [MVar ()]
waits) <- ([(ThreadId, MVar ())] -> ([ThreadId], [MVar ()]))
-> IO [(ThreadId, MVar ())] -> IO ([ThreadId], [MVar ()])
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM [(ThreadId, MVar ())] -> ([ThreadId], [MVar ()])
forall a b. [(a, b)] -> ([a], [b])
unzip (IO [(ThreadId, MVar ())] -> IO ([ThreadId], [MVar ()]))
-> IO [(ThreadId, MVar ())] -> IO ([ThreadId], [MVar ()])
forall a b. (a -> b) -> a -> b
$ [IO (Maybe a)]
-> (IO (Maybe a) -> IO (ThreadId, MVar ()))
-> IO [(ThreadId, MVar ())]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [IO (Maybe a)]
acts ((IO (Maybe a) -> IO (ThreadId, MVar ()))
 -> IO [(ThreadId, MVar ())])
-> (IO (Maybe a) -> IO (ThreadId, MVar ()))
-> IO [(ThreadId, MVar ())]
forall a b. (a -> b) -> a -> b
$ \IO (Maybe a)
act -> do
        MVar ()
wait_var <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
        ThreadId
tid <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ (IO () -> IO Bool -> IO ()) -> IO Bool -> IO () -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO () -> IO Bool -> IO ()
forall a b. IO a -> IO b -> IO a
onNonThreadKilledException (MVar (Maybe a) -> Maybe a -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar (Maybe a)
resultvar Maybe a
forall a. Maybe a
Nothing) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$                     -- If we throw an exception, unblock
                        IO () -> IO () -> IO () -> IO ()
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ (Pool -> IO ()
killPoolWorkerFor Pool
pool) (Pool -> IO ()
spawnPoolWorkerFor Pool
pool IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
wait_var ()) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ -- the main thread so it can rethrow it
                        ThreadId -> IO () -> IO ()
reflectExceptionsTo ThreadId
main_tid (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            Maybe a
mb_res <- IO (Maybe a) -> IO (Maybe a)
forall a. IO a -> IO a
restore IO (Maybe a)
act
            case Maybe a
mb_res of
                Maybe a
Nothing  -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                Just a
res -> MVar (Maybe a) -> Maybe a -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar (Maybe a)
resultvar (a -> Maybe a
forall a. a -> Maybe a
Just a
res) IO Bool -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        (ThreadId, MVar ()) -> IO (ThreadId, MVar ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId
tid, MVar ()
wait_var)
    IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ (MVar () -> IO ()) -> [MVar ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar [MVar ()]
waits IO () -> IO Bool -> IO Bool
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar (Maybe a) -> Maybe a -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar (Maybe a)
resultvar Maybe a
forall a. Maybe a
Nothing IO Bool -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    Maybe a
mb_res <- Pool -> IO (Maybe a) -> IO (Maybe a)
forall a. Pool -> IO a -> IO a
extraWorkerWhileBlocked Pool
pool (MVar (Maybe a) -> IO (Maybe a)
forall a. MVar a -> IO a
takeMVar MVar (Maybe a)
resultvar)
    (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ThreadId -> IO ()
killThread [ThreadId]
tids
    Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
mb_res

-- | As 'parallelFirst', but instead of throwing exceptions that are thrown by subcomputations,
-- they are returned in a data structure.
--
-- The result is determined by whichever of the following happens first:
--
--  * an action returns @Just x@: the result is @Just (Right x)@
--
--  * an action throws @e@: the result is @Just (Left e)@
--
--  * every action has finished (which is immediately the case for an empty list):
--    the result is @Nothing@
--
-- Once the result is known, all the threads that were forked for the actions are killed.
--
-- As a result, property 4 of 'parallelFirst' is not preserved, and therefore if your IO actions can depend on each other
-- and may throw exceptions, your program may die with "blocked indefinitely" exceptions rather than informative messages.
parallelFirstE :: Pool -> [IO (Maybe a)] -> IO (Maybe (Either SomeException a))
parallelFirstE pool acts = mask $ \restore -> do
    -- Holds the overall outcome. Only the first write succeeds, because every
    -- writer goes through tryPutMVar, which never overwrites a full MVar.
    resultvar <- newEmptyMVar
    let publish outcome = tryPutMVar resultvar outcome >> return ()
    (tids, waits) <- liftM unzip $ forM acts $ \act -> do
        -- Filled by the bracket_ finaliser below, i.e. once this worker is done,
        -- whether it finished normally, threw, or was killed.
        wait_var <- newEmptyMVar
        -- NOTE(review): killPoolWorkerFor / spawnPoolWorkerFor are defined elsewhere;
        -- presumably they retire a pool worker for the lifetime of this thread and
        -- bring one back afterwards -- confirm against their documentation.
        tid <- forkIO $ bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool >> putMVar wait_var ()) $ do
            -- The forked thread inherits the masked state, so asynchronous
            -- exceptions are re-enabled only while the user action itself runs.
            ei_mb_res <- try (restore act)
            case ei_mb_res of
                -- NB: we aren't in danger of putting a "thread killed" exception into the MVar
                -- since we only kill the spawned threads *after* we have already taken from resultvar
                Left e           -> publish (Just (Left e))
                Right Nothing    -> return ()
                Right (Just res) -> publish (Just (Right res))
        return (tid, wait_var)
    -- Waiter thread: once every worker has signalled completion, report that
    -- nobody produced a result (a no-op if a result was already published).
    _ <- forkIO $ mapM_ takeMVar waits >> publish Nothing
    -- NOTE(review): there is no handler around this blocking wait, so if the
    -- calling thread is interrupted here the workers are not killed -- confirm
    -- that this is intended.
    mb_res <- extraWorkerWhileBlocked pool (takeMVar resultvar)
    mapM_ killThread tids
    return mb_res