{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns,
             ExistentialQuantification, CPP, ParallelListComp
	     #-}
{- OPTIONS_GHC -Wall -fno-warn-name-shadowing -fno-warn-unused-do-bind -}

-- | This module exposes the internals of the @Par@ monad so that you
-- can build your own scheduler or other extensions.  Do not use this
-- module for purposes other than extending the @Par@ monad with new
-- functionality.

module Control.Monad.Par.Scheds.TraceInternal (
   Trace(..), Sched(..), Par(..),
   IVar(..), IVarContents(..),
   sched,
   runPar, runParAsync, runParAsyncHelper,
   new, newFull, newFull_, get, put_, put,
   pollIVar, yield,
 ) where


import Control.Monad as M hiding (sequence, join)
import Prelude hiding (mapM, sequence)
import Data.IORef
import System.IO.Unsafe
import Control.Concurrent hiding (yield)
import GHC.Conc hiding (yield)
import Control.DeepSeq
import Control.Applicative
import Data.Array
import Data.List (partition, find)
--import Text.Printf


-- ---------------------------------------------------------------------------
-- MAIN SCHEDULING AND RUNNING
-- ---------------------------------------------------------------------------

-- A 'Trace' is one step of a Par computation as interpreted by 'sched'.
-- Every constructor except 'Done' carries the rest of the computation.
data Trace = forall a . Get (IVar a) (a -> Trace)
                -- Read an IVar; the continuation receives its value.
           | forall a . Put (IVar a) a Trace
                -- Fill an IVar with a value, then continue.
           | forall a . New (IVarContents a) (IVar a -> Trace)
                -- Create an IVar with the given initial contents and pass
                -- it to the continuation.
           | Fork Trace Trace
                -- Two traces to run; 'sched' queues the first and
                -- continues with the second.
           | Done
                -- This trace has nothing left to do.
           | Yield Trace
                -- 'sched' moves the trace to the end of its work queue.

-- | The per-worker scheduler state.  'runPar_internal' builds one 'Sched'
--   per capability; all of them are given the same 'status' reference and
--   the same 'scheds' array.
data Sched = Sched
  { no          :: {-# UNPACK #-} !ThreadNumber,
        -- ^ The thread number of this worker (its index in 'scheds')
    workpool    :: IORef WorkPool,
        -- ^ The work pool for this worker
    status      :: IORef AllStatus,
        -- ^ The schedulers' idle status (idle workers and external waiters)
    scheds      :: Array ThreadNumber Sched,
        -- ^ All workers, indexed by thread number
    tId         :: IORef ThreadId
        -- ^ The ThreadId of this worker
  }

-- Index of a worker in the 'scheds' array.
type ThreadNumber = Int
-- Identifier of a work set (handed out by globalEstablishScheds).
type UId = Int
-- Mutable count of the workers currently working on a work set.
type CountRef = IORef Int
type WorkLimit = (UId, CountRef)
-- ^ The UId of the work set that limits this worker, paired with that
--   work set's worker count.
--   When the UId is -1, it means that the worker will remain alive until
--   purposely killed (by globalThreadShutdown); in that case the count is
--   not consulted (see shouldEndWorkSet).
--
-- The reason for a work limit is to make sure that nested threads properly exit.
-- Imagine a scenario where thread A, a worker thread, encounters a runPar.  It
-- recursively enters worker status, but it needs to leave worker status at some
-- point to finish the task that caused it to call runPar.  Suppose now that it
-- encounters another call to runPar.  If it has the ability to finish and return,
-- we must make sure it returns first for the nested runPar or else it will return
-- to the wrong place!  The work limit helps achieve this.
--
-- TODO: Perhaps the work limit need not restrict what a thread can work on, but
-- instead it simply provides the singular point that a thread is allowed to return
-- from.  The only concern is some potential for bad blocking - is that a legit
-- concern?

-- | Test the UId component of a work limit against a predicate.
isWLUId :: WorkLimit -> (UId -> Bool) -> Bool
isWLUId (limitId, _) predicate = predicate limitId

-- | Decide whether the work set named by a work limit is finished.
--   A limit with UId -1 never finishes; otherwise the work set is finished
--   exactly when its worker count reads 0.
shouldEndWorkSet :: WorkLimit -> IO Bool
shouldEndWorkSet (limitId, counter)
  | limitId == -1 = return False
  | otherwise     = fmap (== 0) (readIORef counter)

-- | Build the 'Idle' record for a worker that is about to go idle: it
--   carries the worker's work-limit UId and the MVar used to wake it.
idleAtWL :: WorkLimit -> MVar Bool -> Idle
idleAtWL (limitId, _) wakeup = Idle limitId wakeup

-- | The main scheduler loop.
--   This takes, in order: the synchrony flag, our work limit, our Sched,
--   the particular work queue we're currently working on, the uid of that
--   work queue (for pushing work), and the already-popped, first trace in
--   the work queue.
--
--   The loop interprets one 'Trace' at a time.  When a trace blocks,
--   yields, or finishes, the next trace is popped from the current queue;
--   when the queue is empty the worker leaves this work set and (unless
--   its work limit says it is finished) calls 'reschedule'.
--
--   INVARIANT: This should only be called by threads who ARE currently marked
--              as working.
sched :: Bool -> WorkLimit -> Sched -> (IORef [Trace]) -> UId -> Trace -> IO ()
sched _doSync wl q@Sched{status, workpool} queueref uid t = loop t
 where
  -- Interpret a single trace.
  loop t = case t of
    -- Allocate the IVar and hand it to the continuation.
    New a f -> do
      r <- newIORef a
      loop (f (IVar r))
    Get (IVar v) c -> do
      -- Fast path: a plain read is enough if the IVar is already full.
      e <- readIORef v
      case e of
         Full a -> loop (c a)
         _other -> do
           -- Slow path: atomically re-inspect the IVar.  Either register
           -- the continuation as blocked and go find other work, or (if
           -- the IVar was filled in the meantime) continue with its value.
           -- The atomic update returns the IO action to run next.
           r <- atomicModifyIORef v $ \e -> case e of
                    Empty      -> (Blocked [c],    go)
                    Full a     -> (Full a,         loop (c a))
                    Blocked cs -> (Blocked (c:cs), go)
           r
    Put (IVar v) a t  -> do
      -- Fill the IVar and collect the continuations that were blocked on
      -- it.  Putting into an already-full IVar is an error.
      cs <- atomicModifyIORef v $ \e -> case e of
               Empty    -> (Full a, [])
               Full _   -> error "multiple put"
               Blocked cs -> (Full a, cs)
      -- Each blocked continuation, applied to the value, becomes new work.
      mapM_ (pushWork status uid queueref . ($a)) cs
      loop t
    -- Queue the child and keep running the parent on this thread.
    Fork child parent -> do
         pushWork status uid queueref child
         loop parent
    -- In synchronous mode, look for more work.  Otherwise fork a new
    -- thread that looks for more work and return from sched.
    Done ->
         if _doSync
         then go
                -- We could fork an extra thread here to keep numCapabilities workers
                -- even when the main thread returns to the runPar caller...
         else do -- putStrLn " [par] Forking replacement thread..\n"
                 forkIO go; return ()
                -- But even if we don't we are not orphaning any work in this
                -- thread's work-queue because it can be stolen by other threads.
                --   else return ()

    Yield parent -> do
        -- Go to the end of the worklist:
        -- TODO: Perhaps consider Data.Seq here.
        -- This would also be a chance to steal and work from opposite ends of the queue.
        atomicModifyIORef queueref $ \ts -> (ts++[parent],())
        go
  -- Pop the next trace from the current queue, or leave this work set if
  -- the queue is empty.
  go = do
    mt <- atomicPopIORef queueref
    case mt of
      Just t  -> loop t
      Nothing -> do
            -- SCARY: we better be working on the top queue in the pool!
            -- Remove this uid's entry from our pool and give up our slot
            -- in its worker count.
        cr <- wpRemoveWork uid workpool
        workDone <- decWorkerCount uid cr status
            -- If this uid is our workLimit id AND worker count == 0, then
            -- we should just return () rather than calling reschedule q
        unless (isWLUId wl (== uid) && workDone) $
               reschedule wl q


-- | Pick up the work queue at the head of this worker's own pool, or go
--   stealing when there is none to take.
--
--   The head entry is only taken when its uid is at least the uid of the
--   work limit.  If its queue turns out to be empty, the entry is removed
--   and the search continues, unless that was the work-limit work set and
--   it has just completed, in which case we return.
--
--   INVARIANT: This should only be called by threads who are NOT currently
--              marked as working (or if they are, the task they were working
--              on executed a runPar).
reschedule :: WorkLimit -> Sched -> IO ()
reschedule wl q@Sched{ workpool, status } = readIORef workpool >>= dispatch
  where
    dispatch (Work uid cr wqref _)
      | isWLUId wl (uid >=) = do
          incWorkerCount cr
          atomicPopIORef wqref >>= maybe (retire uid cr) (sched True wl q wqref uid)
    dispatch _ = steal wl q

    -- The head queue was empty: drop it from the pool and release our
    -- slot in its worker count.
    retire uid cr = do
      _ <- wpRemoveWork uid workpool
      workDone <- decWorkerCount uid cr status
      -- Returning (instead of rescheduling) is only allowed when our own
      -- work-limit work set has just finished.
      unless (isWLUId wl (== uid) && workDone) $
        reschedule wl q


-- RRN: Note -- NOT doing random work stealing breaks the traditional
-- Cilk time/space bounds if one is running strictly nested (series
-- parallel) programs.

-- | Attempt to steal work or, failing that, give up and go idle.
--
--   The other workers' pools are scanned in index order, starting from the
--   lower bound of 'scheds' and skipping our own entry.  A stolen trace is
--   run via 'sched' on a fresh queue pushed onto our own work pool.
steal :: WorkLimit -> Sched -> IO ()
steal wl q@Sched{ status, scheds, no=my_no } =
  -- printf "cpu %d stealing\n" my_no >>
  go l
  where
    (l,u) = bounds scheds
    -- go n: try to steal from worker n, then from the ones after it.
    go n
      | n > u = do
            -- Every pool has been tried without success.
            -- Prepare to go idle
          m  <- newEmptyMVar
          atomicModifyIORef status $ addIdler (idleAtWL wl m)
            -- Check to see if this workset is ready to close
          s <- shouldEndWorkSet wl
          if s
            then do
                    -- Time to close this workset
                --printf "cpu %d shutting down workset %d\n" my_no myPriLimit
                endWorkSet status (fst wl)
                return ()
            else do
                    -- There's more work being done here, so I'll go idle
                    -- A True wake-up signal means stop; False means new
                    -- work was pushed, so scan again from the start.
                finished <- takeMVar m
                unless finished $ go l
      | n == my_no = go (n+1)
      | otherwise  = readIORef (workpool (scheds!n)) >>= tryToSteal
          where
            -- Walk down worker n's pool.  Only entries whose uid is at
            -- least our work-limit uid are considered; the first entry
            -- failing that test (or the end of the pool) moves us on to
            -- the next worker.
            tryToSteal (Work uid cr wqref wp) | isWLUId wl (uid >=) = do
                -- Count ourselves as a worker before popping.
                incWorkerCount cr
                stolenTrace <- atomicPopIORef wqref
                case stolenTrace of
                    -- Empty queue: undo the count and try the next entry.
                    Nothing -> decWorkerCount uid cr status >> tryToSteal wp
                    Just t  -> do
                        -- Register a fresh queue for this uid on our own
                        -- pool and run the stolen trace from it.
                        sublst <- newIORef []
                        atomicModifyIORef (workpool q) $ \wp' -> (Work uid cr sublst wp', ())
                        sched True wl q sublst uid t
            tryToSteal _ = go (n+1)


-- ---------------------------------------------------------------------------
-- UTILITY FUNCTIONS
-- ---------------------------------------------------------------------------

-- | Push a trace onto the front of the given work queue, then wake one
-- idle worker if there is one whose uid is less than or equal to the uid
-- of the pushed work.  The worker is woken with the False signal.
pushWork :: IORef AllStatus -> UId -> (IORef [Trace]) -> Trace -> IO ()
pushWork status uid wqref t = do
    atomicModifyIORef wqref $ \pending -> (t : pending, ())
    -- Cheap non-atomic peek first; only pay for the atomic update when a
    -- suitable idler appears to exist.
    snapshot <- readIORef status
    when (hasIdleWorker uid snapshot) $
        atomicModifyIORef status (getIdleWorker uid)
            >>= maybe (return ()) (`putMVar` False)

-- | Decrement the worker count of a work set and report whether the count
-- reached 0.  When it does, the work set is ended ('endWorkSet') and its
-- completion is recorded globally ('globalWorkComplete').
-- A count that is already 0 is treated as an impossible state.
decWorkerCount :: UId -> CountRef -> IORef AllStatus -> IO Bool
decWorkerCount uid countref status = do
    done <- atomicModifyIORef countref step
    when done $ do
        endWorkSet status uid
        globalWorkComplete uid
    return done
  where
    step 0 = error "Impossible value in decWorkerCount"
    step n = (n - 1, n == 1)

-- | Atomically add one to the worker count of a work set.
incWorkerCount :: CountRef -> IO ()
incWorkerCount countref = atomicModifyIORef countref bump
  where
    bump n = (n + 1, ())

-- | Atomically remove the head of the list held in an IORef.
-- Yields @Just@ the removed head, or @Nothing@ (leaving the list
-- untouched) when the list is empty.
atomicPopIORef :: IORef [a] -> IO (Maybe a)
atomicPopIORef ref = atomicModifyIORef ref pop
  where
    pop (x:rest) = (rest, Just x)
    pop []       = ([], Nothing)

-- ---------------------------------------------------------------------------
-- IDLING STATUS
-- ---------------------------------------------------------------------------

-- An idle worker: the uid of its work limit and the MVar it sleeps on.
-- It is woken with False when new work is pushed (pushWork) and with True
-- when its work set ends or the threads are shut down (endWorkSet,
-- globalThreadShutdown).
data Idle    = Idle    {-# UNPACK #-} !UId (MVar Bool)
-- A thread outside the worker pool waiting for the work set with the given
-- uid to end; it is woken by a put of () on its MVar.
data ExtIdle = ExtIdle {-# UNPACK #-} !UId (MVar ())
-- The idle workers (kept in ascending uid order by addIdler) together with
-- the external waiters.
type AllStatus = ([Idle], [ExtIdle])

-- | The initial status: no idle workers and no external waiters.
newStatus :: AllStatus
newStatus = ([], [])

-- | Register an idle worker.  The idler list is kept in ascending uid
--   order; the new entry goes in front of the first existing entry whose
--   uid is not smaller than its own.
addIdler :: Idle -> AllStatus -> (AllStatus, ())
addIdler i@(Idle u _) (is, es) = ((smaller ++ i : others, es), ())
    where (smaller, others) = span (\(Idle u' _) -> u' < u) is

-- | Register an external waiter by consing it onto the front of the
--   external-idler list.
addExtIdler :: ExtIdle -> AllStatus -> (AllStatus, ())
addExtIdler waiter (workers, externals) = ((workers, waiter : externals), ())

-- | Take the first idle worker if its uid is less than or equal to the
--   given one, returning its wake-up MVar and the status without it.
--   Only the head of the idler list is examined; if it does not qualify
--   (or the list is empty) the status is returned unchanged with Nothing.
getIdleWorker :: UId -> AllStatus -> (AllStatus, Maybe (MVar Bool))
getIdleWorker u (Idle u' m' : rst, es)
    | u' <= u = ((rst, es), Just m')
getIdleWorker _ q = (q, Nothing)

-- | True when 'getIdleWorker' would find a worker for this uid, i.e. when
--   the first idle worker has a uid less than or equal to the given one.
hasIdleWorker :: UId -> AllStatus -> Bool
hasIdleWorker uid q = maybe False (const True) (snd (getIdleWorker uid q))

-- | End a work set: atomically remove every idle worker and every external
--   waiter registered under the given uid, then wake them.  External
--   waiters are signalled first (with ()), then workers (with True).
endWorkSet :: IORef AllStatus -> UId -> IO ()
endWorkSet status uid = do
    (is, es) <- atomicModifyIORef status splitOff
    forM_ es $ \(ExtIdle _ mb) -> putMVar mb ()
    forM_ is $ \(Idle _ mb)    -> putMVar mb True
  where
    -- Keep the entries for other uids in the status; hand back the rest.
    splitOff (is, es) =
      let (hitIs, keptIs) = partition (\(Idle    u _) -> u == uid) is
          (hitEs, keptEs) = partition (\(ExtIdle u _) -> u == uid) es
      in ((keptIs, keptEs), (hitIs, hitEs))


-- ---------------------------------------------------------------------------
-- WorkPool
-- ---------------------------------------------------------------------------

-- | The WorkPool keeps a queue where each element has a UId, a list of
--   traces, and the countRef of how many workers are working on Traces
--   of this UId.
--
--   It is a linked list: @Work uid count queue rest@ is one entry followed
--   by the rest of the pool, and @NoWork@ is the empty pool.
--
--   It should be that by the natural pushing done in sched, this pool
--   should always be in order.  We take advantage of this by making
--   guarantees but not actually checking at runtime whether they're true.
data WorkPool = Work {-# UNPACK #-} !UId CountRef (IORef [Trace]) WorkPool | NoWork

-- | Remove the first entry with the given uid from the work pool and
--   return that entry's worker count.  Entries in front of it are kept.
--   This should only be called if the pool contains such an entry and the
--   queue in that entry is empty; thus, it should only be called by the
--   pool's owner.  A pool without a matching entry is an impossible state.
wpRemoveWork :: UId -> IORef WorkPool -> IO CountRef
wpRemoveWork uid pRef = atomicModifyIORef pRef unlink
  where
    unlink :: WorkPool -> (WorkPool, CountRef)
    unlink NoWork = error "Impossible state in wpRemoveWork"
    unlink (Work uid' cr' wq' rest)
      | uid == uid' = (rest, cr')
      | otherwise   = (Work uid' cr' wq' rest', found)
      where (rest', found) = unlink rest


-- ---------------------------------------------------------------------------
-- PAR AND IVAR
-- ---------------------------------------------------------------------------

-- | A Par computation in continuation-passing style: given the
--   continuation that consumes its result, 'runCont' produces the 'Trace'
--   for the whole computation.
newtype Par a = Par {
    runCont :: (a -> Trace) -> Trace
}

-- Mapping over a Par pre-composes the function onto the continuation.
instance Functor Par where
    fmap f (Par run) = Par (\k -> run (k . f))

-- Canonical instance layout: the real definition lives in 'pure' and
-- 'return' merely forwards to it.  (The previous layout, with
-- @pure = return@, is flagged by GHC's -Wnoncanonical-monad-instances.)
instance Applicative Par where
   -- Hand the value straight to the continuation.
   pure a = Par ($ a)
   (<*>)  = ap

instance Monad Par where
    return  = pure
    -- Run m with a continuation that feeds its result to k and then
    -- continues with the outer continuation.
    m >>= k = Par $ \c -> runCont m $ \a -> runCont (k a) c

-- A write-once variable: a mutable reference to its current contents.
newtype IVar a = IVar (IORef (IVarContents a))
-- data IVar a = IVar (IORef (IVarContents a))

-- The state of an IVar: holding a value ('Full'), empty with nobody
-- waiting ('Empty'), or empty with a list of continuations waiting for the
-- value ('Blocked').
data IVarContents a = Full a | Empty | Blocked [a -> Trace]

-- There is nothing to force inside an IVar, so 'rnf' is a no-op.
instance NFData (IVar a) where
  rnf = const ()

-- Peek at an IVar from outside the Par computation: Just the value if it
-- is currently full, Nothing otherwise.  This is nondeterministic; it
-- should perhaps have "unsafe" in the name.
pollIVar :: IVar a -> IO (Maybe a)
pollIVar (IVar ref) = readIORef ref >>= inspect
  where
    inspect (Full x) = return (Just x)
    inspect _        = return Nothing


-- ---------------------------------------------------------------------------
-- GLOBAL THREAD IDENTIFICATION
-- ---------------------------------------------------------------------------

-- Global thread identification is handled by the globalThreadState object.
-- The main ways to interact with this object are to attempt to establish
-- global Scheds, to shut down the threads and clear the Scheds, or to mark
-- a work set as complete.

-- The established global Scheds, the next UId to hand out (see
-- globalEstablishScheds), and the number of work sets that have completed
-- (see globalWorkComplete).  globalThreadShutdown only proceeds when the
-- last two are equal.
data GlobalThreadState = GTS (Array ThreadNumber Sched) !UId !Int

-- | This is the global thread state variable.  It holds Nothing while no
--   global Scheds are established.
--
--   The NOINLINE pragma is required: this is a top-level mutable reference
--   created with 'unsafePerformIO', and if GHC were to inline the
--   definition, each use site could allocate its own, distinct IORef.
{-# NOINLINE globalThreadState #-}
globalThreadState :: IORef (Maybe GlobalThreadState)
globalThreadState = unsafePerformIO $ newIORef Nothing

-- | Record that one more work set has completed (called from
--   decWorkerCount) by bumping the completed counter of the global thread
--   state.  globalThreadShutdown uses that counter to decide whether
--   shutting down is allowed.  The uid argument is not used.
--   Unestablished global state is treated as an impossible state.
globalWorkComplete :: UId -> IO ()
globalWorkComplete _ = atomicModifyIORef globalThreadState bump
  where
    bump (Just (GTS retA n c)) = (Just (GTS retA n (c + 1)), ())
    bump Nothing               = error "Impossible state in globalWorkComplete."

-- | The outcome of 'globalEstablishScheds': @Success uid@ when the given
--   Scheds became the global ones, or @Failure uid scheds@ carrying a
--   fresh UId and the Scheds that were already established.
data GTSResult = Success UId | Failure UId (Array ThreadNumber Sched)

-- | Attempts to set the global Scheds.  If none are established yet, the
--   given array becomes the global Scheds and the result is a Success
--   with UId 0.  If they are already established, the result is a Failure
--   with a new UId (to interact with the global threads) and the current
--   global Scheds; the given array is ignored.
globalEstablishScheds :: Array ThreadNumber Sched -> IO GTSResult
globalEstablishScheds a = atomicModifyIORef globalThreadState claim
  where
    claim current = case current of
      Nothing             -> (Just (GTS a 1 0), Success 0)
      Just (GTS retA n c) -> (Just (GTS retA (n + 1) c), Failure n retA)

-- | Attempts to shut down the global threads.  The shutdown only happens
--   when the Scheds are established and every handed-out work set has been
--   reported complete; in that case the global Scheds are cleared, every
--   idle worker and external waiter is woken, and True is returned.
--   In every other case nothing is changed and False is returned.
--
-- TODO: This can sometimes leave threads hanging who are not doing any work
--       but have not yet marked themselves as idle.  Things won't exactly
--       break, but there may be MVar errors that are thrown.
globalThreadShutdown :: IO Bool
globalThreadShutdown =
    atomicModifyIORef globalThreadState tryClear >>= maybe (return False) wakeAll
  where
    -- Clear the state only when the two counters agree.
    tryClear (Just (GTS a n c)) | n == c = (Nothing, Just a)
    tryClear gts = (gts, Nothing)

    -- Empty the status found through the first Sched and signal everybody
    -- who was registered in it.
    wakeAll a = do
        let s = status (a ! fst (bounds a))
        (is, es) <- atomicModifyIORef s (\old -> (newStatus, old))
        forM_ es $ \(ExtIdle _ m)  -> putMVar m ()
        forM_ is $ \(Idle    _ mb) -> putMVar mb True
        return True


-- ---------------------------------------------------------------------------
-- RUNPAR
-- ---------------------------------------------------------------------------

-- [Notes on threadCapability]
--
-- We create a thread on each CPU with forkOnIO.  Ideally, the CPU on 
-- which the current thread is running will host the main thread; the 
-- other CPUs will host worker threads.
--
-- This is possible using threadCapability, but this requires
-- GHC 7.1.20110301, because that is when threadCapability was added.
--
-- Lacking threadCapability, we always pick CPU #0 to run the main
-- thread.  If the current thread is not running on CPU #0, this
-- will require some data to be shipped over the memory bus, and
-- hence will be slightly slower than the version using threadCapability.
--
-- If this is a nested runPar call, then we can do slightly better.  We 
-- can look at the current workers' ThreadIds and see if we are one of 
-- them.  If so, we do the work on that core.  If not, we are once again 
-- forced to choose arbitrarily, so we send the work to CPU #0.
--


-- Run a Par computation and return its result.  The Bool is the synchrony
-- flag handed to 'sched'.
--
-- Two cases are distinguished by 'globalEstablishScheds':
--
--   * Success: no global Scheds existed.  One worker thread per capability
--     is forked; the one on main_cpu runs the computation, the others go
--     looking for work.  The calling thread blocks until the work set ends.
--
--   * Failure: global Scheds already exist (e.g. a nested runPar).  If the
--     calling thread is one of the workers it runs the computation itself;
--     otherwise the task is queued on an existing worker's pool and the
--     calling thread blocks until the work set ends.
--
-- In both cases the result is delivered through an IVar (rref) that the
-- computation fills as its last step.
{-# INLINE runPar_internal #-}
runPar_internal :: Bool -> Par a -> a
runPar_internal _doSync x = unsafePerformIO $ do
        -- Set up the schedulers
    myTId <- myThreadId
        -- Every tId starts out as the calling thread's id; in the Success
        -- case each forked worker overwrites its own entry.
    tIds <- replicateM numCapabilities $ newIORef myTId
    workpools <- replicateM numCapabilities $ newIORef NoWork
    statusRef <- newIORef newStatus
        -- The array is defined recursively: every Sched refers back to the
        -- whole array through its scheds field.
    let states = listArray (0, numCapabilities-1)
                    [ Sched { no=n, workpool=wp, status=statusRef, scheds=states, tId=t }
                    | n <- [0..] | wp <- workpools | t <- tIds ]
    res <- globalEstablishScheds states
    case res of
      Success uid -> do
#if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
            -- See [Notes on threadCapability] for more details
        (main_cpu, _) <- threadCapability =<< myThreadId
#else
        let main_cpu = 0
#endif
            -- The main worker counts as the first worker on this work set.
        currentWorkers <- newIORef 1
            -- Limit for the non-main workers: uid -1 means they never end a
            -- work set on their own (see shouldEndWorkSet), so the count
            -- is not expected to be read.
        let workLimit' = (-1, undefined)
            -- Limit for the main worker.
        let workLimit = (0, currentWorkers)

            -- Register the calling thread as an external waiter on this
            -- work set before any worker is started.
        m <- newEmptyMVar
        rref <- newIORef Empty
        atomicModifyIORef statusRef $ addExtIdler (ExtIdle uid m)
        forM_ (elems states) $ \(state@Sched{no=cpu}) -> do
          forkOnIO cpu $ do
            myTId <- myThreadId
            --printf "cpu %d setting threadId=%s\n" cpu (show myTId)
            writeIORef (tId state) myTId
            if (cpu /= main_cpu)
              then reschedule workLimit' state
              else do
                    -- Push a fresh, empty queue for this work set and run
                    -- the computation, which ends by putting its result
                    -- into rref.
                sublst <- newIORef []
                atomicModifyIORef (workpool state) $ \wp -> (Work uid currentWorkers sublst wp, ())
                sched _doSync workLimit state sublst uid $ runCont (x >>= put_ (IVar rref)) (const Done)
            -- Block until the external-waiter MVar is signalled.
        takeMVar m
        --printf "done\n"
        r <- readIORef rref

        -- TODO: If we're doing this nested strategy, we should probably just keep the
        -- threads alive indefinitely.  After all, we can get some weird conditions
        -- doing it this way.  At the least, we should put this in steal where the
        -- shutdown occurs.
        b <- globalThreadShutdown
--         putStrLn $ "Global thread shutdown: " ++ show b
        case r of
            Full a -> return a
            _ -> error "no result"

      Failure uid cScheds -> do
            -- Decide which existing worker to use (main_cpu) and whether
            -- the calling thread is that worker (doWork).
#if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
            -- See [Notes on threadCapability] for more details
        (main_cpu, _) <- threadCapability myTId
        cTId <- readIORef $ tId $ cScheds ! main_cpu
        let doWork = cTId == myTId
#else
        cTIds <- mapM (\s -> (readIORef $ tId $ s) >>= (\t -> return (s,t))) (elems cScheds)
        let (main_cpu, doWork) = case find ((== myTId) . snd) cTIds of
                                        Nothing    -> (0, False)
                                        Just (s,_) -> (no s, True)
#endif

        rref <- newIORef Empty
        let task = runCont (x >>= put_ (IVar rref)) (const Done)
            state = cScheds ! main_cpu
        if doWork
          then do
            --printf "cpu %d using old threads, of which I am one\n" main_cpu
                -- We are a worker: run the task right here, limited to the
                -- new work set so that sched returns once it is complete.
            currentWorkers <- newIORef 1
            sublst <- newIORef []
            let workLimit = (uid, currentWorkers)
            atomicModifyIORef (workpool state) $ \wp -> (Work uid currentWorkers sublst wp, ())
            sched _doSync workLimit state sublst uid $ task
          else do
            --printf "cpu %d using old threads, of which I am not one\n" main_cpu
                -- We are not a worker: queue the task (worker count 0) on
                -- the chosen worker's pool and wait as an external idler.
            currentWorkers <- newIORef 0
            sublst <- newIORef [task]
            m <- newEmptyMVar
            atomicModifyIORef (status state) $ addExtIdler (ExtIdle uid m)
            atomicModifyIORef (workpool state) $ \wp -> (Work uid currentWorkers sublst wp, ())
            takeMVar m
        --printf "cpu %d finished in child\n" main_cpu
        r <- readIORef rref
--        globalThreadShutdown
        case r of
            Full a -> return a
            _ -> error "no result"

-- | The main way to run a Par computation: runs it with the synchrony
-- flag set and returns its result.
runPar :: Par a -> a
runPar computation = runPar_internal True computation

-- | An asynchronous version of 'runPar' (the synchrony flag is cleared):
-- the main thread of control in a Par computation can return while forked
-- computations still run in the background.
runParAsync :: Par a -> a
runParAsync computation = runPar_internal False computation

-- | An alternative version in which the consumer of the result has
--   the option to "help" run the Par computation if results it is
--   interested in are not ready yet.
--
--   NOTE: not implemented yet.  Any use fails with an error that names
--   this function, rather than with an anonymous @undefined@.
runParAsyncHelper :: Par a -> (a, IO ())
runParAsyncHelper =
    error "Control.Monad.Par.Scheds.TraceInternal.runParAsyncHelper: not implemented" -- TODO: Finish Me.


-- ---------------------------------------------------------------------------
-- PAR FUNCTIONS
-- ---------------------------------------------------------------------------

-- Create a new, empty IVar.
new :: Par (IVar a)
new = Par $ \k -> New Empty k

-- Create an IVar that already holds the given value; the value is fully
-- evaluated (deepseq) first.
newFull :: NFData a => a -> Par (IVar a)
newFull x = x `deepseq` Par (New (Full x))

-- Create an IVar that already holds the given value; the value is only
-- evaluated to weak head normal form.
newFull_ :: a -> Par (IVar a)
newFull_ x = x `seq` Par (New (Full x))

-- Read the value of an IVar, passing it on to the rest of the computation.
get :: IVar a -> Par a
get v = Par (Get v)

-- Write a value into an IVar; the value is only evaluated to weak head
-- normal form.
put_ :: IVar a -> a -> Par ()
put_ v a = a `seq` Par (\k -> Put v a (k ()))

-- Write a value into an IVar; the value is fully evaluated (deepseq)
-- first.
put :: NFData a => IVar a -> a -> Par ()
put v a = a `deepseq` Par (\k -> Put v a (k ()))

-- Emit a Yield step wrapping the rest of the computation.
yield :: Par ()
yield = Par (Yield . ($ ()))