{-|
Module      : Control.Concurrent.Bag.ImplicitConcurrent
Description : Lazy list based bag of tasks interface without using STM
Copyright   : (c) Bastian Holst, 2014
License     : BSD3
Maintainer  : bastianholst@gmx.de
Stability   : experimental
Portability : POSIX

High level bag of tasks interface based on
"Control.Concurrent.Bag.Concurrent". Tasks can only return results and add
new tasks as intended and it is not possible to add new tasks from the
outside or from the action processing the results.
-}
module Control.Concurrent.Bag.ImplicitConcurrent
  ( module Control.Concurrent.Bag.Task
  , newTaskBag
  , newEvalBag
  , newInterruptingBag
  , newInterruptibleBag
  , BufferType (..)
  ) where

import Data.IORef
import Control.Monad.IO.Class
import qualified Control.Concurrent.Bag.Concurrent as Basic
import Control.Concurrent.Bag.Task
import System.IO.Unsafe (unsafeInterleaveIO)
import Control.Concurrent.Chan
import Control.Concurrent.Bag.TaskBuffer

-- | Create a task bag, feed it the initial tasks and start processing.
--
-- The result list is produced lazily: each cell is fetched from the bag
-- only when it is demanded. Results that are already computed stay in a
-- buffer until the list is consumed. Once the result list is garbage
-- collected, the bag of tasks is stopped automatically.
newTaskBag :: BufferType            -- ^ buffer type
           -> [TaskIO r (Maybe r)]  -- ^ list of initial tasks
           -> IO [r]                -- ^ lazy result list
newTaskBag buf ts = do
  bag <- Basic.newBag_ buf
  mapM_ (submit bag) ts
  Basic.noMoreTasks bag
  -- The IORef carries no data. A finalizer that terminates the bag is
  -- attached to it, and the unevaluated tail of the result list keeps a
  -- reference to it.
  keepAlive <- newIORef ()
  _ <- mkWeakIORef keepAlive (Basic.terminateBag bag)
  drain bag keepAlive
  where
    -- Schedule one task; the task itself may add further tasks to the bag
    -- and write results into it.
    submit bag task =
      Basic.addTask bag
        (runTaskIO task (Basic.addTask bag) (Basic.writeResult bag))

    -- Fetch one result per demanded list cell.
    drain bag keepAlive = unsafeInterleaveIO $
      Basic.getResult bag >>= \next -> case next of
        -- Writing to the IORef references it until the end of the list
        -- has been reached.
        Nothing -> writeIORef keepAlive () >> return []
        Just v  -> fmap (v :) (drain bag keepAlive)

-- | Like 'newTaskBag', but it takes a list of expressions that will be
-- evaluated to weak head normal form using 'Prelude.seq'.
--
-- __WARNING__: The expressions are only evaluated to weak head normal
-- form, not to normal form.
newEvalBag :: BufferType
           -> [r]
           -> IO [r]
newEvalBag buf es = newTaskBag buf (map evalTask es)
  where
    -- Force the expression to weak head normal form, then return it as the
    -- result of the task.
    evalTask e = e `seq` return (Just e)

-- | Variant of 'newTaskBag' that accepts 'Interruptible' computations in
-- place of plain tasks.
newInterruptibleBag :: BufferType        -- ^ buffer type
                    -> [Interruptible r] -- ^ list of initial tasks
                    -> IO [r]            -- ^ lazy result list
newInterruptibleBag buf ts = newTaskBag buf (map runInterruptible ts)

-- NOTE(review): the documentation below names an STM 'TChan', while the
-- module header describes this module as not using STM. Confirm which
-- buffer type actually provides the completeness guarantee here.

-- | Variant of 'newTaskBag' that interrupts the tasks in certain intervals.
-- Using a 'Control.Concurrent.STM.TChan' as buffer, this ensures
-- completeness: every task that has a result gets its time to evaluate it.
-- Note that calculations which do no memory allocation cannot be
-- interrupted.
newInterruptingBag :: BufferType        -- ^ buffer type
                   -> [Interruptible r] -- ^ list of initial tasks
                   -> IO [r]            -- ^ lazy result list
newInterruptingBag buf ts = newTaskBag buf (map runInterrupted ts)