module Control.Concurrent.Bag.Implicit ( module Control.Concurrent.Bag.Task , newTaskBag , newEvalBag , SplitFunction , takeFirst , splitVertical , splitHalf ) where import Data.IORef import Control.Monad.IO.Class import qualified Control.Concurrent.Bag.Basic as Basic import Control.Concurrent.Bag.Task import Control.Concurrent.Bag.TaskBuffer ( TaskBufferSTM (..) , SplitFunction , takeFirst ) import System.IO.Unsafe (unsafeInterleaveIO) newTaskBag :: (TaskBufferSTM b) => Maybe (SplitFunction b r) -- ^ Possible split function -- If the function is given, we will create -- a bag with one buffer per worker -- reducing the communication between the -- workers. -> [Task b r (Maybe r)] -- ^ list of initial tasks -> IO [r] newTaskBag split ts = do bag <- Basic.newBag_ split mapM_ (\t -> Basic.addTask bag (runTask t bag)) ts Basic.noMoreTasks bag dummy <- newIORef () _ <- mkWeakIORef dummy (Basic.terminateBag bag) lazyResults bag dummy where lazyResults bag dummy = unsafeInterleaveIO $ do r <- Basic.getResult bag case r of Nothing -> do writeIORef dummy () return [] Just v -> do vs <- lazyResults bag dummy return (v:vs) -- | Like 'newTaskBag', but it takes a list of expressions that will be -- evaluated to weak head normal form using 'Prelude.seq'. -- -- __WARNING__: This does not evaluate to normal form, but only to weak head -- normal form. newEvalBag :: (TaskBufferSTM b) => Maybe (SplitFunction b r) -> [r] -> IO [r] newEvalBag split es = newTaskBag split (map (\e -> e `seq` return $ Just e) es)