-- | High level bag of tasks interface needing no explicit control. Tasks can -- only return results and add new tasks as intended and it is not possible -- to add new tasks from the outside or from the action processing the -- results. This way it is possible to ensure that 'getResults' returns only -- 'Nothing' if it is safe to say that there will be no results anymore. module Control.Concurrent.Bag.Safe ( BagT , newTaskBag , newEvalBag , newInterruptibleBag , newInterruptingBag , getResult , getAllResults , liftIO , lift , TaskBufferSTM (..) , SplitFunction , takeFirst , module Control.Concurrent.Bag.Task , BufferType (..) ) where import Control.Applicative import Control.Monad.Reader import Control.Concurrent.Bag.TaskBufferSTM ( TaskBufferSTM (..) , SplitFunction , takeFirst , BufferType (..) ) import qualified Control.Concurrent.Bag.Basic as Basic import Control.Concurrent.Bag.Task import Control.Concurrent.STM.TChan (TChan) import Control.Concurrent.STM.TStack (TStack) import Control.Concurrent.Bag.BagT runBagT :: MonadIO m => BagT r m a -> Basic.Bag r -> m a runBagT (BagT reader) bag = runReaderT reader (Basic.getResult bag) -- | Like 'newTaskBag', but it takes a list of expressions that will be -- evaluated to weak head normal form using 'Prelude.seq'. -- -- __WARNING__: This does not evaluate to normal form, but only to weak head -- normal form. newEvalBag :: (MonadIO m) => BufferType -> Maybe (SplitFunction r) -> [r] -> BagT r m a -> m a newEvalBag buf split es = newTaskBag buf split (map (\e -> e `seq` return $ Just e) es) -- | Initializes a new bag of tasks and starts a gang of workers threads. The -- number of worker threads is equal to the number of capabilities of the -- Haskell runtime (see 'Control.Concurrent.getNumCapabilities'). -- -- __WARNING__: If it may be necessary to terminate the thread pool, i.e. -- because the result processing function does not always request all values, -- you have to make sure that the task can be stopped. -- Terminating the tasks is done with asynchronous exceptions which can only be -- received at a \emph{safe point}. Safe points are all points where memory -- allocation is requested, but there are calculations and also loops which never -- need any new memory. These calculations cannot be terminated and may run -- forever, see the documentation of 'Control.Exception.Base.throwTo'. newTaskBag :: (MonadIO m) => BufferType -> Maybe (SplitFunction r) -- ^ Possible split function -- If the function is given, we will create -- a bag with one buffer per worker -- reducing the communication between the -- workers. -> [TaskIO r (Maybe r)] -- ^ list of initial tasks -> BagT r m a -- ^ action to process the results of the bag -> m a newTaskBag buf split ts act = do bag <- Basic.newBag_ buf split mapM_ (\t -> Basic.addTask bag (runTaskIO t (Basic.addTask bag) (Basic.writeResult bag))) ts Basic.noMoreTasks bag result <- runBagT act bag Basic.terminateBag bag return result newInterruptibleBag :: MonadIO m => BufferType -> Maybe (SplitFunction r) -> [Interruptible r] -> BagT r m a -> m a newInterruptibleBag buf split ts = do newTaskBag buf split $ map runInterruptible ts -- | Similar to 'newTaskBag', but interrupts the tasks in certain intervals. -- Using a 'Control.Concurrent.STM.TChan' as buffer, this ensures -- completeness: all tasks that have a result will get their time to evaluate -- it. Note, that calculations, that do no memory allocation, cannot be -- interrupted. newInterruptingBag :: (MonadIO m) => BufferType -> Maybe (SplitFunction r) -> [Interruptible r] -> BagT r m a -> m a newInterruptingBag buf split ts = do newTaskBag buf split $ map runInterrupted ts