module Control.Concurrent.Bag.ImplicitConcurrent ( module Control.Concurrent.Bag.Task , newTaskBag , newEvalBag , newInterruptingBag , newInterruptibleBag , BufferType (..) ) where import Data.IORef import Control.Monad.IO.Class import qualified Control.Concurrent.Bag.Concurrent as Basic import Control.Concurrent.Bag.Task import System.IO.Unsafe (unsafeInterleaveIO) import Control.Concurrent.Chan import Control.Concurrent.Bag.TaskBuffer newTaskBag :: BufferType -> [TaskIO r (Maybe r)] -- ^ list of initial tasks -> IO [r] newTaskBag buf ts = do bag <- Basic.newBag_ buf mapM_ (\t -> Basic.addTask bag (runTaskIO t (Basic.addTask bag))) ts Basic.noMoreTasks bag dummy <- newIORef () _ <- mkWeakIORef dummy (Basic.terminateBag bag) lazyResults bag dummy where lazyResults bag dummy = unsafeInterleaveIO $ do r <- Basic.getResult bag case r of Nothing -> do writeIORef dummy () return [] Just v -> do vs <- lazyResults bag dummy return (v:vs) -- | Like 'newTaskBag', but it takes a list of expressions that will be -- evaluated to weak head normal form using 'Prelude.seq'. -- -- __WARNING__: This does not evaluate to normal form, but only to weak head -- normal form. newEvalBag :: BufferType -> [r] -> IO [r] newEvalBag buf es = newTaskBag buf (map (\e -> e `seq` return $ Just e) es) newInterruptibleBag :: BufferType -> [Interruptible r] -> IO [r] newInterruptibleBag buf ts = do newTaskBag buf $ map runInterruptible ts -- | Similar to 'newTaskBag', but interrupts the tasks in certain intervals. -- Using a 'Control.Concurrent.STM.TChan' as buffer, this ensures -- completeness: all tasks that have a result will get their time to evaluate -- it. Note, that calculations, that do no memory allocation, cannot be -- interrupted. newInterruptingBag :: BufferType -> [Interruptible r] -> IO [r] newInterruptingBag buf ts = do newTaskBag buf $ map runInterrupted ts