module Scripting.Parallel.ThreadPool
       (
         -- * Helpers for parallel scripting, i.e. prepping input to `hydraPrint`
         parForM
       )
       where

import Data.IORef
import Control.Monad (forM)
import Control.Concurrent
import Control.Concurrent.Chan
import qualified Control.Concurrent.Async as A
import Prelude as P

import qualified System.IO.Streams as S
-- import System.IO.Streams (InputStream, OutputStream)
import System.IO.Streams.Concurrent (chanToInput, chanToOutput)

--------------------------------------------------------------------------------


-- | A helper for parallel scripting.  Run work items in parallel on N worker threads
-- (a thread pool), creating only ONE output stream per worker, not one per parallel
-- task.  Specifically, each invocation of the user's function is given an
-- OutputStream that it holds a "lock" on -- it is the only thread accessing that
-- output stream.
--
-- This function returns immediately with
--   (1) a list of input streams that will carry results on the fly, as they are produced, and
--   (2) a barrier action that waits for all parallel work to be finished and yields the final results.
-- The first list is `numWorkers` long, and the second is `numTasks`.
--
-- Additional contracts: 
parForM :: Int -> [a] -> (S.OutputStream c -> a -> IO b) -> IO ([S.InputStream c], IO [b])
parForM numWorkers inputs action = 
  do let numTasks = P.length inputs
     answers     <- sequence$ P.replicate numTasks newEmptyMVar
     workLeft    <- newIORef (P.zip inputs answers)
     chans       <- sequence $ P.replicate numWorkers newChan
     resultStrms <- mapM chanToInput  chans 
     outStrms    <- mapM chanToOutput chans
     ------------
     -- That didn't seem to work.... let's try a different way of creating an input/output pair.     
     ------------     
     asyncs <- forM (zip outStrms [(0::Int)..]) $ \ (strm,idx) -> 
       A.async $ do
--           B.hPutStrLn System.IO.stderr (B.pack$ "INSIDE ASYNC, "++show idx++" of "++show (P.length outStrms))
           let pfloop = do -- Pop work off the queue:                           
                           x <- atomicModifyIORef workLeft 
                                 (\ls -> if P.null ls 
                                         then ([], Nothing) 
                                         else (P.tail ls, Just ((\ (h:_)->h) ls)))
--                           B.hPutStrLn System.IO.stderr (B.pack$ "POPPED work "++show (idx, fmap (const ()) x))
                           case x of 
                             Nothing -> S.write Nothing strm -- End the stream.
                             Just (input,mv) -> 
                               do result <- action strm input
                                  putMVar mv result
                                  pfloop
           pfloop
     let barrier = do 
           mapM_ A.wait asyncs -- For exception handling.
           mapM readMVar answers

     return (resultStrms, barrier)