module Control.Monad.Par.Scheds.TraceInternal (
Trace(..), Sched(..), Par(..),
IVar(..), IVarContents(..),
sched,
runPar, runParAsync, runParAsyncHelper,
new, newFull, newFull_, get, put_, put,
pollIVar, yield,
) where
import Control.Monad as M hiding (sequence, join)
import Prelude hiding (mapM, sequence)
import Data.IORef
import System.IO.Unsafe
import Control.Concurrent hiding (yield)
import GHC.Conc hiding (yield)
import Control.DeepSeq
import Control.Applicative
import Data.Array
import Data.List (partition, find)
-- | The instruction stream produced by running a 'Par' computation in
-- continuation-passing style (see 'runCont').  'sched' interprets it one
-- constructor at a time.
data Trace = forall a . Get (IVar a) (a -> Trace)
             -- ^ Read an 'IVar'; the continuation receives its value, or is
             -- parked on the 'IVar' if it is not full yet.
           | forall a . Put (IVar a) a Trace
             -- ^ Fill an 'IVar', resume anything parked on it, then continue.
           | forall a . New (IVarContents a) (IVar a -> Trace)
             -- ^ Allocate an 'IVar' with the given initial contents.
           | Fork Trace Trace
             -- ^ @Fork child parent@: queue @child@ as work and continue
             -- with @parent@.
           | Done
             -- ^ End of this thread of work; the scheduler looks for more.
           | Yield Trace
             -- ^ Re-queue the continuation at the back of the local queue.
-- | Per-worker scheduler state.  'runPar_internal' creates one 'Sched' per
-- capability, and all of them share the same 'status' reference.
data Sched = Sched
    { -- | Index of this worker in 'scheds'.
      no :: !ThreadNumber,
      -- | This worker's work sets, most recently joined first.
      workpool :: IORef WorkPool,
      -- | Shared registry of idle workers and blocked callers.
      status :: IORef AllStatus,
      -- | Every worker (including this one); scanned when stealing.
      scheds :: Array ThreadNumber Sched,
      -- | The Haskell thread currently running this worker.
      tId :: IORef ThreadId
    }
-- | Index of a worker in the 'scheds' array.
type ThreadNumber = Int
-- | Identifier of one 'runPar' invocation's work set.
type UId = Int
-- | Counter of workers currently active in a work set.
type CountRef = IORef Int
-- | Bound on the work a worker may take: work sets whose uid is at least
-- this uid, together with the bounding work set's active-worker counter.
type WorkLimit = (UId, CountRef)
-- | Test the uid component of a 'WorkLimit' with the given predicate.
isWLUId :: WorkLimit -> (UId -> Bool) -> Bool
isWLUId limit predicate = predicate (fst limit)
-- | Should a worker about to idle under this limit end the work set itself?
-- A limit whose uid is 1 never does, and its counter is not read.  Any
-- other limit ends the work set once its active-worker count is zero.
shouldEndWorkSet :: WorkLimit -> IO Bool
shouldEndWorkSet (limitUid, counter)
    | limitUid == 1 = return False
    | otherwise     = fmap (== 0) (readIORef counter)
-- | Build the 'Idle' entry for a worker parked under the given limit.
idleAtWL :: WorkLimit -> MVar Bool -> Idle
idleAtWL limit wakeup = Idle (fst limit) wakeup
-- | Interpret a 'Trace' on worker @q@.
--
-- @_doSync@ selects what happens at 'Done'.  When 'True', this call keeps
-- looking for work on the current thread.  When 'False', it forks a new
-- thread to continue scheduling and returns.
--
-- @wl@ bounds which work sets may be entered.  @queueref@ is the local
-- queue of the work set being run, and @uid@ is that work set's identifier.
sched :: Bool -> WorkLimit -> Sched -> (IORef [Trace]) -> UId -> Trace -> IO ()
sched _doSync wl q@Sched{status, workpool} queueref uid t = loop t
  where
    -- Execute instructions one after another on the current thread.
    loop t = case t of
      New a f -> do
        r <- newIORef a
        loop (f (IVar r))
      Get (IVar v) c -> do
        -- Fast path: a plain read suffices if the IVar is already full.
        e <- readIORef v
        case e of
          Full a -> loop (c a)
          _other -> do
            -- Re-check atomically, since the IVar may have been filled after
            -- the read above.  If it is still not full, park the continuation
            -- on it and find other work; the chosen action is run afterwards.
            r <- atomicModifyIORef v $ \e -> case e of
              Empty -> (Blocked [c], go)
              Full a -> (Full a, loop (c a))
              Blocked cs -> (Blocked (c:cs), go)
            r
      Put (IVar v) a t -> do
        -- Fill the IVar, collecting every continuation parked on it.
        cs <- atomicModifyIORef v $ \e -> case e of
          Empty -> (Full a, [])
          Full _ -> error "multiple put"
          Blocked cs -> (Full a, cs)
        -- Each woken continuation becomes new work in this work set.
        mapM_ (pushWork status uid queueref . ($a)) cs
        loop t
      Fork child parent -> do
        -- Make the child available to other workers; keep running the parent.
        pushWork status uid queueref child
        loop parent
      Done ->
        if _doSync
          then go
          -- Asynchronous mode: continue scheduling on a fresh thread so that
          -- this call can return.
          else do
            forkIO go; return ()
      Yield parent -> do
        -- Append at the back (work is popped from the front), so every other
        -- queued item runs first.
        atomicModifyIORef queueref $ \ts -> (ts++[parent],())
        go
    -- Take the next item from the local queue.  When the queue is empty,
    -- leave this work set.  Return if the limit's uid is this work set and
    -- it has just finished; otherwise look for work via 'reschedule'.
    go = do
      mt <- atomicPopIORef queueref
      case mt of
        Just t -> loop t
        Nothing -> do
          cr <- wpRemoveWork uid workpool
          workDone <- decWorkerCount uid cr status
          unless (isWLUId wl (== uid) && workDone) $
            reschedule wl q
-- | Find more work for worker @q@ within the limit @wl@.  First try the
-- head of this worker's own pool (its most recently joined work set), if
-- that work set's uid is within the limit; otherwise fall back to 'steal'.
reschedule :: WorkLimit -> Sched -> IO ()
reschedule wl q@Sched{ workpool, status } = do
  wp <- readIORef workpool
  case wp of
    Work uid cr wqref _ | isWLUId wl (uid >=) -> do
      -- Count as active in the work set before taking from its queue.
      incWorkerCount cr
      nextTrace <- atomicPopIORef wqref
      case nextTrace of
        Just t -> sched True wl q wqref uid t
        Nothing -> do
          -- Nothing queued: drop the entry, leave the work set and retry,
          -- unless this was the limiting work set and it has finished.
          wpRemoveWork uid workpool
          workDone <- decWorkerCount uid cr status
          unless (isWLUId wl (== uid) && workDone) $
            reschedule wl q
    _ -> steal wl q
-- | Scan the other workers' pools for work within the limit @wl@.  If none
-- is found, register as idle and block until woken.  Before blocking, the
-- limiting work set is ended instead if 'shouldEndWorkSet' says so.
steal :: WorkLimit -> Sched -> IO ()
steal wl q@Sched{ status, scheds, no=my_no } =
  go l
  where
    (l,u) = bounds scheds
    go n
      -- Every worker scanned without success: go idle.
      | n > u = do
          m <- newEmptyMVar
          atomicModifyIORef status $ addIdler (idleAtWL wl m)
          s <- shouldEndWorkSet wl
          if s
            then do
              endWorkSet status (fst wl)
              return ()
            else do
              -- Woken with 'False' by 'pushWork' (scan again) or with 'True'
              -- by 'endWorkSet' / 'globalThreadShutdown' (stop).
              finished <- takeMVar m
              unless finished $ go l
      | n == my_no = go (n+1)
      | otherwise = readIORef (workpool (scheds!n)) >>= tryToSteal
      where
        -- Walk victim @n@'s work sets from the most recent one, trying each
        -- one whose uid is within the limit.  Move on to the next victim at
        -- the first entry outside the limit, or at 'NoWork'.
        tryToSteal (Work uid cr wqref wp) | isWLUId wl (uid >=) = do
          incWorkerCount cr
          stolenTrace <- atomicPopIORef wqref
          case stolenTrace of
            Nothing -> decWorkerCount uid cr status >> tryToSteal wp
            Just t -> do
              -- Join the work set in our own pool with a fresh local queue
              -- and run the stolen trace.
              sublst <- newIORef []
              atomicModifyIORef (workpool q) $ \wp' -> (Work uid cr sublst wp', ())
              sched True wl q sublst uid t
        tryToSteal _ = go (n+1)
-- | Put a trace at the front of a work-set queue.  If an idle worker is
-- eligible for work set @uid@, wake one of them with 'False' ("new work").
pushWork :: IORef AllStatus -> UId -> (IORef [Trace]) -> Trace -> IO ()
pushWork status uid wqref t = do
  atomicModifyIORef wqref (\queue -> (t : queue, ()))
  snapshot <- readIORef status
  when (hasIdleWorker uid snapshot) $
    atomicModifyIORef status (getIdleWorker uid)
      >>= maybe (return ()) (\waiter -> putMVar waiter False)
-- | Leave a work set by decrementing its active-worker counter.
--
-- Returns 'True' when the caller was the last active worker.  In that case
-- the work set is ended ('endWorkSet' wakes everything parked on @uid@),
-- and the global completion count is bumped ('globalWorkComplete').
--
-- Decrementing a counter that is already zero indicates broken bookkeeping
-- and raises an error.
decWorkerCount :: UId -> CountRef -> IORef AllStatus -> IO Bool
decWorkerCount uid countref status = do
  done <- atomicModifyIORef countref $ \n ->
    if n == 0
      then error "Impossible value in decWorkerCount"
      else (n - 1, n == 1)
  when done $ endWorkSet status uid >> globalWorkComplete uid
  return done
-- | Join a work set by incrementing its active-worker counter.
incWorkerCount :: CountRef -> IO ()
incWorkerCount countref = atomicModifyIORef countref bump
  where
    bump active = (active + 1, ())
-- | Atomically remove and return the head of a list held in an 'IORef'.
-- An empty list is left unchanged and yields 'Nothing'.
atomicPopIORef :: IORef [a] -> IO (Maybe a)
atomicPopIORef ref = atomicModifyIORef ref pop
  where
    pop []       = ([], Nothing)
    pop (x : xs) = (xs, Just x)
-- | A worker parked in 'steal' on its 'MVar'.  It is woken with 'False'
-- when new work is pushed, and with 'True' when it should stop waiting.
-- The uid is the lowest work-set uid it may take work from.
data Idle = Idle !UId (MVar Bool)
-- | A thread blocked in 'runPar_internal' until work set @uid@ ends.
data ExtIdle = ExtIdle !UId (MVar ())
-- | Shared registry: idle workers (kept sorted by ascending uid by
-- 'addIdler') and blocked callers.
type AllStatus = ([Idle], [ExtIdle])
-- | The empty registry: nobody idle, nobody blocked.
newStatus :: AllStatus
newStatus = (noIdlers, noWaiters)
  where
    noIdlers  = []
    noWaiters = []
-- | Register an idle worker, keeping the idle list sorted by ascending uid.
-- The new entry goes in front of any existing entries with an equal uid.
addIdler :: Idle -> AllStatus -> (AllStatus, ())
addIdler i@(Idle u _) (is, es) = ((lower ++ i : higherOrEqual, es), ())
  where
    (lower, higherOrEqual) = span (\(Idle u' _) -> u' < u) is
-- | Register a blocked caller by prepending it to the caller list.
addExtIdler :: ExtIdle -> AllStatus -> (AllStatus, ())
addExtIdler ext (idlers, waiters) = ((idlers, waiters'), ())
  where
    waiters' = ext : waiters
-- | Remove and return the first idle worker if it may run work from work
-- set @u@, i.e. its uid is @<= u@.  Only the head of the idle list is
-- examined.
getIdleWorker :: UId -> AllStatus -> (AllStatus, Maybe (MVar Bool))
getIdleWorker u q@(idlers, waiters) =
  case idlers of
    Idle u' m' : rest | u' <= u -> ((rest, waiters), Just m')
    _                           -> (q, Nothing)
-- | Would 'getIdleWorker' hand out a worker for work set @uid@?
hasIdleWorker :: UId -> AllStatus -> Bool
hasIdleWorker uid q = maybe False (const True) (snd (getIdleWorker uid q))
-- | End work set @uid@.  Atomically remove every idle worker and blocked
-- caller registered at exactly @uid@.  Then wake the callers (with @()@),
-- and after them the workers (with 'True').
endWorkSet :: IORef AllStatus -> UId -> IO ()
endWorkSet status uid = do
  (idlers, waiters) <- atomicModifyIORef status splitOff
  forM_ waiters $ \(ExtIdle _ mb) -> putMVar mb ()
  forM_ idlers $ \(Idle _ mb) -> putMVar mb True
  where
    splitOff (is, es) =
      let (hitI, keepI) = partition (\(Idle u _) -> u == uid) is
          (hitE, keepE) = partition (\(ExtIdle u _) -> u == uid) es
      in ((keepI, keepE), (hitI, hitE))
-- | A worker's work sets, most recently joined first.  Each entry holds
-- the work set's uid, its active-worker counter, and this worker's local
-- queue for that work set.
data WorkPool = Work !UId CountRef (IORef [Trace]) WorkPool | NoWork
-- | Atomically remove the first entry for @uid@ from a work pool and return
-- its counter.  It is an error if the pool has no entry for @uid@.
wpRemoveWork :: UId -> IORef WorkPool -> IO CountRef
wpRemoveWork uid pRef = atomicModifyIORef pRef dropEntry
  where
    dropEntry :: WorkPool -> (WorkPool, CountRef)
    dropEntry NoWork = error "Impossible state in wpRemoveWork"
    dropEntry (Work u cr wq rest)
      | u == uid  = (rest, cr)
      | otherwise =
          let (rest', found) = dropEntry rest
          in (Work u cr wq rest', found)
-- | The Par monad.  Given the final continuation, a computation produces
-- the 'Trace' that the scheduler interprets.
newtype Par a = Par
  { runCont :: (a -> Trace) -> Trace
  }

instance Functor Par where
  fmap f m = Par $ \c -> runCont m (c . f)

-- 'pure' is the primary definition and 'return' delegates to it.  This is
-- the canonical form modern GHC expects (-Wnoncanonical-monad-instances),
-- and it is still valid where 'return' remains a Monad class method.
instance Applicative Par where
  pure a = Par ($ a)
  (<*>) = ap

instance Monad Par where
  return = pure
  m >>= k = Par $ \c -> runCont m $ \a -> runCont (k a) c
-- | A write-once variable used to communicate between 'Par' threads.
newtype IVar a = IVar (IORef (IVarContents a))
-- | State of an 'IVar': filled, empty, or empty with continuations that
-- are resumed when it is filled.
data IVarContents a = Full a | Empty | Blocked [a -> Trace]
-- | An 'IVar' is treated as already fully evaluated; 'rnf' does not
-- inspect it.
instance NFData (IVar a) where
  rnf _ = ()
-- | Non-blocking read of an 'IVar' from 'IO'.  Returns 'Just' the value if
-- the 'IVar' has been filled, and 'Nothing' otherwise.
pollIVar :: IVar a -> IO (Maybe a)
pollIVar (IVar ref) = readIORef ref >>= classify
  where
    classify (Full x) = return (Just x)
    classify _        = return Nothing
-- | Bookkeeping shared by every 'runPar' call in the process.  It holds the
-- scheduler array, the next uid to hand out, and how many work sets have
-- completed.
data GlobalThreadState = GTS (Array ThreadNumber Sched) !UId !Int

-- | Process-wide scheduler state; 'Nothing' while no schedulers exist.
--
-- This is a top-level mutable variable built with 'unsafePerformIO', so it
-- must not be inlined.  Without NOINLINE, GHC may duplicate the definition
-- and allocate several distinct 'IORef's.  Concurrent or nested 'runPar'
-- calls would then fail to see each other's schedulers.
{-# NOINLINE globalThreadState #-}
globalThreadState :: IORef (Maybe GlobalThreadState)
globalThreadState = unsafePerformIO $ newIORef Nothing
-- | Record that one more work set has completed.  The uid argument is
-- currently ignored.
globalWorkComplete :: UId -> IO ()
globalWorkComplete _ = atomicModifyIORef globalThreadState bumpCompleted
  where
    bumpCompleted current = case current of
      Just (GTS retA n c) -> (Just (GTS retA n (c + 1)), ())
      Nothing             -> error "Impossible state in globalWorkComplete."
-- | Outcome of 'globalEstablishScheds'.  'Success' means the caller's array
-- became the global scheduler set (uid 0).  'Failure' means schedulers
-- already existed; it carries the newly allocated uid and the existing
-- scheduler array.
data GTSResult = Success UId | Failure UId (Array ThreadNumber Sched)
-- | If no schedulers are registered, install @a@ as the global set and
-- return uid 0.  Otherwise allocate the next uid for this call and return
-- it together with the array already in use.
globalEstablishScheds :: Array ThreadNumber Sched -> IO GTSResult
globalEstablishScheds a = atomicModifyIORef globalThreadState register
  where
    register current = case current of
      Nothing ->
        (Just (GTS a 1 0), Success 0)
      Just (GTS existing next done) ->
        (Just (GTS existing (next + 1) done), Failure next existing)
-- | Shut the schedulers down if every uid handed out has completed, i.e.
-- the allocated count equals the completed count.  In that case clear the
-- global state, empty the shared status registry, wake every blocked
-- caller and every idle worker (with 'True'), and return 'True'.
-- Otherwise leave everything untouched and return 'False'.
globalThreadShutdown :: IO Bool
globalThreadShutdown =
  atomicModifyIORef globalThreadState takeIfQuiescent
    >>= maybe (return False) wakeEveryone
  where
    takeIfQuiescent (Just (GTS a n c)) | n == c = (Nothing, Just a)
    takeIfQuiescent gts = (gts, Nothing)

    wakeEveryone a = do
      let shared = status (a ! fst (bounds a))
      (idlers, waiters) <- atomicModifyIORef shared $ \old -> (newStatus, old)
      mapM_ (\(ExtIdle _ m) -> putMVar m ()) waiters
      mapM_ (\(Idle _ mb) -> putMVar mb True) idlers
      return True
-- | Shared implementation of 'runPar' and 'runParAsync'.
--
-- When no schedulers are registered, this call does the following:
--
-- * creates one 'Sched' per capability and registers them globally (uid 0);
-- * forks a worker thread on each capability;
-- * runs the computation on the worker of the current capability;
-- * blocks until its work set ends, then tries to shut the schedulers down.
--
-- When schedulers already exist, the call reuses them under a fresh uid.
-- If the calling thread is the recorded worker thread of its capability,
-- it runs the computation itself.  Otherwise it queues the computation on
-- that worker's pool and blocks until the work set ends.
--
-- NOTE(review): the helper workers in the first branch use the limit
-- @(1, undefined)@.  By 'isWLUId' they only take work with uid >= 1, so
-- they do not appear to help with uid-0 work — confirm this is intended.
-- NOTE(review): with @_doSync = False@, 'sched' returns at the first
-- 'Done'.  On the inline nested path @rref@ may still be empty then —
-- confirm.
-- NOTE(review): 'forkOnIO' is deprecated in newer base in favour of
-- 'forkOn'.
runPar_internal :: Bool -> Par a -> a
runPar_internal _doSync x = unsafePerformIO $ do
  myTId <- myThreadId
  tIds <- replicateM numCapabilities $ newIORef myTId
  workpools <- replicateM numCapabilities $ newIORef NoWork
  statusRef <- newIORef newStatus
  -- One scheduler per capability, indexed 0 .. numCapabilities - 1.
  let states = listArray (0, numCapabilities - 1)
        [ Sched { no=n, workpool=wp, status=statusRef, scheds=states, tId=t }
        | (n, wp, t) <- zip3 [0..] workpools tIds ]
  res <- globalEstablishScheds states
  case res of
    Success uid -> do
#if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
      (main_cpu, _) <- threadCapability =<< myThreadId
#else
      let main_cpu = 0
#endif
      currentWorkers <- newIORef 1
      -- Limit for the helper workers.  Its counter is never read, because
      -- 'shouldEndWorkSet' short-circuits on uid 1.
      let workLimit' = (1, undefined)
      let workLimit = (0, currentWorkers)
      m <- newEmptyMVar
      rref <- newIORef Empty
      atomicModifyIORef statusRef $ addExtIdler (ExtIdle uid m)
      forM_ (elems states) $ \(state@Sched{no=cpu}) -> do
        forkOnIO cpu $ do
          workerTId <- myThreadId
          writeIORef (tId state) workerTId
          if (cpu /= main_cpu)
            then reschedule workLimit' state
            else do
              sublst <- newIORef []
              atomicModifyIORef (workpool state) $ \wp -> (Work uid currentWorkers sublst wp, ())
              sched _doSync workLimit state sublst uid $ runCont (x >>= put_ (IVar rref)) (const Done)
      -- Woken by 'endWorkSet' (or 'globalThreadShutdown') via the ExtIdle above.
      takeMVar m
      r <- readIORef rref
      _ <- globalThreadShutdown
      case r of
        Full a -> return a
        _ -> error "no result"
    Failure uid cScheds -> do
#if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
      (main_cpu, _) <- threadCapability myTId
      cTId <- readIORef $ tId $ cScheds ! main_cpu
      let doWork = cTId == myTId
#else
      cTIds <- mapM (\s -> (readIORef $ tId $ s) >>= (\t -> return (s,t))) (elems cScheds)
      let (main_cpu, doWork) = case find ((== myTId) . snd) cTIds of
            Nothing -> (0, False)
            Just (s,_) -> (no s, True)
#endif
      rref <- newIORef Empty
      let task = runCont (x >>= put_ (IVar rref)) (const Done)
          state = cScheds ! main_cpu
      if doWork
        then do
          -- Already on that capability's worker thread: run the work set inline.
          currentWorkers <- newIORef 1
          sublst <- newIORef []
          let workLimit = (uid, currentWorkers)
          atomicModifyIORef (workpool state) $ \wp -> (Work uid currentWorkers sublst wp, ())
          sched _doSync workLimit state sublst uid $ task
        else do
          -- Not a worker thread: queue the task and block until its work set
          -- ends.  NOTE(review): no idle worker is explicitly woken here.
          currentWorkers <- newIORef 0
          sublst <- newIORef [task]
          m <- newEmptyMVar
          atomicModifyIORef (status state) $ addExtIdler (ExtIdle uid m)
          atomicModifyIORef (workpool state) $ \wp -> (Work uid currentWorkers sublst wp, ())
          takeMVar m
      r <- readIORef rref
      case r of
        Full a -> return a
        _ -> error "no result"
-- | Run a 'Par' computation to its result.  When a worker reaches the end
-- of a thread of work, it keeps scheduling on the same thread.
runPar :: Par a -> a
runPar computation = runPar_internal True computation

-- | Like 'runPar', except that a worker reaching the end of a thread of
-- work continues scheduling on a freshly forked thread.
runParAsync :: Par a -> a
runParAsync computation = runPar_internal False computation
-- | Not implemented by this scheduler.  Using it raises an error that names
-- the function, instead of an anonymous 'undefined'.
runParAsyncHelper :: Par a -> (a, IO ())
runParAsyncHelper =
  error "runParAsyncHelper: not implemented in Control.Monad.Par.Scheds.TraceInternal"
-- | Allocate an empty 'IVar'.
new :: Par (IVar a)
new = Par (New Empty)

-- | Allocate an 'IVar' already holding @x@, after evaluating @x@ to normal
-- form.
newFull :: NFData a => a -> Par (IVar a)
newFull x = x `deepseq` Par (New (Full x))

-- | Allocate an 'IVar' already holding @x@, after evaluating @x@ to weak
-- head normal form.
newFull_ :: a -> Par (IVar a)
newFull_ x = x `seq` Par (New (Full x))
-- | Read an 'IVar'.  If it is not yet filled, the computation is parked
-- until it is.
get :: IVar a -> Par a
get v = Par (Get v)

-- | Fill an 'IVar' with a value evaluated to weak head normal form.
-- Filling the same 'IVar' twice is an error ("multiple put").
put_ :: IVar a -> a -> Par ()
put_ v a = a `seq` Par (\k -> Put v a (k ()))

-- | Fill an 'IVar' with a value evaluated to normal form.
put :: NFData a => IVar a -> a -> Par ()
put v a = a `deepseq` Par (\k -> Put v a (k ()))

-- | Move the rest of this computation to the back of the local queue, so
-- other queued work runs first.
yield :: Par ()
yield = Par (\k -> Yield (k ()))