module Control.Concurrent.Bag.SafeConcurrent
  ( BagT
  , newTaskBag
  , newEvalBag
  , newInterruptibleBag
  , newInterruptingBag
  , getResult
  , getAllResults
  , liftIO
  , lift
  , module Control.Concurrent.Bag.Task
  , BufferType (..) )
where

import Control.Applicative
import Control.Monad.Reader
import qualified Control.Concurrent.Bag.Concurrent as Basic
import Control.Concurrent.Bag.TaskBuffer
import Control.Concurrent.Bag.Task
import Control.Concurrent.Bag.BagT

runBagT :: MonadIO m => BagT r m a -> Basic.Bag r -> m a
runBagT (BagT reader) bag = runReaderT reader (Basic.getResult bag)

-- | Like 'newTaskBag', but it takes a list of expressions that will be
--   evaluated to weak head normal form using 'Prelude.seq'.
--
--   __WARNING__: This does not evaluate to normal form, but only to weak head
--   normal form.
newEvalBag :: (MonadIO m) =>
              BufferType
           -> [r]
           -> BagT r m a
           -> m a
newEvalBag buf es =
  newTaskBag buf (map (\e -> e `seq` return $ Just e) es)

-- | Initializes a new bag of tasks and starts a gang of workers threads. The
--   number of worker threads is equal to the number of capabilities of the
--   Haskell runtime (see 'Control.Concurrent.getNumCapabilities').
--
--   __WARNING__: If it may be necessary to terminate the thread pool, i.e.
--   because the result processing function does not always request all values,
--   you have to make sure that the task can be stopped.
--   Terminating the tasks is done with asynchronous exceptions which can only be
--   received at a \emph{safe point}. Safe points are all points where memory
--   allocation is requested, but there are calculations and also loops which never
--   need any new memory. These calculations cannot be terminated and may run
--   forever, see the documentation of 'Control.Exception.Base.throwTo'.
newTaskBag :: (MonadIO m) =>
              BufferType
           -> [TaskIO r (Maybe r)] -- ^ list of initial tasks
           -> BagT r m a         -- ^ action to process the results of the bag
           -> m a
newTaskBag buf ts act = do
  bag  <- Basic.newBag_ buf
  mapM_ (\t -> Basic.addTask bag (runTaskIO t (Basic.addTask bag))) ts
  Basic.noMoreTasks bag
  result <- runBagT act bag
  Basic.terminateBag bag
  return result

newInterruptibleBag :: (MonadIO m) =>
                       BufferType
                    -> [Interruptible r]
                    -> BagT r m a
                    -> m a
newInterruptibleBag buf ts = do
  newTaskBag buf $ map runInterruptible ts

-- | Similar to 'newTaskBag', but interrupts the tasks in certain intervals.
--   Using a 'Control.Concurrent.STM.TChan' as buffer, this ensures
--   completeness: all tasks that have a result will get their time to evaluate
--   it. Note, that calculations, that do no memory allocation, cannot be
--   interrupted.
newInterruptingBag :: (MonadIO m) =>
                      BufferType
                   -> [Interruptible r]
                   -> BagT r m a
                   -> m a
newInterruptingBag buf ts = do
  newTaskBag buf $ map runInterrupted ts