module Control.Concurrent.Bag.Implicit
( module Control.Concurrent.Bag.Task
, newTaskBag
, newEvalBag
, SplitFunction
, takeFirst
, newInterruptingBag
, newInterruptibleBag
, BufferType (..)
)
where
import Data.IORef
import Control.Monad.IO.Class
import qualified Control.Concurrent.Bag.Basic as Basic
import Control.Concurrent.Bag.Task
import Control.Concurrent.Bag.TaskBufferSTM
( TaskBufferSTM (..)
, SplitFunction
, takeFirst
, BufferType (..) )
import System.IO.Unsafe (unsafeInterleaveIO)
import Control.Concurrent.STM.TChan (TChan)
import Control.Concurrent.STM.TStack (TStack)
newTaskBag :: BufferType
-> Maybe (SplitFunction r)
-> [TaskIO r (Maybe r)]
-> IO [r]
newTaskBag buf split ts = do
bag <- Basic.newBag_ buf split
mapM_ (\t -> Basic.addTask bag (runTaskIO t (Basic.addTask bag) (Basic.writeResult bag))) ts
Basic.noMoreTasks bag
dummy <- newIORef ()
_ <- mkWeakIORef dummy (Basic.terminateBag bag)
lazyResults bag dummy
where
lazyResults bag dummy = unsafeInterleaveIO $ do
r <- Basic.getResult bag
case r of
Nothing -> do
writeIORef dummy ()
return []
Just v -> do
vs <- lazyResults bag dummy
return (v:vs)
newEvalBag :: BufferType
-> Maybe (SplitFunction r)
-> [r]
-> IO [r]
newEvalBag buf split es =
newTaskBag buf split (map (\e -> e `seq` return $ Just e) es)
newInterruptibleBag :: BufferType
-> Maybe (SplitFunction r)
-> [Interruptible r]
-> IO [r]
newInterruptibleBag buf split ts = do
newTaskBag buf split $ map runInterruptible ts
newInterruptingBag :: BufferType
-> Maybe (SplitFunction r)
-> [Interruptible r]
-> IO [r]
newInterruptingBag buf split ts = do
newTaskBag buf split $ map runInterrupted ts