-- | A wrapper around "Control.Concurrent.Bag.Concurrent" in which a bag only
-- exists for the duration of a 'BagT' action: the @new*Bag@ functions create
-- the bag, run the given action against it and terminate the bag afterwards.
module Control.Concurrent.Bag.SafeConcurrent
  ( BagT
  , newTaskBag
  , newEvalBag
  , newInterruptibleBag
  , newInterruptingBag
  , getResult
  , getAllResults
  , liftIO
  , lift
  , module Control.Concurrent.Bag.Task
  ) where

import Control.Applicative
import Control.Monad.Reader
import qualified Control.Concurrent.Bag.Concurrent as Basic
import Control.Concurrent.Bag.Task

-- | A monad transformer for processing the results of the bag sequentially.
-- In addition to the actions available in the base monad, which has to be
-- an instance of 'MonadIO' in all functions, it provides the action
-- 'getResult' to get a result of the bag.
newtype BagT r m a = BagT { getBagReader :: ReaderT (Basic.Bag r) m a }

-- 'Functor' and 'Applicative' are expressed through the 'Monad' operations,
-- so all three instances need nothing more than @Monad m@.
instance Monad m => Functor (BagT r m) where
  fmap = liftM

instance Monad m => Applicative (BagT r m) where
  pure  = return
  (<*>) = ap

instance Monad m => Monad (BagT r m) where
  return         = BagT . return
  (BagT a) >>= b = BagT $ a >>= getBagReader . b

instance MonadTrans (BagT r) where
  lift act = BagT $ lift act

instance MonadIO m => MonadIO (BagT r m) where
  liftIO act = BagT $ liftIO act

-- | Run a 'BagT' action in the base monad, supplying the bag it reads from.
-- Not exported; used by 'newTaskBag' only.
runBagT :: BagT r m a -> Basic.Bag r -> m a
runBagT (BagT action) = runReaderT action

-- | Like 'newTaskBag', but it takes a list of expressions that will be
-- evaluated to weak head normal form using 'Prelude.seq'.
--
-- __WARNING__: This does not evaluate to normal form, but only to weak head
-- normal form.
newEvalBag :: (MonadIO m) => [r] -> BagT r m a -> m a
newEvalBag es = newTaskBag (map evalTask es)
  where
    -- Force the expression before wrapping it, so that the task yields an
    -- already evaluated (WHNF) value as its result.
    evalTask e = e `seq` return (Just e)

-- | Initializes a new bag of tasks and starts a gang of worker threads. The
-- number of worker threads is equal to the number of capabilities of the
-- Haskell runtime (see 'Control.Concurrent.getNumCapabilities').
--
-- The bag is terminated once the processing action has returned. There is
-- no exception handler around the action here, so termination is presumably
-- skipped if the action aborts with an exception.
--
-- __WARNING__: If it may be necessary to terminate the thread pool, i.e.
-- because the result processing function does not always request all values,
-- you have to make sure that the task can be stopped.
-- Terminating the tasks is done with asynchronous exceptions which can only be
-- received at a /safe point/. Safe points are all points where memory
-- allocation is requested, but there are calculations and also loops which
-- never need any new memory. These calculations cannot be terminated and may
-- run forever, see the documentation of 'Control.Exception.Base.throwTo'.
newTaskBag :: (MonadIO m)
           => [TaskIO r (Maybe r)] -- ^ list of initial tasks
           -> BagT r m a           -- ^ action to process the results of the bag
           -> m a
newTaskBag ts act = do
  bag <- Basic.newBag_
  -- Every task is handed the bag's own 'Basic.addTask', which allows a
  -- running task to add further tasks to the same bag.
  mapM_ (\t -> Basic.addTask bag (runTaskIO t (Basic.addTask bag))) ts
  -- All initial tasks are queued; signal that no more will be added from here.
  Basic.noMoreTasks bag
  result <- runBagT act bag
  Basic.terminateBag bag
  return result

-- | Like 'newTaskBag', but takes a list of 'Interruptible' tasks, each of
-- which is turned into a bag task with 'runInterruptible'.
newInterruptibleBag :: (MonadIO m)
                    => [Interruptible r] -- ^ list of initial tasks
                    -> BagT r m a        -- ^ action to process the results of the bag
                    -> m a
newInterruptibleBag ts = newTaskBag $ map runInterruptible ts

-- | Similar to 'newTaskBag', but interrupts the tasks in certain intervals.
-- Using a 'Control.Concurrent.STM.TChan' as buffer, this ensures
-- completeness: all tasks that have a result will get their time to evaluate
-- it. Note that calculations which do no memory allocation cannot be
-- interrupted.
newInterruptingBag :: (MonadIO m)
                   => [Interruptible r] -- ^ list of initial tasks
                   -> BagT r m a        -- ^ action to process the results of the bag
                   -> m a
newInterruptingBag ts = newTaskBag $ map runInterrupted ts

-- | Get a result of the bag if there is one. If it returns 'Nothing', all
-- tasks have been processed and there are no results left. 'getResult' blocks
-- until a task has been evaluated to a result or all tasks are processed.
-- Therefore it may block forever.
getResult :: (MonadIO m) => BagT r m (Maybe r)
getResult = BagT $ ask >>= Basic.getResult

-- | Convenience function to get all results from the bag of tasks.
--
-- Calls 'getResult' repeatedly until it yields 'Nothing' and returns the
-- results in the order in which they were obtained.
getAllResults :: (MonadIO m) => BagT a m [a]
getAllResults =
  getResult >>= maybe (return []) (\x -> liftM (x :) getAllResults)