-- | High level bag of tasks interface needing no explicit control. Tasks can
--   only return results and add new tasks as intended and it is not possible
--   to add new tasks from the outside or from the action processing the
--   results. This way it is possible to ensure that 'getResults' returns only
--   'Nothing' if it is safe to say that there will be no results anymore.
module Control.Concurrent.Bag.Safe
  ( Task
  , BagT
  , newTaskBag
  , newEvalBag
  , getResult
  , getAllResults
  , addTask
  , liftIO
  , lift
  , TaskBufferSTM (..)
  , SplitFunction
  , takeFirst )
where

import Control.Applicative
import Control.Monad.Reader
import Control.Concurrent.Bag.TaskBuffer
  ( TaskBufferSTM (..)
  , SplitFunction
  , takeFirst )
import qualified Control.Concurrent.Bag.Basic as Basic

-- | A monad in which tasks can be specified.
--   Task is instancing 'MonadIO' and it therefore has the function 'liftIO' to
--   perform arbitrary IO actions. Tasks may or may not return a value. If it
--   returns a value, this value is written back as a result.
--   Additionally there is a function 'addTask' to
--   add new tasks to the bag.
--   The parameter /r/ is the result type of the corresponding bag.
newtype Task b r a = Task { getTaskReader :: ReaderT (Basic.Bag b r) IO a }

instance Functor (Task b r) where
  fmap = liftM

instance Applicative (Task b r) where
  pure  = return
  (<*>) = ap

instance Monad (Task b r) where
  return = Task . return
  (Task a) >>= b = Task $ a >>= getTaskReader . b

instance MonadIO (Task b r) where
  liftIO act = Task $ lift act

runTask :: Task b r (Maybe r) -> Basic.Bag b r -> IO (Maybe r)
runTask = runReaderT . getTaskReader

-- | A monad transformer for processing the results of the bag sequencially.
--   In addition to the actions available in the base monad, which has to be
--   an instance of MonadIO in all functions, it provides the action
--   'getResult' to get a result of the bag.
newtype BagT b r m a = BagT { getBagReader :: ReaderT (Basic.Bag b r) m a }

instance Monad m => Functor (BagT b r m) where
  fmap = liftM

instance Monad m => Applicative (BagT b r m) where
  pure  = return
  (<*>) = ap

instance Monad m => Monad (BagT b r m) where
  return = BagT . return
  (BagT a) >>= b = BagT $ a >>= getBagReader . b

instance MonadTrans (BagT b r) where
  lift act = BagT $ lift act

instance MonadIO m => MonadIO (BagT b r m) where
  liftIO act = BagT $ liftIO act

runBagT :: BagT b r m a -> Basic.Bag b r -> m a
runBagT (BagT reader) = runReaderT reader

-- | Like 'newTaskBag', but it takes a list of expressions that will be
--   evaluated to weak head normal form using 'Prelude.seq'.
--
--   __WARNING__: This does not evaluate to normal form, but only to weark head
--   normal form.
newEvalBag :: (MonadIO m, TaskBufferSTM b) =>
              Maybe (SplitFunction b r)
           -> [r]
           -> BagT b r m a
           -> m a
newEvalBag split es =
  newTaskBag split (map (\e -> e `seq` return $ Just e) es)

-- | Initializes a new bag of tasks and starts a gang of workers threads. The
--   number of worker threads is equal to the number of capabilities of the
--   Haskell runtime (see 'Control.Concurrent.getNumCapabilities').
--
--   __WARNING__: If it may be necessary to terminate the thread pool, i.e.
--   because the result processing function does not always request all values,
--   you have to make sure that the task can be stopped.
--   Terminating the tasks is done with asynchronous exceptions which can only be
--   received at a \emph{safe point}. Safe points are all points where memory
--   allocation is requested, but there are calculations and also loops which never
--   need any new memory. These calculations cannot be terminated and may run
--   forever, see the documentation of 'Control.Exception.Base.throwTo'.
newTaskBag :: (MonadIO m, TaskBufferSTM b) =>
              Maybe (SplitFunction b r)
                                   -- ^ Possible split function
                                   --   If the function is given, we will create
                                   --   a bag with one buffer per worker
                                   --   reducing the communication between the
                                   --   workers.
           -> [Task b r (Maybe r)] -- ^ list of initial tasks
           -> BagT b r m a         -- ^ action to process the results of the bag
           -> m a
newTaskBag split ts act = do
  bag  <- Basic.newBag_ split
  mapM_ (\t -> Basic.addTask bag (runTask t bag)) ts
  Basic.noMoreTasks bag
  result <- runBagT act bag
  Basic.terminateBag bag
  return result

-- | Get a result of the bag if there is one. If it returns Nothing, all tasks
--   have been processed and there are no results left. 'getResults' blocks
--   until a task has been evaluated to a result or all tasks are processed.
--   Therefore it may block forever.
getResult :: (MonadIO m, TaskBufferSTM b) => BagT b r m (Maybe r)
getResult =
  BagT $ ask >>= Basic.getResult

-- | Convenience function to get all results from the bag of tasks.
getAllResults :: (MonadIO m, TaskBufferSTM b) => BagT b a m [a]
getAllResults = do
  mx  <- getResult
  case mx of
    Just x -> do
      xs <- getAllResults
      return $ x:xs
    Nothing ->
      return   []

-- | Add a task to the bag of tasks.
addTask :: (TaskBufferSTM b) => Task b r (Maybe r) -> Task b r ()
addTask task =
  Task $ do
    bag <- ask
    Basic.addTask bag (runTask task bag)