-- Meta-scheduler for the Par monad: a work-stealing worker loop whose startup
-- behaviour and work-search strategy are supplied by composable 'Resource's.
--
-- NOTE(review): the code below uses CPP directives, package-qualified imports,
-- record puns, tuple sections, bang patterns and newtype deriving, so the
-- matching language extensions are presumably enabled by pragmas or build
-- configuration that are not visible here -- confirm.
module Control.Monad.Par.Meta 
( 
  -- * Core types and the Par class interfaces
  Par
, IVar
, PC.ParFuture(..)
, PC.ParIVar(..)
  -- * Running computations
, runMetaPar
, runMetaParIO
  -- * Scheduler state and resources
, Sched(..)
, GlobalState
, Resource(..)
, Startup(..)
, WorkSearch(..)
  -- * Helpers for resource implementations
, forkWithExceptions
, spawnWorkerOnCPU
) where
import Control.Applicative
import Control.Concurrent ( MVar
                          , newEmptyMVar
                          , putMVar
                          , readMVar
                          , takeMVar
                          , tryPutMVar
                          , tryTakeMVar
                          )
import Control.DeepSeq
import Control.Monad
import "mtl" Control.Monad.Cont (ContT(..), MonadCont, callCC, runContT)
import "mtl" Control.Monad.Reader (ReaderT, MonadReader, runReaderT, ask)
import Control.Monad.IO.Class
import Control.Exception (catch, throwTo, SomeException)
import Data.Concurrent.Deque.Class (WSDeque)
import Data.Concurrent.Deque.Reference.DequeInstance ()
import Data.Concurrent.Deque.Reference as R
import qualified Data.ByteString.Char8 as BS
import Data.Monoid
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Typeable (Typeable)
import Data.IORef (IORef, writeIORef, newIORef)
import Data.Vector (Vector)
import qualified Data.Vector as V
import System.IO.Unsafe (unsafePerformIO)
import System.IO (stderr)
import System.Random.MWC
import Text.Printf
import qualified Debug.Trace as DT
#ifdef AFFINITY
import System.Posix.Affinity (setAffinityOS)
#endif
import Control.Monad.Par.Meta.Resources.Debugging (dbgTaggedMsg)
import Control.Monad.Par.Meta.HotVar.IORef
import qualified Control.Monad.Par.Class as PC
-- Compatibility layer: provides forkOn, getNumCapabilities and
-- threadCapability' with the same types on both sides of the GHC 7.2 split.
#if __GLASGOW_HASKELL__ >= 702
import GHC.Conc (forkOn, ThreadId, myThreadId, threadCapability)
import Control.Concurrent (getNumCapabilities)
-- GHC >= 7.2: ask the runtime directly; the answer is always a Just.
threadCapability' tid = Just <$> threadCapability tid
#else
import GHC.Conc (forkOnIO, ThreadId, myThreadId, numCapabilities)
-- Older GHC: forkOn is an alias for forkOnIO.
forkOn :: Int -> IO () -> IO ThreadId
forkOn = forkOnIO
-- Older GHC: the capability count is the constant numCapabilities.
getNumCapabilities :: IO Int
getNumCapabilities = return numCapabilities
-- Older GHC: emulate the query by scanning the global scheduler table for a
-- Sched whose tids set contains the thread.  The first match yields
-- (scheduler number, True); no match yields Nothing.
threadCapability' tid = do
  vec <- readHotVar globalScheds
  let f Nothing = return $ mempty
      f (Just Sched{ no, tids }) = do
        set <- readHotVar tids
        case Set.member tid set of
          False -> return $ mempty
          True -> return $ First (Just no)
  cap <- getFirst . mconcat <$> mapM f (V.toList vec)
  return ((,True) <$> cap)
#endif
-- Shared signature for both branches above: given a thread id, return
-- Nothing or a pair of a capability number and a flag.  ensurePinned treats
-- a True flag as meaning the thread is already pinned.
threadCapability' :: ThreadId -> IO (Maybe (Int, Bool))
#if __GLASGOW_HASKELL__ < 700
-- Local definition of void for GHC older than 7.0: discard a functor result.
-- Presumably Control.Monad does not export void on those versions -- confirm.
void = fmap (const ())
#endif
-- | Compile-time debugging switch: True when the module is built with the
-- DEBUG macro defined, False otherwise.  It guards the diagnostic output
-- throughout this module.
dbg :: Bool
#ifdef DEBUG
dbg = True
#else
dbg = False
#endif
-- | The Par monad: a continuation-passing computation (answer type @()@)
-- layered over 'ROnly', which gives read-only access to the current 'Sched'
-- and the ability to perform IO.
newtype Par a = Par
  { unPar :: ContT () ROnly a
  }
  deriving
    ( Functor
    , Applicative
    , Monad
    , MonadIO
    , MonadCont
    , MonadReader Sched
    , Typeable
    )
-- | Base monad underneath 'Par': IO with a read-only 'Sched' environment.
type ROnly = ReaderT Sched IO
-- | A write-once variable: a mutable cell holding 'IVarContents'.
newtype IVar a = IVar (HotVar (IVarContents a))
-- | Forcing an 'IVar' does nothing: the argument is ignored and the contents
-- are never inspected.
instance NFData (IVar a) where
  rnf = const ()
-- | The three states an 'IVar' can be in.
data IVarContents a
  = Full a
    -- ^ A value has been written.
  | Empty
    -- ^ No value yet and nobody is waiting.
  | Blocked [a -> IO ()]
    -- ^ No value yet; each callback is invoked with the value once it is put.
-- | Table of schedulers indexed by capability number; a slot is Nothing until
-- a 'Sched' has been installed for that capability (see makeOrGetSched).
type GlobalState = Vector (Maybe Sched)
-- | A resource's start-up action.  It is handed the combined 'WorkSearch'
-- and the global scheduler table; 'runMetaParIO' runs it for non-nested
-- invocations.
newtype Startup = St
  { runSt :: WorkSearch -> HotVar GlobalState -> IO ()
  }
-- | Start-up actions are opaque functions, so a fixed placeholder is shown.
instance Show Startup where
  show = const "<Startup>"
-- | Start-up actions compose sequentially: the left action runs first, then
-- the right one, both with the same arguments.  The identity does nothing.
instance Monoid Startup where
  mempty = St (\_ _ -> return ())
  mappend lhs rhs = St $ \ws schedMap -> do
    runSt lhs ws schedMap
    runSt rhs ws schedMap
                             
-- | A strategy for finding work when a worker's own deque is empty.  It is
-- given the worker's 'Sched' and the global scheduler table, and returns a
-- computation to run, or Nothing if none was found.
newtype WorkSearch = WS
  { runWS :: Sched -> HotVar GlobalState -> IO (Maybe (Par ()))
  }
-- | Work searches are opaque functions, so a fixed placeholder is shown.
instance Show WorkSearch where
  show = const "<WorkSearch>"
-- | Work searches compose as a left-biased fallback: the right search is
-- consulted only when the left one finds nothing.  The identity never finds
-- work.
instance Monoid WorkSearch where
  mempty = WS (\_ _ -> return Nothing)
  mappend lhs rhs = WS $ \sched schedMap ->
    runWS lhs sched schedMap
      >>= maybe (runWS rhs sched schedMap) (return . Just)
-- | A scheduler resource: a start-up action paired with a work-search
-- strategy.
data Resource = Resource
  { startup :: Startup
    -- ^ Action that 'runMetaParIO' runs for a non-nested invocation.
  , workSearch :: WorkSearch
    -- ^ How workers look for work once their own deque is empty.
  }
  deriving (Show)
-- | Resources combine field by field, using the 'Startup' and 'WorkSearch'
-- monoids.
instance Monoid Resource where
  mempty = Resource { startup = mempty, workSearch = mempty }
  mappend (Resource stA wsA) (Resource stB wsB) =
    Resource
      { startup = mappend stA stB
      , workSearch = mappend wsA wsB
      }
-- | Per-capability scheduler state.
data Sched = Sched
  { no :: !Int
    -- ^ Capability number this scheduler was created for.
  , tids :: HotVar (Set ThreadId)
    -- ^ Thread ids of the workers registered on this scheduler
    -- (see 'spawnWorkerOnCPU').
  , workpool :: WSDeque (Par ())
    -- ^ Deque of pending computations; local pushes and pops use the
    -- left end.
  , rng :: HotVar GenIO
    -- ^ Random number generator state.  Not used in this module beyond
    -- creation; presumably consumed by work-search resources (TODO confirm).
  , mortals :: HotVar Int
    -- ^ Number of workers that should exit.  A worker that finds its deque
    -- empty and this count positive decrements it and stops looping.
  , consecutiveFailures :: IORef Int
    -- ^ The worker loop's current failure count, written before each
    -- work search.
  , ivarUID :: HotVar Int
    -- ^ Counter initialised to 0.  Not read in this module; presumably a
    -- source of unique IVar ids elsewhere (TODO confirm).
  , schedWs :: WorkSearch
    -- ^ Work-search strategy the worker loop runs when the deque is empty.
  }
-- | Only the scheduler number is shown; the other fields are mutable cells
-- or functions.
instance Show Sched where
  show sched = printf "Sched{ no=%d }" (no sched)
-- | Fork a child thread whose exceptions are not lost.  Any exception that
-- escapes @action@ is reported on stderr, tagged with @descr@, and then
-- thrown to the thread that called 'forkWithExceptions'.
--
-- The first argument is the forking primitive to use, e.g. a partially
-- applied @forkOn@.
forkWithExceptions :: (IO () -> IO ThreadId)
                   -> String
                   -> (IO () -> IO ThreadId)
forkWithExceptions forkit descr action = do
  parent <- myThreadId
  let report :: SomeException -> IO ()
      report e = do
        BS.hPutStrLn stderr $ BS.pack $ "Exception inside child thread "++show descr++": "++show e
        throwTo parent e
  forkit (Control.Exception.catch action report)
-- | Run an action on a thread that @threadCapability'@ reports as pinned.
--
-- If the calling thread is already pinned, the action runs in place.
-- Otherwise it runs in a fresh thread created with @forkOn@ -- on the
-- caller's capability when known, on capability 0 when not -- and the
-- caller blocks until the result is handed back through an MVar.
ensurePinned :: IO a -> IO a
ensurePinned action = do
  self <- myThreadId
  placement <- threadCapability' self
  case placement of
    Just (_, True) -> action
    Just (cap, _)  -> runPinnedOn cap
    Nothing        -> runPinnedOn 0
  where
    -- Fork the action onto the given capability and wait for its result.
    runPinnedOn cap = do
      result <- newEmptyMVar
      void $ forkOn cap (action >>= putMVar result)
      takeMVar result
-- | Try to take one computation from the left end of this scheduler's own
-- deque.  Returns Nothing when the deque is empty.  With debugging enabled
-- the attempt is logged first.
popWork :: Sched -> IO (Maybe (Par ()))
popWork sched = do
  when dbg traceAttempt
  R.tryPopL (workpool sched)
  where
    schedNo = no sched
#if __GLASGOW_HASKELL__ >= 702
    -- On newer GHC the log line also names the capability we are running on.
    traceAttempt = do
      (cap, _) <- threadCapability =<< myThreadId
      dbgTaggedMsg 4 $ BS.pack $ "[meta: cap "++show cap++ "] trying to pop local work on Sched "++ show schedNo
#else
    traceAttempt =
      dbgTaggedMsg 4 $ BS.pack $ "[meta] trying to pop local work on Sched "++ show schedNo
#endif
-- | Push a computation onto the left end of the given scheduler's deque.
pushWork :: Sched -> Par () -> IO ()
pushWork sched work = R.pushL (workpool sched) work
-- | Push work onto a scheduler that is known to have a live worker.
--
-- Blocks until some worker has announced its capability through
-- @workerPresentBarrier@, then pushes the work onto that capability's
-- scheduler.  The 'Sched' argument is ignored.  Calls 'error' if the
-- announced scheduler has no registered worker threads.
pushWorkEnsuringWorker :: Sched -> Par () -> IO (Maybe ())
pushWorkEnsuringWorker _ work = do
  workerCap <- takeMVar workerPresentBarrier
  target <- getSchedForCap workerCap
  registered <- readHotVar (tids target)
  if Set.null registered
    then error $ printf "[meta] worker barrier filled by non-worker %d\n" workerCap
    else do
      when dbg $ printf "[meta] pushing ensured work onto cap %d\n" workerCap
      fmap Just (pushWork target work)
  
-- | Process-wide table of schedulers, one slot per capability, all initially
-- Nothing.
--
-- This is a top-level mutable global created with 'unsafePerformIO'.  The
-- NOINLINE pragma is required: without it the compiler may inline the
-- definition and allocate a separate table at each use site.
{-# NOINLINE globalScheds #-}
globalScheds :: HotVar GlobalState
globalScheds = unsafePerformIO $ do
  n <- getNumCapabilities
  newHotVar $ V.replicate n Nothing
-- | Global hand-off cell through which a worker announces the capability it
-- is running on; 'pushWorkEnsuringWorker' takes from it to find a scheduler
-- with a live worker.
--
-- NOINLINE is required for a top-level 'unsafePerformIO' global: without it
-- the definition may be inlined and each use site could get its own MVar.
{-# NOINLINE workerPresentBarrier #-}
workerPresentBarrier :: MVar Int
workerPresentBarrier = unsafePerformIO newEmptyMVar
-- | Global gate that newly spawned workers wait on (via @readMVar@) before
-- entering their loop; 'runMetaParIO' fills it once the initial work has
-- been pushed.
--
-- NOINLINE is required for a top-level 'unsafePerformIO' global: without it
-- the definition may be inlined and each use site could get its own MVar.
{-# NOINLINE startBarrier #-}
startBarrier :: MVar ()
startBarrier = unsafePerformIO newEmptyMVar
-- | Look up the scheduler installed for a capability in the global table.
-- Calls 'error' if that slot has not been initialised yet.
getSchedForCap :: Int -> IO Sched
getSchedForCap cap = do
  table <- readHotVar globalScheds
  maybe uninitialised return (table V.! cap)
  where
    uninitialised = error $
      printf "tried to get a Sched for capability %d before initializing" cap
-- | Return the scheduler for a capability, installing a fresh one in the
-- global table if the slot is still empty.
--
-- A candidate scheduler is always allocated up front; it is discarded when
-- another scheduler is already installed for that capability.
makeOrGetSched :: WorkSearch -> Int -> IO Sched
makeOrGetSched ws cap = do
  workerIds  <- newHotVar Set.empty
  pool       <- R.newQ
  generator  <- newHotVar =<< create
  mortalsVar <- newHotVar 0
  failures   <- newIORef 0
  uidCounter <- newHotVar 0
  let candidate = Sched
        { no                  = cap
        , tids                = workerIds
        , workpool            = pool
        , rng                 = generator
        , mortals             = mortalsVar
        , consecutiveFailures = failures
        , ivarUID             = uidCounter
        , schedWs             = ws
        }
      -- New table with the candidate installed, paired with the candidate.
      install table = (table V.// [(cap, Just candidate)], candidate)
  modifyHotVar globalScheds $ \table ->
    case table V.! cap of
      Just existing -> (table, existing)
      Nothing
        | dbg       -> DT.trace (printf "[%d] created scheduler" cap) (install table)
        | otherwise -> install table
-- | Fork a thread on the given capability.  When built with the AFFINITY
-- macro defined, the new thread first calls setAffinityOS with the same
-- capability number and then runs the action; otherwise this is plain forkOn.
forkOn' :: Int -> IO () -> IO ThreadId
#ifdef AFFINITY
forkOn' cap k = forkOn cap $ setAffinityOS cap >> k
#else
forkOn' = forkOn
#endif
-- | Fork a worker thread on the given capability and return its thread id.
--
-- The worker registers itself in the capability's scheduler, announces its
-- presence through @workerPresentBarrier@ (without blocking if another
-- worker has already done so), waits for @startBarrier@ to be filled, and
-- then enters the worker loop.  Exceptions in the worker are forwarded to
-- the calling thread by 'forkWithExceptions'.
spawnWorkerOnCPU :: WorkSearch -- ^ work-search strategy for a newly created scheduler
                 -> Int        -- ^ capability to run the worker on
                 -> IO ThreadId
spawnWorkerOnCPU ws cap =
  forkWithExceptions (forkOn' cap) "spawned Par worker" worker
  where
    -- Level-2 debug message, emitted only when debugging is compiled in.
    note msg = when dbg $ dbgTaggedMsg 2 $ BS.pack msg

    worker = do
      self <- myThreadId
      sched <- makeOrGetSched ws cap
      modifyHotVar_ (tids sched) (Set.insert self)
      note (printf "[meta: cap %d] spawning new worker" cap)
      _ <- tryPutMVar workerPresentBarrier cap
      readMVar startBarrier
      note (printf "[meta: cap %d] new working entering loop" cap)
      runReaderT (workerLoop 0 errK) sched
-- | Placeholder passed where a continuation is required but must never be
-- used; evaluating it raises an error.
errK :: a
errK = error message
  where
    message = "this closure shouldn't be used"
-- | Abandon the current continuation and go back to the worker loop with a
-- failure count of zero.  The continuation passed in is ignored by
-- 'workerLoop'.
reschedule :: Par a
reschedule = Par (ContT (\k -> workerLoop 0 k))
-- | The scheduler loop run by every worker.
--
-- @failCount@ is the number of consecutive unsuccessful work searches so far.
-- The second argument is an ignored continuation; it is only threaded through
-- recursive calls.
--
-- Each iteration:
--
--   1. Pops work from the worker's own deque and runs it, continuing the loop
--      (with the failure count reset) when it finishes.
--   2. If the deque is empty, checks @mortals@: a positive count is
--      decremented and this worker leaves the loop.
--   3. Otherwise records the failure count, runs the scheduler's work search,
--      and either runs what it found or loops with an incremented count.
workerLoop :: Int -> ignoredCont -> ROnly ()
workerLoop failCount _k = do
  mysched@Sched{ no, mortals, schedWs=ws, consecutiveFailures } <- ask
  mwork <- liftIO $ popWork mysched
  case mwork of
    Just work -> do
      when dbg $ liftIO $ printf "[meta %d] popped work from own queue\n" no
      runContT (unPar work) $ const (workerLoop 0 _k)
    Nothing -> do
      -- Atomically claim one "death token" if any are outstanding.
      die <- liftIO $ modifyHotVar mortals $ \ms ->
               case ms of
                 0         -> (0, False)
                 n | n > 0 -> (n - 1, True)
                 n         -> error $
                   printf "unexpected mortals count %d on cap %d" n no
      unless die $ do
        -- Publish the failure count before searching for work.
        liftIO $ writeIORef consecutiveFailures failCount
        mfound <- liftIO (runWS ws mysched globalScheds)
        case mfound of
          Just work -> runContT (unPar work) $ const (workerLoop 0 _k)
          Nothing -> do
            when dbg $ liftIO $ dbgTaggedMsg 4 $ BS.pack $ "[meta: cap "++show no++"] failed to find work; looping"
            workerLoop (failCount + 1) _k
-- | Fork a child computation.  The parent's continuation is pushed onto the
-- current scheduler's deque, the child runs immediately on this worker, and
-- control then returns to the worker loop.
fork :: Par () -> Par ()
fork child = do
  sched <- ask
  callCC $ \resumeParent -> do
    liftIO (pushWork sched (resumeParent ()))
    child
    reschedule
-- | Create a fresh, empty 'IVar'.
new :: Par (IVar a)
new = liftIO (fmap IVar (newHotVar Empty))
-- | Read an 'IVar', suspending the current computation until it is full.
--
-- The fast path returns the value if it is already present.  Otherwise the
-- current continuation is registered as a waiter and the worker goes back to
-- its loop.  The contents are re-examined inside the atomic update, so a
-- value written between the two reads is still returned directly.
get :: IVar a -> Par a
get (IVar hv) = callCC $ \resume -> do
  snapshot <- liftIO (readHotVar hv)
  case snapshot of
    Full val -> return val
    _ -> do
      sched <- ask
      let -- When woken with the value, requeue the continuation on the
          -- scheduler this reader was running on.
          waiter = pushWork sched . resume
          register Empty            = (Blocked [waiter], reschedule)
          register (Blocked others) = (Blocked (waiter : others), reschedule)
          register (Full val)       = (Full val, return val)
      join . liftIO $ modifyHotVar hv register
-- | Write a value to an 'IVar'.  The value is forced to weak head normal
-- form by the bang pattern.  Every waiter registered on the variable is then
-- invoked with the value.  Writing to an already-full 'IVar' raises the
-- error "multiple put".
put_ :: IVar a -> a -> Par ()
put_ (IVar hv) !content = liftIO $ do
  waiters <- modifyHotVar hv fill
  mapM_ (\wake -> wake content) waiters
  where
    -- New contents paired with the callbacks that must be woken.
    fill Empty            = (Full content, [])
    fill (Blocked queued) = (Full content, queued)
    fill (Full _)         = error "multiple put"
-- | Like 'put_', but the value is fully evaluated with 'deepseq' before it
-- is written.
put :: NFData a => IVar a -> a -> Par ()
put iv a = a `deepseq` put_ iv a
-- | Fork a computation and return an 'IVar' that will receive its fully
-- evaluated result (written with 'put').
spawn :: NFData a => Par a -> Par (IVar a)
spawn p = do
  result <- new
  fork (put result =<< p)
  return result
-- | Like 'spawn', but the result is written with 'put_', i.e. evaluated only
-- to weak head normal form.
spawn_ :: Par a -> Par (IVar a)
spawn_ p = do
  result <- new
  fork (put_ result =<< p)
  return result
-- | Run a 'Par' computation in IO using the given 'Resource'.
--
-- The whole procedure runs under 'ensurePinned'.  It obtains (or creates) the
-- scheduler for the current capability, wraps the computation so that its
-- answer is delivered through an MVar, makes sure a worker exists, hands the
-- wrapped computation to a scheduler with a live worker, releases the start
-- barrier and finally blocks until the answer arrives.
runMetaParIO :: Resource -> Par a -> IO a
runMetaParIO Resource{ startup=st, workSearch=ws } work = ensurePinned $ 
  do
  -- Find out which capability we are on.
  tid <- myThreadId
  mp <- threadCapability' tid
  let cap = case mp of
              Just (n, _) -> n
              -- No capability information available for this thread:
              -- fall back to capability 0.
              Nothing -> 0
  sched@Sched{ tids, mortals } <- makeOrGetSched ws cap
  
  -- The answer is handed back to this thread through ansMVar.
  ansMVar <- newEmptyMVar
  let wrappedComp = do 
        ans <- work
        liftIO $ do
          dbgTaggedMsg 2 $ BS.pack "[meta] runMetaParIO computation finished, putting final MVar..."
          putMVar ansMVar ans
          -- Bump the mortals count after delivering the answer; workerLoop
          -- lets one worker leave its loop for each unit it finds there
          -- once its deque is empty.
          modifyHotVar_ mortals (1+)
          dbgTaggedMsg 2 $ BS.pack "[meta] runMetaParIO: done putting mvar and incrementing mortals."
  
  -- "Nested" means the calling thread is itself registered as a worker on
  -- this scheduler.
  isNested <- Set.member tid <$> readHotVar tids
  if isNested then
        -- Nested call: add one more worker on this capability.
        void $ spawnWorkerOnCPU ws cap
        -- Otherwise: run the resource's start-up action.
   else runSt st ws globalScheds
  
  -- Blocks until some worker has announced itself, then pushes the work onto
  -- that worker's scheduler.  NOTE(review): as written in this file,
  -- pushWorkEnsuringWorker either returns Just or calls error, so the
  -- Nothing check below looks unreachable -- confirm.
  msucc <- pushWorkEnsuringWorker sched wrappedComp
  when (msucc == Nothing)
    $ error "[meta] could not find a scheduler with an active worker!"
  dbgTaggedMsg 2 $ BS.pack
    "[meta] runMetaParIO: Work pushed onto queue, now waiting on final MVar..."
  -- Open the start barrier (no-op if it is already full) so that workers
  -- blocked in spawnWorkerOnCPU can enter their loop.
  _ <- tryPutMVar startBarrier ()
  -- Empty the worker-present barrier if some worker has refilled it.
  _ <- tryTakeMVar workerPresentBarrier
  ans <- takeMVar ansMVar
  
  
  return ans
-- | Pure wrapper around 'runMetaParIO': runs the computation via
-- 'unsafePerformIO' and returns its result as an ordinary value.
-- NOTE(review): no INLINE/NOINLINE pragma is visible for this definition;
-- confirm whether one is intended, since it wraps 'unsafePerformIO'.
runMetaPar :: Resource -> Par a -> a
runMetaPar rsrc work = unsafePerformIO $ runMetaParIO rsrc work
-- | Spawn the evaluation of a pure value: the value is lifted with 'return'
-- and passed to 'spawn'.
spawnP :: NFData a => a -> Par (IVar a)
spawnP a = spawn (return a)
-- Futures interface.  The class is imported qualified, so the unqualified
-- names on the right-hand sides refer to this module's top-level functions.
instance PC.ParFuture IVar Par where
  get    = get
  spawn  = spawn
  spawn_ = spawn_
  spawnP = spawnP
-- IVar interface.  The class is imported qualified, so the unqualified names
-- on the right-hand sides refer to this module's top-level functions.
instance PC.ParIVar IVar Par where
  fork = fork
  new  = new
  put_ = put_