-- | A helper module which takes care of parallelism {-# LANGUAGE DeriveDataTypeable #-} module Test.Tasty.Parallel (ActionStatus(..), Action(..), runInParallel) where import Control.Monad import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Foreign.StablePtr -- | What to do about an 'Action'? data ActionStatus = ActionReady -- ^ the action is ready to be executed | ActionSkip -- ^ the action should be skipped | ActionWait -- ^ not sure what to do yet; wait deriving Eq data Action = Action { actionStatus :: STM ActionStatus , actionRun :: IO () , actionSkip :: STM () } -- | Take a list of actions and execute them in parallel, no more than @n@ -- at the same time. -- -- The action itself is asynchronous, ie. it returns immediately and does -- the work in new threads. It returns an action which aborts tests and -- cleans up. runInParallel :: Int -- ^ maximum number of parallel threads -> [Action] -- ^ list of actions to execute. -- The first action in the pair tells if the second action is ready to run. -> IO (IO ()) -- This implementation tries its best to ensure that exceptions are -- properly propagated to the caller and threads are not left running. -- -- Note that exceptions inside tests are already caught by the test -- actions themselves. Any exceptions that reach this function or its -- threads are by definition unexpected. runInParallel nthreads actions = do callingThread <- myThreadId -- Don't let the main thread be garbage-collected -- Otherwise we may get a "thread blocked indefinitely in an STM -- transaction" exception when a child thread is blocked and GC'd. -- (See e.g. https://github.com/feuerbach/tasty/issues/15) -- FIXME is this still needed? _ <- newStablePtr callingThread actionsVar <- atomically $ newTMVar actions pids <- replicateM nthreads (async $ work actionsVar) return $ do -- Tell worker threads there is no more work after their current task. -- 'cancel' below by itself is not sufficient because if an exception -- is thrown in the middle of a test, the worker thread simply marks -- the test as failed and moves on to their next task. We also need to -- make it clear that there are no further tasks. _ <- atomically $ swapTMVar actionsVar [] -- Cancel all the current tasks, waiting for workers to clean up. -- The waiting part is important (see #249), that's why we use cancel -- instead of killThread. mapM_ cancel pids work :: TMVar [Action] -> IO () work actionsVar = go where go = do join . atomically $ do mb_ready <- findBool =<< takeTMVar actionsVar case mb_ready of Nothing -> do -- Nothing left to do. Put back the TMVar so that other threads -- do not block on an empty TMVar (see #249) and return. putTMVar actionsVar [] return $ return () Just (this, rest) -> do putTMVar actionsVar rest return $ actionRun this >> go -- | Find a ready-to-run item. Filter out the items that will never be -- ready to run. -- -- Return the ready item and the remaining ones. -- -- This action may block if no items are ready to run just yet. -- -- Return 'Nothing' if there are no runnable items left. findBool :: [Action] -> STM (Maybe (Action, [Action])) findBool = go [] where go [] [] = -- nothing to do return Nothing go _ [] = -- nothing ready yet retry go past (this : rest) = do status <- actionStatus this case status of ActionReady -> return $ Just (this, reverse past ++ rest) ActionWait -> go (this : past) rest ActionSkip -> do actionSkip this go past rest