{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns,
             ExistentialQuantification, CPP #-}
{-# OPTIONS_GHC -Wall -fno-warn-name-shadowing -fno-warn-unused-do-bind #-}
module Control.Monad.Par.Scheds.TraceInternal (
   Trace(..), Sched(..), Par(..),
   IVar(..), IVarContents(..),
   sched,
   runPar, runParIO, runParAsync,
   
   new, newFull, newFull_, get, put_, put,
   pollIVar, yield, fixPar, FixParException (..)
 ) where
import Control.Monad as M hiding (mapM, sequence, join)
import Prelude hiding (mapM, sequence, head,tail)
import Data.IORef
import System.IO.Unsafe
#if MIN_VERSION_base(4,4,0)
import GHC.IO.Unsafe (unsafeDupableInterleaveIO)
#else
import GHC.IO.Unsafe (unsafeInterleaveIO)
#endif
import Control.Concurrent hiding (yield)
import GHC.Conc (numCapabilities)
import Control.DeepSeq
import Control.Monad.Fix (MonadFix (mfix))
import Control.Exception (Exception, throwIO, BlockedIndefinitelyOnMVar (..),
                          catch)
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative
#endif
#if __GLASGOW_HASKELL__ <= 700
import GHC.Conc (forkOnIO)
-- Compatibility alias for old compilers: there the pinned-fork primitive is
-- imported from GHC.Conc under the name forkOnIO, so re-expose it under the
-- forkOn name that the rest of this module calls.
forkOn :: Int -> IO () -> IO ThreadId
forkOn = forkOnIO
#endif
-- | A reified 'Par' computation.  Every constructor is one primitive step
-- that carries the remainder of the computation with it; 'sched' is the
-- interpreter that executes these steps.
data Trace
  -- Read an IVar; the continuation is resumed with the value it holds.
  = forall a . Get (IVar a) (a -> Trace)
  -- Store a value into an IVar, then continue with the given trace.
  | forall a . Put (IVar a) a Trace
  -- Allocate an IVar with the given initial contents and pass it on.
  | forall a . New (IVarContents a) (IVar a -> Trace)
  -- Two traces: the scheduler queues the first and keeps running the second.
  | Fork Trace Trace
  -- Nothing left to do on this trace.
  | Done
  -- Give up the worker: the trace is re-queued behind all pending work.
  | Yield Trace
  -- Run an IO action and feed its result to the continuation.
  | forall a . LiftIO (IO a) (a -> Trace)
-- | The interpreter loop of one worker: execute a 'Trace' step by step until
-- it either blocks on an empty 'IVar' or finishes, then look for more work
-- through 'reschedule'.
--
-- The 'Bool' only matters when the trace reaches 'Done': if it is 'True' the
-- worker carries on with 'reschedule'; if it is 'False' the worker prints a
-- notice, forks a new thread that runs 'reschedule', and returns.
sched :: Bool -> Sched -> Trace -> IO ()
sched _doSync queue trace = step trace
 where
  step :: Trace -> IO ()

  -- Allocate the backing reference and hand the fresh IVar to the rest.
  step (New contents k) = do
    ref <- newIORef contents
    step (k (IVar ref))

  -- Cheap unsynchronised read first; only fall back to the atomic update
  -- when the IVar did not look full.
  step (Get (IVar ref) k) = do
    seen <- readIORef ref
    case seen of
      Full val -> step (k val)
      _other   -> do
        -- The atomic update both records the new state and selects the
        -- action to run next: either park this continuation on the IVar and
        -- find other work, or continue directly because a writer got in
        -- between the two reads.
        next <- atomicModifyIORef ref $ \st -> case st of
                  Empty      -> (Blocked [k],      reschedule queue)
                  Full val   -> (Full val,         step (k val))
                  Blocked ks -> (Blocked (k : ks), reschedule queue)
        next

  -- Fill the IVar, then queue every continuation that was waiting for it.
  step (Put (IVar ref) val rest) = do
    waiters <- atomicModifyIORef ref $ \st -> case st of
                 Empty      -> (Full val, [])
                 Full _     -> error "multiple put"
                 Blocked ks -> (Full val, ks)
    forM_ waiters $ \k -> pushWork queue (k val)
    step rest

  -- The child is queued (and may be stolen); this worker keeps the parent.
  step (Fork child parent) = do
    pushWork queue child
    step parent

  step Done
    | _doSync   = reschedule queue
    | otherwise = do
        putStrLn " [par] Forking replacement thread..\n"
        _ <- forkIO (reschedule queue)
        return ()

  -- Append the yielding trace at the far end of this worker's pool so that
  -- everything already queued there is taken first.
  step (Yield rest) = do
    atomicModifyIORef (workpool queue) $ \pending -> (pending ++ [rest], ())
    reschedule queue

  step (LiftIO action k) = do
    result <- action
    step (k result)
-- | The exception that 'fixPar' throws in place of
-- 'BlockedIndefinitelyOnMVar' when its lazily read result cannot be obtained.
data FixParException = FixParException
  deriving Show

instance Exception FixParException
-- | Take the next trace from the front of this worker's own pool and run it
-- with @'sched' True@.  When the pool is empty, fall back to 'steal'.
reschedule :: Sched -> IO ()
reschedule queue = do
  next <- atomicModifyIORef (workpool queue) pop
  maybe (steal queue) (sched True queue) next
 where
  -- Remove the head of the pool, if there is one.
  pop []           = ([], Nothing)
  pop (job : rest) = (rest, Just job)
-- | Look for work in the pools of the other workers.
--
-- The pools are tried in the order of the 'scheds' list, skipping the
-- caller's own.  The first trace found is run with @'sched' True@.  If every
-- pool is empty the worker registers itself in the shared 'idle' list and
-- sleeps on a fresh 'MVar' until it is told either to retry ('False') or to
-- stop ('True').
steal :: Sched -> IO ()
steal self = scan victims
  where
    victims = scheds self
    myNo    = no self

    scan (victim : rest)
      | no victim == myNo = scan rest
      | otherwise         = do
          loot <- atomicModifyIORef (workpool victim) grab
          case loot of
            Nothing  -> scan rest
            Just job -> sched True self job

    -- Nothing to steal anywhere: join the idle list.
    scan [] = do
      ticket  <- newEmptyMVar
      earlier <- atomicModifyIORef (idle self) $ \waiting ->
                   (ticket : waiting, waiting)
      if length earlier == numCapabilities - 1
        -- Everybody else was already asleep, so there is no work left at
        -- all: tell each sleeper to stop and return ourselves.
        then forM_ earlier $ \sleeper -> putMVar sleeper True
        else do
          finished <- takeMVar ticket
          -- 'False' means somebody pushed new work: scan again.
          unless finished (scan victims)

    -- Remove the head of a pool, if there is one.
    grab []           = ([], Nothing)
    grab (job : jobs) = (jobs, Just job)
-- | Put a trace at the front of the given worker's pool and, if some worker
-- is currently sleeping in the idle list, wake exactly one of them.
pushWork :: Sched -> Trace -> IO ()
pushWork sch task = do
  atomicModifyIORef (workpool sch) $ \pending -> (task : pending, ())
  -- Plain read first, so the common "nobody is idle" case skips the atomic
  -- update of the idle list.
  sleepers <- readIORef (idle sch)
  unless (null sleepers) $ do
    wake <- atomicModifyIORef (idle sch) popSleeper
    wake
  where
    -- The list may have been emptied since the read above; then do nothing.
    popSleeper []              = ([], return ())
    -- 'False' tells the sleeper that there is work, not that we are done.
    popSleeper (sleeper : etc) = (etc, putMVar sleeper False)
-- | The state of one scheduler worker.
data Sched = Sched
    { -- Index of this worker; 'steal' uses it to skip the worker's own pool.
      no       :: {-# UNPACK #-} !Int
      -- Pending traces of this worker.  New work is consed onto the front,
      -- and both the owner and thieves take from the front.
    , workpool :: IORef [Trace]
      -- One MVar per sleeping worker.  Writing 'False' wakes the sleeper to
      -- look for work again, writing 'True' tells it to stop.
    , idle     :: IORef [MVar Bool]
      -- The states of all workers, this one included.  Kept lazy because
      -- the list is built so that it refers to itself.
    , scheds   :: [Sched]
    }
-- | A 'Par' computation in continuation-passing style: given the
-- continuation that consumes its result, it produces the 'Trace' of the
-- whole remaining computation.
newtype Par a = Par { runCont :: (a -> Trace) -> Trace }
-- | Mapping a function over a computation applies it to the result just
-- before the result is passed to the continuation.
instance Functor Par where
    fmap f (Par run) = Par (\k -> run (k . f))
-- | Sequencing in continuation-passing style: the first computation is run
-- with a continuation that starts the second one on its result.
instance Monad Par where
    return = pure
    Par run >>= next = Par (\k -> run (\x -> runCont (next x) k))
-- | 'pure' passes the value straight to the continuation; '<*>' is derived
-- from the 'Monad' instance.
instance Applicative Par where
   pure x = Par (\k -> k x)
   (<*>) = ap
-- | Monadic fixpoints are provided by 'fixPar'.
instance MonadFix Par where
   mfix f = fixPar f
-- | Monadic fixpoint for 'Par'; this is the implementation of 'mfix'.
--
-- The function is handed a lazy placeholder for its own eventual result.
-- The placeholder is backed by an 'MVar' that is filled once the computation
-- has produced its value.  If reading the placeholder is reported as
-- 'BlockedIndefinitelyOnMVar', a 'FixParException' is thrown instead.
fixPar :: (a -> Par a) -> Par a
fixPar f = Par $ \ c ->
  -- The IO action below builds and returns the Trace to run next, which is
  -- why the continuation given to LiftIO is simply id.
  LiftIO (do
    -- Holds the final result once it is known.
    mv <- newEmptyMVar
    -- A deferred read of mv: the read is wrapped in
    -- unsafeDupableInterleaveIO so it is only performed when ans is
    -- demanded.  The handler uses a lazy (irrefutable) pattern.
    ans <- unsafeDupableInterleaveIO (readMVar mv
             `catch` \ ~BlockedIndefinitelyOnMVar -> throwIO FixParException)
    case f ans of
      -- Run the body; its continuation first stores the result in mv (so
      -- that ans becomes readable) and then resumes the caller with it.
      Par q -> pure $ q $ \a -> LiftIO (putMVar mv a) (\ ~() -> c a)) id
#if !MIN_VERSION_base(4,4,0)
-- Fallback for old base versions, where only unsafeInterleaveIO is imported:
-- define the dupable name in terms of it.
unsafeDupableInterleaveIO :: IO a -> IO a
unsafeDupableInterleaveIO action = unsafeInterleaveIO action
#endif
-- | A variable that is filled at most once; a second write is rejected by
-- the scheduler with a @"multiple put"@ error.  It is a mutable reference to
-- the current 'IVarContents'.
newtype IVar a = IVar (IORef (IVarContents a))

-- | Two 'IVar's are equal exactly when they wrap the same reference.
instance Eq (IVar a) where
  (==) (IVar lhs) (IVar rhs) = lhs == rhs

-- | Evaluating the 'IVar' itself is all there is to force; the contents
-- behind the reference are not inspected.
instance NFData (IVar a) where
  rnf ivar = ivar `seq` ()
-- | Non-blocking look at an 'IVar' from 'IO': 'Just' the value if it has
-- been filled, 'Nothing' if it is empty or only has blocked readers.
pollIVar :: IVar a -> IO (Maybe a)
pollIVar (IVar ref) = readIORef ref >>= inspect
  where
    inspect (Full x) = return (Just x)
    inspect _        = return Nothing
-- | The possible states of an 'IVar'.
data IVarContents a
  -- The value has been written.
  = Full a
  -- No value yet, and nobody is waiting for one.
  | Empty
  -- No value yet; these continuations are waiting to receive it.
  | Blocked [a -> Trace]
{-# INLINE runPar_internal #-}
-- | Shared driver behind 'runPar', 'runParIO' and 'runParAsync'.
--
-- It creates one work pool and one 'Sched' per capability (as counted by
-- 'numCapabilities'), forks one pinned thread per 'Sched' with 'forkOn',
-- runs the given computation on one of those threads and blocks until that
-- thread has handed back the contents of the result 'IVar'.
--
-- The 'Bool' is passed unchanged to 'sched' for the main computation.
-- Calls 'error' with @"no result"@ if the result 'IVar' is not 'Full' after
-- the main trace has been run.
runPar_internal :: Bool -> Par a -> IO a
runPar_internal _doSync x = do
   workpools <- replicateM numCapabilities $ newIORef []
   idle <- newIORef []
   -- The list is defined in terms of itself so that every worker can reach
   -- the pool of every other worker when stealing.
   let states = [ Sched { no=ix, workpool=wp, idle, scheds=states }
                | (ix,wp) <- zip [0..] workpools ]
#if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
    --
    -- One thread is forked per capability with forkOn.  The capability the
    -- calling thread currently runs on hosts the main computation; all the
    -- others host plain worker threads.
    --
    -- The capability index is reduced modulo numCapabilities.  Workers are
    -- only numbered 0 .. numCapabilities-1, so an out-of-range index (which
    -- can occur when the number of capabilities was raised at run time)
    -- would otherwise match no worker, the computation would never be
    -- started, and the takeMVar below would block forever.
    --
   (cur_cpu, _) <- threadCapability =<< myThreadId
   let main_cpu = cur_cpu `mod` numCapabilities
#else
    --
    -- Without threadCapability the main computation is always placed on
    -- capability 0, whichever capability the caller happens to run on.
    --
   let main_cpu = 0
#endif
   m <- newEmptyMVar
   forM_ (zip [0..] states) $ \(cpu,state) ->
        forkOn cpu $
          if (cpu /= main_cpu)
             then reschedule state
             else do
                  -- The computation writes its answer into a private IVar;
                  -- once sched returns, its contents are handed to the caller.
                  rref <- newIORef Empty
                  sched _doSync state $ runCont (x >>= put_ (IVar rref)) (const Done)
                  readIORef rref >>= putMVar m
   r <- takeMVar m
   case r of
     Full a -> return a
     _ -> error "no result"
-- | Run a 'Par' computation and return its result as a pure value.  This is
-- 'runPar_internal' with the flag set to 'True', wrapped in
-- 'unsafePerformIO'.
runPar :: Par a -> a
runPar p = unsafePerformIO (runPar_internal True p)
-- | Run a 'Par' computation in 'IO'.  This is 'runPar_internal' with the
-- flag set to 'True'.
runParIO :: Par a -> IO a
runParIO p = runPar_internal True p
-- | Like 'runPar', but passes 'False' to 'runPar_internal': when the main
-- trace reaches 'Done', the scheduler forks a replacement worker thread and
-- returns instead of continuing to schedule on the main worker.
runParAsync :: Par a -> a
runParAsync p = unsafePerformIO (runPar_internal False p)
-- | Create a fresh, empty 'IVar'.
new :: Par (IVar a)
new = Par (\k -> New Empty k)
-- | Create an 'IVar' that already holds the given value.  The value is
-- fully evaluated (with 'deepseq') before the 'New' step is produced.
newFull :: NFData a => a -> Par (IVar a)
newFull x = Par (\k -> deepseq x (New (Full x) k))
-- | Create an 'IVar' that already holds the given value, evaluating the
-- value to weak head normal form only.
newFull_ :: a -> Par (IVar a)
newFull_ x = x `seq` Par (New (Full x))
-- | Read an 'IVar'.  If it has not been filled yet, the scheduler parks the
-- rest of the computation on the 'IVar' until a value is written.
get :: IVar a -> Par a
get v = Par (Get v)
-- | Write a value into an 'IVar', evaluating the value to weak head normal
-- form only.  Writing to an 'IVar' that is already full makes the scheduler
-- fail with @"multiple put"@.
put_ :: IVar a -> a -> Par ()
put_ v a = a `seq` Par (\k -> Put v a (k ()))
-- | Write a value into an 'IVar'.  The value is fully evaluated (with
-- 'deepseq') before the 'Put' step is produced.  Writing to an 'IVar' that
-- is already full makes the scheduler fail with @"multiple put"@.
put :: NFData a => IVar a -> a -> Par ()
put v a = Par (\k -> deepseq a (Put v a (k ())))
-- | Let other work run first: the scheduler re-queues the rest of this
-- computation behind everything already in the worker's pool.
yield :: Par ()
yield = Par (\k -> Yield (k ()))