-- | A helper module which takes care of parallelism
{-# LANGUAGE DeriveDataTypeable #-}
module Test.Tasty.Parallel (ActionStatus(..), Action(..), runInParallel) where

import Control.Monad
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Foreign.StablePtr

-- | What to do about an 'Action'?
data ActionStatus
  = ActionReady
    -- ^ the action is ready to be executed
  | ActionSkip
    -- ^ the action should be skipped
  | ActionWait
    -- ^ not sure what to do yet; wait
  deriving Eq

-- | A unit of work scheduled by 'runInParallel'.
data Action = Action
  { actionStatus :: STM ActionStatus
    -- ^ polled inside an STM transaction to decide whether the action
    -- should be run, skipped, or left waiting for now
  , actionRun :: IO ()
    -- ^ the work itself; executed by a worker thread after the status
    -- check returned 'ActionReady'
  , actionSkip :: STM ()
    -- ^ executed in the same transaction as the status check when that
    -- check returned 'ActionSkip'
  }

-- | Take a list of actions and execute them in parallel, no more than @n@
-- at the same time.
--
-- The action itself is asynchronous, ie. it returns immediately and does
-- the work in new threads. It returns an action which aborts tests and
-- cleans up.
runInParallel
  :: Int -- ^ maximum number of parallel threads
  -> [Action] -- ^ list of actions to execute.
    -- Each action carries its own readiness check ('actionStatus'), which
    -- tells whether it is ready to run, should be skipped, or must wait.
  -> IO (IO ())
-- This implementation tries its best to ensure that exceptions are
-- properly propagated to the caller and threads are not left running.
--
-- Note that exceptions inside tests are already caught by the test
-- actions themselves. Any exceptions that reach this function or its
-- threads are by definition unexpected.
runInParallel nthreads actions = do
  callingThread <- myThreadId

  -- Don't let the main thread be garbage-collected
  -- Otherwise we may get a "thread blocked indefinitely in an STM
  -- transaction" exception when a child thread is blocked and GC'd.
  -- (See e.g. https://github.com/UnkindPartition/tasty/issues/15)
  -- FIXME is this still needed?
  _ <- newStablePtr callingThread

  -- The queue of pending actions, shared by all worker threads.
  actionsVar <- atomically $ newTMVar actions

  -- Spawn the workers; each one repeatedly pulls work from the queue.
  pids <- replicateM nthreads (async $ work actionsVar)

  return $ do
    -- Tell worker threads there is no more work after their current task.
    -- 'cancel' below by itself is not sufficient because if an exception
    -- is thrown in the middle of a test, the worker thread simply marks
    -- the test as failed and moves on to their next task. We also need to
    -- make it clear that there are no further tasks.
    _ <- atomically $ swapTMVar actionsVar []
    -- Cancel all the current tasks, waiting for workers to clean up.
    -- The waiting part is important (see #249), that's why we use cancel
    -- instead of killThread.
    mapM_ cancel pids

-- | The loop run by each worker thread.
--
-- In a single transaction it takes the shared list of pending actions,
-- asks 'findBool' for one that is ready, and puts the remaining actions
-- back. The selected action is then run outside the transaction, after
-- which the loop repeats. The loop ends once 'findBool' reports that no
-- runnable actions are left.
work :: TMVar [Action] -> IO ()
work actionsVar = go
  where
    go :: IO ()
    go = do
      -- The transaction returns the IO action to perform next; 'join'
      -- executes it once the transaction has committed.
      join . atomically $ do
        mb_ready <- findBool =<< takeTMVar actionsVar
        case mb_ready of
          Nothing -> do
            -- Nothing left to do. Put back the TMVar so that other threads
            -- do not block on an empty TMVar (see #249) and return.
            putTMVar actionsVar []
            return $ return ()
          Just (this, rest) -> do
            putTMVar actionsVar rest
            return $ actionRun this >> go

-- | Find a ready-to-run item. Filter out the items that will never be
-- ready to run.
--
-- Return the ready item and the remaining ones.
--
-- This action may block if no items are ready to run just yet.
--
-- Return 'Nothing' if there are no runnable items left.
findBool :: [Action] -> STM (Maybe (Action, [Action]))
findBool = go []
  where
    -- The first argument accumulates, in reverse order, the items that
    -- were inspected and found to be still waiting; the second argument
    -- holds the items not yet inspected.
    go :: [Action] -> [Action] -> STM (Maybe (Action, [Action]))
    go [] [] =
      -- nothing to do
      return Nothing
    go _ [] =
      -- nothing ready yet
      retry
    go past (this : rest) = do
      status <- actionStatus this
      case status of
        -- Restore the original order of the waiting items before
        -- handing back the remainder.
        ActionReady -> return $ Just (this, reverse past ++ rest)
        ActionWait -> go (this : past) rest
        ActionSkip -> do
          -- Run the skip hook and drop the item from the list.
          actionSkip this
          go past rest