module Control.Concurrent.Bag.Implicit ( module Control.Concurrent.Bag.Task , newTaskBag , newEvalBag , SplitFunction , takeFirst , splitVertical , splitHalf , newInterruptingBag , newInterruptibleBag ) where import Data.IORef import Control.Monad.IO.Class import qualified Control.Concurrent.Bag.Basic as Basic import Control.Concurrent.Bag.Task import Control.Concurrent.Bag.TaskBuffer ( TaskBufferSTM (..) , SplitFunction , takeFirst ) import System.IO.Unsafe (unsafeInterleaveIO) newTaskBag :: (TaskBufferSTM b) => Maybe (SplitFunction b r) -- ^ Possible split function -- If the function is given, we will create -- a bag with one buffer per worker -- reducing the communication between the -- workers. -> [TaskIO r (Maybe r)] -- ^ list of initial tasks -> IO [r] newTaskBag split ts = do bag <- Basic.newBag_ split mapM_ (\t -> Basic.addTask bag (runTaskIO t (Basic.addTask bag))) ts Basic.noMoreTasks bag dummy <- newIORef () _ <- mkWeakIORef dummy (Basic.terminateBag bag) lazyResults bag dummy where lazyResults bag dummy = unsafeInterleaveIO $ do r <- Basic.getResult bag case r of Nothing -> do writeIORef dummy () return [] Just v -> do vs <- lazyResults bag dummy return (v:vs) -- | Like 'newTaskBag', but it takes a list of expressions that will be -- evaluated to weak head normal form using 'Prelude.seq'. -- -- __WARNING__: This does not evaluate to normal form, but only to weak head -- normal form. newEvalBag :: (TaskBufferSTM b) => Maybe (SplitFunction b r) -> [r] -> IO [r] newEvalBag split es = newTaskBag split (map (\e -> e `seq` return $ Just e) es) newInterruptibleBag :: (TaskBufferSTM b) => Maybe (SplitFunction b r) -> [Interruptible r] -> IO [r] newInterruptibleBag split ts = do newTaskBag split $ map runInterruptible ts -- | Similar to 'newTaskBag', but interrupts the tasks in certain intervals. -- Using a 'Control.Concurrent.STM.TChan' as buffer, this ensures -- completeness: all tasks that have a result will get their time to evaluate -- it. Note, that calculations, that do no memory allocation, cannot be -- interrupted. newInterruptingBag :: (TaskBufferSTM b) => Maybe (SplitFunction b r) -> [Interruptible r] -> IO [r] newInterruptingBag split ts = do newTaskBag split $ map runInterrupted ts