module Control.Monad.Par.Meta
(
Par
, IVar
, PC.ParFuture(..)
, PC.ParIVar(..)
, runMetaPar
, runMetaParIO
, Sched(..)
, GlobalState
, Resource(..)
, Startup(..)
, WorkSearch(..)
, forkWithExceptions
, spawnWorkerOnCPU
) where
import Control.Applicative
import Control.Concurrent ( MVar
, newEmptyMVar
, putMVar
, readMVar
, takeMVar
, tryPutMVar
, tryTakeMVar
)
import Control.DeepSeq
import Control.Monad
import "mtl" Control.Monad.Cont (ContT(..), MonadCont, callCC, runContT)
import "mtl" Control.Monad.Reader (ReaderT, MonadReader, runReaderT, ask)
import Control.Monad.IO.Class
import Control.Exception (catch, throwTo, SomeException)
import Data.Concurrent.Deque.Class (WSDeque)
import Data.Concurrent.Deque.Reference.DequeInstance ()
import Data.Concurrent.Deque.Reference as R
import qualified Data.ByteString.Char8 as BS
import Data.Monoid
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Typeable (Typeable)
import Data.IORef (IORef, writeIORef, newIORef)
import Data.Vector (Vector)
import qualified Data.Vector as V
import System.IO.Unsafe (unsafePerformIO)
import System.IO (stderr)
import System.Random.MWC
import Text.Printf
import qualified Debug.Trace as DT
#ifdef AFFINITY
import System.Posix.Affinity (setAffinityOS)
#endif
import Control.Monad.Par.Meta.Resources.Debugging (dbgTaggedMsg)
import Control.Monad.Par.Meta.HotVar.IORef
import qualified Control.Monad.Par.Class as PC
#if __GLASGOW_HASKELL__ >= 702
import GHC.Conc (forkOn, ThreadId, myThreadId, threadCapability)
import Control.Concurrent (getNumCapabilities)
-- GHC >= 7.2: ask the runtime directly and wrap the answer in 'Just' so it
-- fits the signature shared with the fallback implementation.
threadCapability' tid = fmap Just (threadCapability tid)
#else
import GHC.Conc (forkOnIO, ThreadId, myThreadId, numCapabilities)
-- | Compatibility shim: older GHCs expose this operation as 'forkOnIO'.
forkOn :: Int -> IO () -> IO ThreadId
forkOn cap io = forkOnIO cap io
-- | Compatibility shim: older GHCs only provide the constant
-- 'numCapabilities', so lift it into 'IO'.
getNumCapabilities :: IO Int
getNumCapabilities = pure numCapabilities
-- Fallback for GHCs without 'threadCapability': scan every registered
-- scheduler and report the number of the first one whose thread set
-- contains @tid@. A hit is always paired with 'True'.
threadCapability' tid = do
  scheds <- readHotVar globalScheds
  hits   <- mapM ownerOf (V.toList scheds)
  return $ fmap (\c -> (c, True)) (getFirst (mconcat hits))
  where
    -- Empty slots and schedulers that do not own the thread contribute
    -- nothing to the 'First' monoid.
    ownerOf Nothing = return mempty
    ownerOf (Just Sched{ no, tids }) = do
      members <- readHotVar tids
      return $ if Set.member tid members
                 then First (Just no)
                 else mempty
#endif
-- | Find the capability associated with a thread, together with a flag.
-- 'Nothing' means no capability could be determined (only possible in the
-- fallback implementation, when no scheduler lists the thread).
-- 'ensurePinned' treats a 'True' flag as "safe to run in place".
threadCapability' :: ThreadId -> IO (Maybe (Int, Bool))
#if __GLASGOW_HASKELL__ < 700
-- Compatibility shim (only compiled for GHC < 7.0, see the enclosing #if):
-- run a functorial action and replace its result with ().
void = fmap (const ())
#endif
-- | Compile-time switch for debug output: 'True' only when the module is
-- built with the @DEBUG@ CPP macro defined.
dbg :: Bool
dbg =
#ifdef DEBUG
  True
#else
  False
#endif
-- | The Meta-Par monad: a continuation monad with unit answer type layered
-- over 'ROnly' (a reader of the current 'Sched' on top of 'IO').
newtype Par a = Par { unPar :: ContT () ROnly a }
  deriving ( Functor, Applicative, Monad, MonadIO
           , MonadCont, MonadReader Sched, Typeable )
-- | Base monad underneath 'Par': 'IO' with read-only access to a 'Sched'.
type ROnly = ReaderT Sched IO
-- | A variable whose state ('IVarContents') lives in a 'HotVar'. 'put_'
-- raises an error if the variable is already full.
newtype IVar a = IVar (HotVar (IVarContents a))
-- | Forcing an 'IVar' does no work: the reference itself is treated as
-- already fully evaluated and its contents are not inspected.
instance NFData (IVar a) where
  rnf = const ()
-- | State of an 'IVar':
--
--   * 'Full'    - a value has been written;
--   * 'Empty'   - no value yet and nobody is waiting;
--   * 'Blocked' - no value yet; holds the callbacks that 'put_' invokes
--                 with the value once it arrives.
data IVarContents a = Full a | Empty | Blocked [a -> IO ()]
-- | Table of optional schedulers. 'globalScheds' creates it with one slot
-- per capability, and it is indexed by capability number.
type GlobalState = Vector (Maybe Sched)
-- | A resource's startup action. It receives the combined 'WorkSearch' and
-- the global scheduler table and runs in 'IO'.
newtype Startup = St
  { runSt :: WorkSearch -> HotVar GlobalState -> IO () }
-- | Startup actions are opaque functions, so show a fixed placeholder.
instance Show Startup where
  show = const "<Startup>"
-- | Startup actions compose sequentially: the left action runs first, then
-- the right one, both with the same arguments. 'mempty' does nothing.
instance Monoid Startup where
  mempty = St (\_ _ -> return ())
  mappend (St runFirst) (St runSecond) =
    St $ \ws schedMap -> do
      runFirst ws schedMap
      runSecond ws schedMap
-- | A strategy for finding work when the local queue is empty. Given the
-- searching scheduler and the global scheduler table, it may produce a
-- 'Par' computation to run.
newtype WorkSearch = WS
  { runWS :: Sched -> HotVar GlobalState -> IO (Maybe (Par ())) }
-- | Work searches are opaque functions, so show a fixed placeholder.
instance Show WorkSearch where
  show = const "<WorkSearch>"
-- | Work searches compose as "first success wins": the right search is
-- consulted only when the left one finds nothing. 'mempty' never finds work.
instance Monoid WorkSearch where
  mempty = WS (\_ _ -> return Nothing)
  mappend (WS primary) (WS fallback) =
    WS $ \sched schedMap -> do
      found <- primary sched schedMap
      -- Hand back the primary result untouched when it found something.
      maybe (fallback sched schedMap) (const (return found)) found
-- | A scheduler resource: a startup action paired with a work-search
-- strategy.
data Resource = Resource
  { startup    :: Startup
  , workSearch :: WorkSearch
  }
  deriving Show
-- | Resources combine field-wise using the 'Startup' and 'WorkSearch'
-- monoids.
instance Monoid Resource where
  mempty = Resource { startup = mempty, workSearch = mempty }
  mappend (Resource st1 ws1) (Resource st2 ws2) =
    Resource { startup    = mappend st1 st2
             , workSearch = mappend ws1 ws2
             }
-- | Per-capability scheduler state.
data Sched = Sched
  { no                  :: !Int                  -- ^ capability number (strict)
  , tids                :: HotVar (Set ThreadId) -- ^ worker threads registered on this scheduler
  , workpool            :: WSDeque (Par ())      -- ^ local deque of pending work
  , rng                 :: HotVar GenIO          -- ^ random number generator
  , mortals             :: HotVar Int            -- ^ number of workers that should exit ('workerLoop' decrements it)
  , consecutiveFailures :: IORef Int             -- ^ written by 'workerLoop' before each work search
  , ivarUID             :: HotVar Int            -- ^ counter; presumably for IVar ids -- not used in this module
  , schedWs             :: WorkSearch            -- ^ work-search strategy used when the deque is empty
  }
-- | Only the capability number is shown; the other fields are mutable
-- references or functions.
instance Show Sched where
  show sched = printf "Sched{ no=%d }" (no sched)
-- | Wrap a forking function so that any exception escaping the child is
-- printed to stderr (tagged with @descr@) and then rethrown in the thread
-- that called 'forkWithExceptions'.
forkWithExceptions :: (IO () -> IO ThreadId)
                   -> String
                   -> (IO () -> IO ThreadId)
forkWithExceptions forkit descr action = do
  parent <- myThreadId
  let relay :: SomeException -> IO ()
      relay e = do
        BS.hPutStrLn stderr
          (BS.pack ("Exception inside child thread " ++ show descr ++ ": " ++ show e))
        throwTo parent e
  forkit (Control.Exception.catch action relay)
-- | Run an action on a thread that 'threadCapability'' reports with a
-- 'True' flag. If the current thread already qualifies, the action runs in
-- place. Otherwise it runs on a fresh 'forkOn' thread (on the reported
-- capability, or capability 0 when none is known) and the caller blocks
-- for the result.
--
-- NOTE(review): the result MVar is filled only after the action returns,
-- so an exception in the forked action leaves the caller waiting.
ensurePinned :: IO a -> IO a
ensurePinned action = do
  tid <- myThreadId
  mp  <- threadCapability' tid
  case mp of
    Just (_, True) -> action
    Just (cap, _)  -> runForkedOn cap
    Nothing        -> runForkedOn 0
  where
    -- Fork the action onto the given capability and wait for its result.
    runForkedOn cap = do
      resultVar <- newEmptyMVar
      void $ forkOn cap (action >>= putMVar resultVar)
      takeMVar resultVar
-- | Try to take one unit of work from the scheduler's own deque, emitting a
-- level-4 debug message first when 'dbg' is enabled.
popWork :: Sched -> IO (Maybe (Par ()))
popWork Sched{ workpool, no } = do
  when dbg announce
  R.tryPopL workpool
  where
#if __GLASGOW_HASKELL__ >= 702
    -- Newer GHCs can also report which capability the caller is on.
    announce = do
      (cap, _) <- threadCapability =<< myThreadId
      dbgTaggedMsg 4 $ BS.pack $
        "[meta: cap " ++ show cap ++ "] trying to pop local work on Sched " ++ show no
#else
    announce =
      dbgTaggedMsg 4 $ BS.pack $
        "[meta] trying to pop local work on Sched " ++ show no
#endif
-- | Push a unit of work onto the scheduler's deque with 'R.pushL'.
pushWork :: Sched -> Par () -> IO ()
pushWork sched work = R.pushL (workpool sched) work
-- | Push work onto a scheduler that is known to have a worker. Blocks until
-- some worker has announced its capability through 'workerPresentBarrier'
-- (taking the value out), then pushes onto that capability's scheduler.
-- The 'Sched' argument is ignored. The result is always 'Just ()'. If the
-- announced scheduler has no registered threads, an error is raised.
pushWorkEnsuringWorker :: Sched -> Par () -> IO (Maybe ())
pushWorkEnsuringWorker _ work = do
  no     <- takeMVar workerPresentBarrier
  sched  <- getSchedForCap no
  workers <- readHotVar (tids sched)
  if Set.null workers
    then error $ printf "[meta] worker barrier filled by non-worker %d\n" no
    else do
      when dbg $ printf "[meta] pushing ensured work onto cap %d\n" no
      Just <$> pushWork sched work
-- | Global table of schedulers, created on first use with one empty slot
-- per capability.
--
-- This is top-level mutable state built with 'unsafePerformIO'. NOINLINE
-- stops the compiler from inlining the definition, which could otherwise
-- create more than one table.
{-# NOINLINE globalScheds #-}
globalScheds :: HotVar GlobalState
globalScheds = unsafePerformIO $ do
  n <- getNumCapabilities
  newHotVar $ V.replicate n Nothing
-- | Global MVar through which a worker announces its capability number
-- ('spawnWorkerOnCPU' fills it, 'pushWorkEnsuringWorker' takes from it).
--
-- This is top-level mutable state built with 'unsafePerformIO'. NOINLINE
-- ensures every use site shares the same MVar.
{-# NOINLINE workerPresentBarrier #-}
workerPresentBarrier :: MVar Int
workerPresentBarrier = unsafePerformIO newEmptyMVar
-- | Global start gate. Workers block on it with 'readMVar' before entering
-- their loop, and 'runMetaParIO' fills it once work has been pushed.
--
-- This is top-level mutable state built with 'unsafePerformIO'. NOINLINE
-- ensures every use site shares the same MVar.
{-# NOINLINE startBarrier #-}
startBarrier :: MVar ()
startBarrier = unsafePerformIO newEmptyMVar
-- | Fetch the scheduler registered for a capability. Raises an error if the
-- slot is still empty. The index itself is not bounds-checked here
-- ('V.!' is used).
getSchedForCap :: Int -> IO Sched
getSchedForCap cap = do
  scheds <- readHotVar globalScheds
  maybe uninitialised return (scheds V.! cap)
  where
    uninitialised = error $
      printf "tried to get a Sched for capability %d before initializing" cap
-- | Return the scheduler for a capability, installing a new one in
-- 'globalScheds' if the slot is empty. A candidate scheduler is always
-- allocated up front and is simply dropped when the slot is already taken.
makeOrGetSched :: WorkSearch -> Int -> IO Sched
makeOrGetSched ws cap = do
  -- Allocate the mutable parts in field order.
  tidSet     <- newHotVar Set.empty
  pool       <- R.newQ
  gen        <- newHotVar =<< create
  mortalCnt  <- newHotVar 0
  failureCnt <- newIORef 0
  uidCnt     <- newHotVar 0
  let candidate = Sched cap tidSet pool gen mortalCnt failureCnt uidCnt ws
  modifyHotVar globalScheds $ \scheds ->
    case scheds V.! cap of
      Just existing -> (scheds, existing)
      Nothing ->
        let installed = (scheds V.// [(cap, Just candidate)], candidate)
        in if dbg
             then DT.trace (printf "[%d] created scheduler" cap) installed
             else installed
-- | Fork a thread on the given capability. When built with the @AFFINITY@
-- macro, the new thread first calls 'setAffinityOS' for that capability.
forkOn' :: Int -> IO () -> IO ThreadId
#ifdef AFFINITY
forkOn' cap k = forkOn cap (setAffinityOS cap >> k)
#else
forkOn' cap k = forkOn cap k
#endif
-- | Fork a worker thread on the given capability. The worker registers
-- itself in the scheduler's thread set, announces its presence through
-- 'workerPresentBarrier', waits for 'startBarrier' to be filled, and then
-- enters 'workerLoop'. Exceptions in the worker are reported and rethrown
-- to the caller via 'forkWithExceptions'.
spawnWorkerOnCPU :: WorkSearch
                 -> Int
                 -> IO ThreadId
spawnWorkerOnCPU ws cap =
  forkWithExceptions (forkOn' cap) "spawned Par worker" worker
  where
    -- Level-2 debug message; the format string receives the capability.
    note fmt = when dbg $ dbgTaggedMsg 2 $ BS.pack $ printf fmt cap

    worker = do
      me    <- myThreadId
      sched <- makeOrGetSched ws cap
      modifyHotVar_ (tids sched) (Set.insert me)
      note "[meta: cap %d] spawning new worker"
      -- Announce this worker; a no-op if another worker already did.
      _ <- tryPutMVar workerPresentBarrier cap
      -- Block until the start gate has been opened.
      readMVar startBarrier
      note "[meta: cap %d] new working entering loop"
      runReaderT (workerLoop 0 errK) sched
-- | Placeholder continuation handed to 'workerLoop' by 'spawnWorkerOnCPU'.
-- 'workerLoop' only threads its continuation argument through recursive
-- calls, so evaluating this value indicates a bug.
errK :: a
errK = error "this closure shouldn't be used"
-- | Abandon the current computation and go back to the scheduler loop. The
-- captured continuation is handed to 'workerLoop', which ignores it.
reschedule :: Par a
reschedule = Par (ContT (\k -> workerLoop 0 k))
-- | Scheduler loop for a worker, run in 'ROnly' over the worker's 'Sched'.
--
-- Each iteration:
--
--   1. Try to pop work from the local deque. If found, run it with a
--      continuation that re-enters the loop with a failure count of 0.
--   2. Otherwise inspect @mortals@. A positive count is decremented and
--      this worker leaves the loop. Zero means carry on. A negative count
--      is reported as an error.
--   3. When carrying on, record @failCount@ in @consecutiveFailures@ and
--      run the scheduler's 'WorkSearch'. Found work is run as in step 1.
--      Otherwise loop again with @failCount + 1@.
--
-- The second argument is never called; it is only passed along.
workerLoop :: Int -> ignoredCont -> ROnly ()
workerLoop failCount _k = do
  mysched@Sched{ no, mortals, schedWs=ws, consecutiveFailures } <- ask
  mwork <- liftIO $ popWork mysched
  case mwork of
    Just work -> do
      when dbg $ liftIO $ printf "[meta %d] popped work from own queue\n" no
      runContT (unPar work) $ const (workerLoop 0 _k)
    Nothing -> do
      -- Check whether this worker has been asked to exit.
      die <- liftIO $ modifyHotVar mortals $ \ms ->
        case ms of
          0         -> (0, False)
          -- Consume one pending exit request. The new count must be
          -- n - 1 (the original referred to an unbound name here).
          n | n > 0 -> (n - 1, True)
          n         -> error $
            printf "unexpected mortals count %d on cap %d" n no
      unless die $ do
        -- Publish the current failure count before starting the search.
        liftIO $ writeIORef consecutiveFailures failCount
        mfound <- liftIO (runWS ws mysched globalScheds)
        case mfound of
          Just work -> runContT (unPar work) $ const (workerLoop 0 _k)
          Nothing -> do
            when dbg $ liftIO $ dbgTaggedMsg 4 $ BS.pack $
              "[meta: cap " ++ show no ++ "] failed to find work; looping"
            workerLoop (failCount + 1) _k
-- | Fork a child computation. The parent's continuation is pushed onto the
-- current scheduler's deque, the child runs immediately on this thread,
-- and afterwards control returns to the scheduler via 'reschedule'.
fork :: Par () -> Par ()
fork child =
  ask >>= \sched ->
    callCC $ \parent -> do
      -- Make the rest of the parent available as a unit of work.
      liftIO (pushWork sched (parent ()))
      child
      reschedule
-- | Create a fresh 'IVar' in the 'Empty' state.
new :: Par (IVar a)
new = liftIO (fmap IVar (newHotVar Empty))
-- | Read an 'IVar'. If it is already 'Full' the value is returned at once.
-- Otherwise a callback that pushes the current continuation (applied to
-- the value) onto this scheduler's deque is registered on the IVar, and
-- the worker goes back to the scheduler. The state is re-examined inside
-- 'modifyHotVar', so a value that arrived after the first read is still
-- returned directly.
get :: IVar a -> Par a
get (IVar hv) = callCC $ \cont -> do
  snapshot <- liftIO (readHotVar hv)
  case snapshot of
    Full a -> return a
    _      -> do
      sch <- ask
      let -- Callback run by 'put_': schedule the rest of this computation.
          wake = pushWork sch . cont
          register Empty        = (Blocked [wake]     , reschedule)
          register (Blocked ks) = (Blocked (wake : ks), reschedule)
          register (Full a)     = (Full a             , return a)
      next <- liftIO (modifyHotVar hv register)
      next
-- | Write a value to an 'IVar', forcing it via the bang pattern (to weak
-- head normal form only). Every callback registered by a blocked reader is
-- then invoked with the value. Writing to an already full 'IVar' is an
-- error.
put_ :: IVar a -> a -> Par ()
put_ (IVar hv) !content = liftIO $ do
  waiters <- modifyHotVar hv fill
  mapM_ ($ content) waiters
  where
    -- New state plus the callbacks that were waiting on the old one.
    fill Empty        = (Full content, [])
    fill (Blocked ks) = (Full content, ks)
    fill (Full _)     = error "multiple put"
-- | Like 'put_', but the value is fully evaluated with 'deepseq' first.
put :: NFData a => IVar a -> a -> Par ()
put iv a = a `deepseq` put_ iv a
-- | Fork a computation and return an 'IVar' that will receive its fully
-- evaluated result (written with 'put').
spawn :: NFData a => Par a -> Par (IVar a)
spawn p = do
  r <- new
  fork (p >>= put r)
  return r
-- | Fork a computation and return an 'IVar' that will receive its result,
-- written with 'put_' (no deep evaluation).
spawn_ :: Par a -> Par (IVar a)
spawn_ p = do
  r <- new
  fork (p >>= put_ r)
  return r
-- | Run a 'Par' computation in 'IO' using the given 'Resource'.
--
-- The body runs under 'ensurePinned'. It obtains (or creates) the scheduler
-- for the current capability and wraps the computation so that, once done,
-- it stores the answer in an MVar and increments @mortals@. If the calling
-- thread is already registered as a worker on that scheduler (a nested
-- call), a replacement worker is spawned. Otherwise the resource's startup
-- action is run. The wrapped work is then pushed, the start gate is
-- opened, and the caller blocks for the answer.
runMetaParIO :: Resource -> Par a -> IO a
runMetaParIO Resource{ startup=st, workSearch=ws } work = ensurePinned body
  where
    body = do
      tid <- myThreadId
      mp  <- threadCapability' tid
      -- Fall back to capability 0 when none is known for this thread.
      let cap = maybe 0 fst mp
      sched <- makeOrGetSched ws cap
      ansMVar <- newEmptyMVar
      let wrappedComp = work >>= \ans -> liftIO $ do
            dbgTaggedMsg 2 $ BS.pack "[meta] runMetaParIO computation finished, putting final MVar..."
            putMVar ansMVar ans
            -- Ask one worker on this scheduler to leave its loop.
            modifyHotVar_ (mortals sched) (1 +)
            dbgTaggedMsg 2 $ BS.pack "[meta] runMetaParIO: done putting mvar and incrementing mortals."
      isNested <- Set.member tid <$> readHotVar (tids sched)
      if isNested
        then void $ spawnWorkerOnCPU ws cap
        else runSt st ws globalScheds
      msucc <- pushWorkEnsuringWorker sched wrappedComp
      case msucc of
        Nothing -> error "[meta] could not find a scheduler with an active worker!"
        Just _  -> return ()
      dbgTaggedMsg 2 $ BS.pack
        "[meta] runMetaParIO: Work pushed onto queue, now waiting on final MVar..."
      -- Open the start gate (no-op if it is already open).
      _ <- tryPutMVar startBarrier ()
      -- Drain any pending worker announcement.
      _ <- tryTakeMVar workerPresentBarrier
      takeMVar ansMVar
-- | Pure wrapper around 'runMetaParIO', implemented with
-- 'unsafePerformIO'.
runMetaPar :: Resource -> Par a -> a
runMetaPar rsrc work = unsafePerformIO (runMetaParIO rsrc work)
-- | Spawn the evaluation of a pure value: the value is wrapped with
-- 'return' and handed to 'spawn'.
spawnP :: NFData a => a -> Par (IVar a)
spawnP x = spawn (return x)
-- | 'PC.ParFuture' instance. The class is imported qualified, so each
-- right-hand side refers to this module's top-level function of the same
-- name rather than to the method being defined.
instance PC.ParFuture IVar Par where
  spawn  = spawn
  spawn_ = spawn_
  spawnP = spawnP
  get    = get
-- | 'PC.ParIVar' instance. The class is imported qualified, so each
-- right-hand side refers to this module's top-level function of the same
-- name rather than to the method being defined.
instance PC.ParIVar IVar Par where
  new  = new
  put_ = put_
  fork = fork