module Test.Framework.Runners.ThreadPool ( executeOnPool ) where import Control.Concurrent import Control.Monad import qualified Data.IntMap as IM import Foreign.StablePtr data WorkerEvent token a = WorkerTermination | WorkerItem token a -- | Execute IO actions on several threads and return their results in the original -- order. It is guaranteed that no action from the input list is executed unless all -- the items that precede it in the list have been executed or are executing at that -- moment. executeOnPool :: Int -- ^ Number of threads to use -> [IO a] -- ^ Actions to execute: these will be scheduled left to right -> IO [a] -- ^ Ordered results of executing the given IO actions in parallel executeOnPool n actions = do -- Prepare the channels input_chan <- newChan output_chan <- newChan -- Write the actions as items to the channel followed by one termination per thread -- that indicates they should terminate. We do this on another thread for -- maximum laziness (in case one the actions we are going to run depend on the -- output from previous actions..) _ <- forkIO $ writeList2Chan input_chan (zipWith WorkerItem [0..] actions ++ replicate n WorkerTermination) -- Spawn workers forM_ [1..n] (const $ forkIO $ poolWorker input_chan output_chan) -- Short version: make sure we do the right thing if a test blocks on dead -- MVars or TVars. -- Long version: GHC is clever enough to throw an exception (BlockedOnDeadMVar -- or BlockedIndefinitely) when a thread is waiting for a MVar or TVar that can't -- be written to. However, it doesn't know anything about the handlers for those -- exceptions. Therefore, when a worker runs a test that causes this exception, -- since the main thread is blocking on the worker, the main thread gets the -- exception too despite the fact that the main thread will be runnable as soon -- as the worker catches its own exception. The below makes sure the main thread -- is always reachable by the GC, which is the mechanism for finding threads -- that are unrunnable. -- See also the ticket where SimonM (semi-cryptically) explains this: -- http://hackage.haskell.org/trac/ghc/ticket/3291 -- -- NB: this actually leaks stable pointers. We could prevent this by making -- takeWhileWorkersExist do |unsafePerformIO newStablePtr| when returning the -- lazily-demanded tail of the list, but its a bit of a pain. For now, just -- grit our teeth and accept the leak. _stablePtr <- myThreadId >>= newStablePtr -- Return the results generated by the worker threads lazily and in -- the same order as we got the inputs fmap (reorderFrom 0 . takeWhileWorkersExist n) $ getChanContents output_chan poolWorker :: Chan (WorkerEvent token (IO a)) -> Chan (WorkerEvent token a) -> IO () poolWorker input_chan output_chan = do -- Read an action and work out whether we should continue or stop action_item <- readChan input_chan case action_item of WorkerTermination -> writeChan output_chan WorkerTermination -- Must have run out of real actions to execute WorkerItem token action -> do -- Do the action then loop result <- action writeChan output_chan (WorkerItem token result) poolWorker input_chan output_chan -- | Keep grabbing items out of the infinite list of worker outputs until we have -- recieved word that all of the workers have shut down. This lets us turn a possibly -- infinite list of outputs into a certainly finite one suitable for use with reorderFrom. takeWhileWorkersExist :: Int -> [WorkerEvent token a] -> [(token, a)] takeWhileWorkersExist worker_count events | worker_count <= 0 = [] | otherwise = case events of (WorkerTermination:events') -> takeWhileWorkersExist (worker_count - 1) events' (WorkerItem token x:events') -> (token, x) : takeWhileWorkersExist worker_count events' [] -> [] -- | This function carefully shuffles the input list so it in the total order -- defined by the integers paired with the elements. If the list is @xs@ and -- the supplied initial integer is @n@, it must be the case that: -- -- > sort (map fst xs) == [n..n + (length xs - 1)] -- -- This function returns items in the lazy result list as soon as it is sure -- it has the right item for that position. reorderFrom :: Int -> [(Int, a)] -> [a] reorderFrom from initial_things = go from initial_things IM.empty False where go next [] buf _ | IM.null buf = [] -- If the buffer and input list is empty, we're done | otherwise = go next (IM.toList buf) IM.empty False -- Make sure we check the buffer even if the list is done go next all_things@((token, x):things) buf buf_useful | token == next -- If the list token matches the one we were expecting we can just take the item = x : go (next + 1) things buf True -- Always worth checking the buffer now because the expected item has changed | buf_useful -- If it's worth checking the buffer, it's possible the token we need is in it , (Just x', buf') <- IM.updateLookupWithKey (const $ const Nothing) next buf -- Delete the found item from the map (if we find it) to save space = x' : go (next + 1) all_things buf' True -- Always worth checking the buffer now because the expected item has changed | otherwise -- Token didn't match, buffer unhelpful: it must be in the tail of the list = go next things (IM.insert token x buf) False -- Since we've already checked the buffer, stop bothering to do so until something changes -}