{-|
Module      : Control.Concurrent.Bag.Implicit
Description : Lazy list based bag of tasks interface using STM
Copyright   : (c) Bastian Holst, 2014
License     : BSD3
Maintainer  : bastianholst@gmx.de
Stability   : experimental
Portability : POSIX

High-level bag of tasks interface based on "Control.Concurrent.Bag.Basic".
Tasks can only return results and add new tasks, as intended; it is not
possible to add new tasks from the outside or from the action processing the
results.
-}
module Control.Concurrent.Bag.Implicit
 ( module Control.Concurrent.Bag.Task
 , newTaskBag
 , newEvalBag
 , SplitFunction
 , takeFirst
 , newInterruptingBag
 , newInterruptibleBag
 , BufferType (..)
 )
where

import Data.IORef
import Control.Monad.IO.Class
import qualified Control.Concurrent.Bag.Basic as Basic
import Control.Concurrent.Bag.Task
import Control.Concurrent.Bag.TaskBufferSTM
  ( TaskBufferSTM (..)
  , SplitFunction
  , takeFirst
  , BufferType (..) )
import System.IO.Unsafe (unsafeInterleaveIO)
import Control.Concurrent.STM.TChan (TChan)
import Control.Concurrent.STM.TStack (TStack)

-- | Create a bag of tasks, start it, and hand back its results as a lazy
--   list.
--
--   Nothing is fetched from the bag when the list is returned; each result
--   is requested only when the corresponding list cell is demanded. Results
--   that have already been computed are kept in a buffer until then. If the
--   result list gets garbage collected, the bag of tasks is stopped
--   automatically.
newTaskBag :: BufferType           -- ^ buffer type
           -> Maybe (SplitFunction r)
                                   -- ^ Optional split function.
                                   --   If one is given, the bag is created
                                   --   with one buffer per worker, which
                                   --   reduces the communication between the
                                   --   workers.
           -> [TaskIO r (Maybe r)] -- ^ list of initial tasks
           -> IO [r]               -- ^ lazy result list
newTaskBag buf split ts = do
  bag <- Basic.newBag_ buf split
  -- Every task is run with the means to add further tasks to this bag and to
  -- write results to it.
  let submit task =
        Basic.addTask bag
          (runTaskIO task (Basic.addTask bag) (Basic.writeResult bag))
  mapM_ submit ts
  Basic.noMoreTasks bag
  -- The IORef carries no data. It is the key of a weak reference whose
  -- finalizer terminates the bag; every unevaluated tail of the result list
  -- refers to it.
  keepAlive <- newIORef ()
  _         <- mkWeakIORef keepAlive (Basic.terminateBag bag)
  drain bag keepAlive
 where
  -- Fetch the next result only once the list cell is actually demanded.
  drain bag keepAlive = unsafeInterleaveIO $
    Basic.getResult bag >>= maybe exhausted proceed
   where
    -- No further results: touch the reference one last time (presumably so
    -- that it stays reachable up to this point) and close the list.
    exhausted = writeIORef keepAlive () >> return []
    -- Prepend the result to the still unevaluated remainder of the list.
    proceed v = fmap (v :) (drain bag keepAlive)

-- | Variant of 'newTaskBag' that takes plain expressions instead of tasks.
--   Each expression is evaluated to weak head normal form using
--   'Prelude.seq' and then returned as the result of its task.
--
--   __WARNING__: The expressions are only evaluated to weak head normal
--   form, not to normal form.
newEvalBag :: BufferType
              -- ^ buffer type
           -> Maybe (SplitFunction r)
              -- ^ Optional split function.
              --
              --   If one is given, the bag is created
              --   with one buffer per worker, which
              --   reduces the communication between the
              --   workers.
           -> [r]
              -- ^ expressions to evaluate
           -> IO [r]
              -- ^ lazy result list
newEvalBag buf split es = newTaskBag buf split (map forceTask es)
 where
  -- Forcing the task forces the expression first; the task then yields the
  -- expression as its result.
  forceTask e = e `seq` return (Just e)

-- | Variant of 'newTaskBag' whose initial tasks are given as 'Interruptible'
--   values; each of them is converted with 'runInterruptible' before the bag
--   is built.
newInterruptibleBag :: BufferType
                       -- ^ buffer type
                    -> Maybe (SplitFunction r)
                       -- ^ Optional split function.
                       --
                       --   If one is given, the bag is created
                       --   with one buffer per worker, which
                       --   reduces the communication between the
                       --   workers.
                    -> [Interruptible r]
                       -- ^ list of initial tasks
                    -> IO [r]
                       -- ^ lazy result list
newInterruptibleBag buf split ts = newTaskBag buf split tasks
 where
  tasks = map runInterruptible ts

-- | Variant of 'newInterruptibleBag' that interrupts the tasks at certain
--   intervals; each task is converted with 'runInterrupted' before the bag
--   is built.
--
--   With a 'Control.Concurrent.STM.TChan' as buffer this ensures
--   completeness: every task that has a result gets its time to evaluate it.
--   Note that computations which perform no memory allocation cannot be
--   interrupted.
newInterruptingBag :: BufferType
                      -- ^ buffer type
                   -> Maybe (SplitFunction r)
                      -- ^ Optional split function.
                      --
                      --   If one is given, the bag is created
                      --   with one buffer per worker, which
                      --   reduces the communication between the
                      --   workers.
                   -> [Interruptible r]
                      -- ^ list of initial tasks
                   -> IO [r]
                      -- ^ lazy result list
newInterruptingBag buf split ts = newTaskBag buf split tasks
 where
  tasks = map runInterrupted ts