{-|
Module      : Control.Concurrent.Bag.Safe
Description : Transformer based bag of tasks interface using STM
Copyright   : (c) Bastian Holst, 2014
License     : BSD3
Maintainer  : bastianholst@gmx.de
Stability   : experimental
Portability : POSIX

High level bag of tasks interface based on "Control.Concurrent.Bag.Basic".
Tasks can only return results and add new tasks as intended and it is not
possible to add new tasks from the outside or from the action processing the
results. This way it is possible to ensure that 'getResult' returns
'Nothing' only if it is safe to say that there will be no more results.
-}
module Control.Concurrent.Bag.Safe
  ( BagT
  , newTaskBag
  , newEvalBag
  , newInterruptibleBag
  , newInterruptingBag
  , getResult
  , getAllResults
  , liftIO
  , lift
  , TaskBufferSTM (..)
  , SplitFunction
  , takeFirst
  , module Control.Concurrent.Bag.Task
  , BufferType (..) )
where

import Control.Applicative
import Control.Monad.Reader
import Control.Concurrent.Bag.TaskBufferSTM
  ( TaskBufferSTM (..)
  , SplitFunction
  , takeFirst
  , BufferType (..) )
import qualified Control.Concurrent.Bag.Basic as Basic
import Control.Concurrent.Bag.Task
import Control.Concurrent.STM.TChan (TChan)
import Control.Concurrent.STM.TStack (TStack)
import Control.Concurrent.Bag.BagT

-- | Execute a 'BagT' computation against a concrete bag.
--
--   The reader wrapped inside the 'BagT' is run with @Basic.getResult bag@
--   as its environment, so the only thing the computation gets to see of the
--   bag is that result-fetching action.
runBagT :: MonadIO m => BagT r m a -> Basic.Bag r -> m a
runBagT (BagT inner) bag = runReaderT inner $ Basic.getResult bag

-- | Variant of 'newTaskBag' that is fed plain expressions instead of tasks.
--   Every expression becomes a task that forces it with 'Prelude.seq' and
--   then yields it as that task's result.
--
--   __WARNING__: 'Prelude.seq' only evaluates to weak head normal form; the
--   expressions are /not/ evaluated to normal form.
newEvalBag
  :: (MonadIO m)
  => BufferType
     -- ^ buffer type
  -> Maybe (SplitFunction r)
     -- ^ Optional split function.
     --   If one is given, the bag is created with one buffer per worker,
     --   which reduces the communication between the workers.
  -> [r]
     -- ^ expressions to evaluate
  -> BagT r m a
     -- ^ action to process the results of the bag
  -> m a
newEvalBag buf split es = newTaskBag buf split (map whnfTask es)
  where
    -- Force the expression to WHNF first, then hand it back as the result.
    whnfTask e = e `seq` return (Just e)

-- | Create a new bag of tasks, start a gang of worker threads for it and run
--   the given result-processing action. There are as many worker threads as
--   the Haskell runtime has capabilities
--   (see 'Control.Concurrent.getNumCapabilities').
--
--   __WARNING__: If the thread pool may have to be terminated early, e.g.
--   because the result-processing action does not always request all values,
--   you have to make sure that the tasks can actually be stopped.
--   Tasks are terminated with asynchronous exceptions, and those can only be
--   received at a /safe point/. Every point at which memory allocation is
--   requested is a safe point, but some calculations (including loops) never
--   allocate. Such calculations cannot be terminated and may run forever;
--   see the documentation of 'Control.Exception.Base.throwTo'.
newTaskBag
  :: (MonadIO m)
  => BufferType
     -- ^ buffer type
  -> Maybe (SplitFunction r)
     -- ^ Optional split function.
     --
     --   If one is given, the bag is created with one buffer per worker,
     --   which reduces the communication between the workers.
  -> [TaskIO r (Maybe r)]
     -- ^ list of initial tasks
  -> BagT r m a
     -- ^ action to process the results of the bag
  -> m a
newTaskBag buf split ts act = do
  bag <- Basic.newBag_ buf split
  -- Each task is handed the bag's own 'Basic.addTask' and
  -- 'Basic.writeResult', so adding tasks and writing results are the only
  -- bag operations passed to it.
  let submit task =
        Basic.addTask bag $
          runTaskIO task (Basic.addTask bag) (Basic.writeResult bag)
  mapM_ submit ts
  -- All initial tasks are queued; nothing more is added from the outside.
  Basic.noMoreTasks bag
  outcome <- runBagT act bag
  -- NOTE(review): this is only reached when the action above returns
  -- normally; there is no exception handling around it here.
  Basic.terminateBag bag
  return outcome

-- | Variant of 'newTaskBag' whose initial work is given as a list of
--   'Interruptible' computations rather than tasks. Each of them is turned
--   into a task with 'runInterruptible' before the bag is created.
newInterruptibleBag
  :: MonadIO m
  => BufferType
     -- ^ buffer type
  -> Maybe (SplitFunction r)
     -- ^ Optional split function.
     --
     --   If one is given, the bag is created with one buffer per worker,
     --   which reduces the communication between the workers.
  -> [Interruptible r]
     -- ^ list of initial tasks
  -> BagT r m a
     -- ^ action to process the results of the bag
  -> m a
newInterruptibleBag buf split ts =
  newTaskBag buf split (map runInterruptible ts)

-- | Variant of 'newInterruptibleBag' that interrupts the tasks at certain
--   intervals; each 'Interruptible' is turned into a task with
--   'runInterrupted'.
--   With a 'Control.Concurrent.STM.TChan' as buffer this ensures
--   completeness: every task that has a result will get its time to evaluate
--   it. Note that calculations which do not allocate any memory cannot be
--   interrupted.
newInterruptingBag
  :: (MonadIO m)
  => BufferType
     -- ^ buffer type
  -> Maybe (SplitFunction r)
     -- ^ Optional split function.
     --
     --   If one is given, the bag is created with one buffer per worker,
     --   which reduces the communication between the workers.
  -> [Interruptible r]
     -- ^ list of initial tasks
  -> BagT r m a
     -- ^ action to process the results of the bag
  -> m a
newInterruptingBag buf split ts =
  newTaskBag buf split (map runInterrupted ts)