{-# LANGUAGE Unsafe #-}

{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE NamedFieldPuns, BangPatterns #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE DoAndIfThenElse #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE FlexibleInstances #-} -- For DeepFreeze
{-# OPTIONS_GHC -Wall -fno-warn-name-shadowing -fno-warn-unused-do-bind #-}

-- | This is an internal module that provides the core parallel scheduler.
--   It is /not/ for end-users.

module Control.LVish.SchedIdempotent
  (
    -- * Basic types and accessors
    LVar(..), state, HandlerPool(),
    Par(..), ClosedPar(..),

    -- * Safe, deterministic operations
    yield, newPool, fork, forkHP,
    runPar, runParIO,
    runParDetailed, runParLogged,
    withNewPool, withNewPool_,
    forkWithExceptions,

    -- * Quasi-deterministic operations
    quiesce, quiesceAll,

    -- * Re-exported debug facilities
    logStrLn, dbgLvl, getLogger,

    -- * Unsafe operations; should be used only by experts to build new abstractions
    newLV, getLV, putLV, putLV_, freezeLV, freezeLVAfter,
    addHandler, liftIO, toss,

    -- * Internal, private bits.
    mkPar, Status(..), sched, Listener(..)
  ) where

import Control.Monad hiding (sequence, join)
import Control.Concurrent hiding (yield)
import qualified Control.Concurrent as Conc
import qualified Control.Exception as E
import Control.DeepSeq
import Control.Applicative
import Control.LVish.MonadToss
import Control.LVish.Logging as L
import Debug.Trace(trace)
import Data.IORef
import Data.Atomics
import Data.Typeable
import qualified Data.Concurrent.Counter as C
import qualified Data.Concurrent.Bag as B
import GHC.Conc hiding (yield)
import System.IO
import System.IO.Unsafe (unsafePerformIO)
import System.Environment(getEnvironment)
import System.Mem.StableName (makeStableName, hashStableName)
import Prelude hiding (mapM, sequence, head, tail)
import qualified Prelude
import System.Random (random)

#ifdef DEBUG_LVAR
import Text.Printf (printf)
#endif

-- import Control.Compose ((:.), unO)
import Data.Traversable

import Control.LVish.Types
import qualified Control.LVish.SchedIdempotentInternal as Sched

------------------------------------------------------------------------------
-- LVar and Par monad representation
------------------------------------------------------------------------------

-- | LVars are parameterized by two types:
--
--     * The first, @a@, characterizes the \"state\" of the LVar (i.e., the lattice
--     element), and should be a concurrently mutable data type.  That means, in
--     particular, that only a /transient snapshot/ of the state can be
--     obtained in general.  But the information in such a snapshot is always a
--     lower bound on the current value of the LVar.
--
--     * The second, @d@, characterizes the \"delta\" associated with a @putLV@
--     operation (i.e., the actual change, if any, to the LVar's state).
--     In many cases such deltas allow far more efficient communication between
--     @putLV@s and blocked @getLV@s or handlers.  It is crucial, however, that
--     the behavior of a @get@ or handler does not depend on the /particular/
--     choice of @putLV@ operations (and hence deltas) that moved the LVar over
--     the threshold.  For simple data structures, the delta may just be the
--     entire LVar state, but for, e.g., collection data structures, delta will
--     generally represent a single insertion.
data LVar a d = LVar {
  state  :: a,                                  -- the current, "global" state of the LVar
  status :: {-# UNPACK #-} !(IORef (Status d)), -- is the LVar active or frozen?
  name   :: {-# UNPACK #-} !LVarID              -- a unique identifier for this LVar
}

-- | LVar identities are fresh @IORef ()@ cells; 'freezeLV' compares them with
-- @(/=)@, so only reference identity matters, never the contents.
type LVarID = IORef ()

-- | Allocate a fresh LVar identifier.
newLVID :: IO LVarID
newLVID = newIORef ()

-- | a global ID that is *not* the name of any LVar.  Makes it possible to
-- represent Maybe (LVarID) with the type LVarID -- i.e., without any allocation.
noName :: LVarID
noName = unsafePerformIO $ newLVID
-- NOINLINE keeps this a single shared cell: inlining the unsafePerformIO
-- could allocate a distinct IORef at each use site.
{-# NOINLINE noName #-}

-- | The frozen bit of an LVar is tied together with the bag of waiting listeners,
-- which allows the entire bag to become garbage immediately after freezing.
-- (Note, however, that outstanding @put@s that occurred just before freezing
-- may still reference the bag, which is necessary to ensure that all listeners
-- are informed of the @put@ prior to freezing.)
data Status d
  = Freezing                     -- ^ a freeze is in progress: new puts are
                                 --   rejected, but a put that already observed
                                 --   'Active' may still notify listeners
  | Frozen                       -- ^ further changes to the state are forbidden
  | Active (B.Bag (Listener d))  -- ^ bag of blocked threshold reads and handlers

-- | A listener for an LVar is informed of each change to the LVar's state
-- (represented as a delta) and the event of the LVar freezing.  The listener is
-- given access to a bag token, allowing it to remove itself from the bag of
-- listeners, after unblocking a threshold read, for example.  It is also given
-- access to the scheduler queue for the CPU that generated the event, which it
-- can use to add threads.
data Listener d = Listener {
  onUpdate :: d -> B.Token (Listener d) -> SchedState -> IO (),
  onFreeze ::      B.Token (Listener d) -> SchedState -> IO ()
}

-- | A @HandlerPool@ contains a way to count outstanding parallel computations that
-- are affiliated with the pool.  It detects the condition where all such threads
-- have completed.
data HandlerPool = HandlerPool {
  numHandlers      :: C.Counter,   -- How many handler callbacks are currently
                                   -- running?
  blockedOnQuiesce :: B.Bag ClosedPar
                                   -- continuations parked by @quiesce@, woken
                                   -- when the counter polls as quiescent
}

-- | A monadic type constructor for parallel computations producing an answer @a@.
-- This is the internal, unsafe type.
newtype Par a = Par {
  -- the computation is represented in CPS
  close :: (a -> ClosedPar) -> ClosedPar
}

-- A "closed" Par computation is one that has been plugged into a continuation.
-- It is represented in a "Church encoded" style, i.e., directly in terms of its
-- interpretation into the IO monad.  Since the continuation has already been
-- plugged into the computation, there is no answer type here.
newtype ClosedPar = ClosedPar {
  exec :: SchedState -> IO ()
}

-- | The per-worker scheduler state threaded through every 'ClosedPar'.
type SchedState = Sched.State ClosedPar LVarID

instance Functor Par where
  fmap f m = Par $ \k -> close m (k . f)

-- Canonical form: 'pure' is defined here and 'return' defers to it, rather
-- than the other way round.
instance Applicative Par where
  pure a = Par $ \k -> k a
  (<*>)  = ap

instance Monad Par where
  return  = pure
  m >>= c = Par $ \k -> close m $ \a -> close (c a) k


------------------------------------------------------------------------------
-- A few auxiliary functions
------------------------------------------------------------------------------

-- | Build a 'Par' from a function that receives the continuation and the
-- current worker's scheduler state.
mkPar :: ((a -> ClosedPar) -> SchedState -> IO ()) -> Par a
mkPar f = Par $ \k -> ClosedPar $ \q -> f k q

-- | Run the action on the payload of a 'Just'; do nothing for 'Nothing'.
whenJust :: Maybe a -> (a -> IO ()) -> IO ()
whenJust Nothing  _ = return ()
whenJust (Just a) f = f a

-- | Snapshot the status of an LVar.  Both 'Freezing' and 'Frozen' count as
-- frozen; only 'Active' yields 'False'.
isFrozen :: LVar a d -> IO Bool
isFrozen (LVar {status}) = do
  curStatus <- readIORef status
  case curStatus of
    Active _ -> return False
    _        -> return True

-- | Logging within the (internal) Par monad.
logStrLn :: Int -> String -> Par ()
#ifdef DEBUG_LVAR
-- logStrLn = liftIO . logStrLn_
logStrLn lvl str = when (dbgLvl >= 1) $ do
  lgr <- getLogger
  num <- getWorkerNum
  liftIO$ L.logOn lgr (L.StrMsg lvl ("(wrkr"++show num ++") "++ str))
#else
logStrLn _ _ = return ()
#endif

-- | Log a message at the given level through the logger stored in the
-- scheduler state.  A no-op unless compiled with @DEBUG_LVAR@.
logWith :: Sched.State a s -> Int -> String -> IO ()
#ifdef DEBUG_LVAR
-- Only when the debug level is 1 or higher is the logger even initialized:
logWith q lvl str = when (dbgLvl >= 1) $ do
  Just lgr <- readIORef (Sched.logger q)
  L.logOn lgr (L.StrMsg lvl str)
#else
logWith _ _ _ = return ()
#endif

------------------------------------------------------------------------------
-- LVar operations
------------------------------------------------------------------------------

-- | Create an LVar.
newLV :: IO a -> Par (LVar a d)
newLV init = mkPar $ \k q -> do
  state     <- init
  listeners <- B.new
  status    <- newIORef $ Active listeners
  name      <- newLVID
  exec (k $ LVar {state, status, name}) q

-- | Do a threshold read on an LVar
getLV :: (LVar a d)                  -- ^ the LVar
      -> (a -> Bool -> IO (Maybe b)) -- ^ already past threshold?
                                     -- The @Bool@ indicates whether the LVar is FROZEN.
      -> (d ->         IO (Maybe b)) -- ^ does @d@ pass the threshold?
      -> Par b
getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do
  -- tradeoff: we fastpath the case where the LVar is already beyond the
  -- threshold by polling *before* enrolling the callback.  The price is
  -- that, if we are not currently above the threshold, we will have to poll
  -- /again/ after enrolling the callback.  This race may also result in the
  -- continuation being executed twice, which is permitted by idempotence.
  let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
  logWith q 7$ " [dbg-lvish] getLV: first readIORef "++uniqsuf
  curStatus <- readIORef status
  case curStatus of
    Active listeners -> do
      logWith q 7$ " [dbg-lvish] getLV (active): check globalThresh"++uniqsuf
      tripped <- globalThresh state False
      case tripped of
        Just b -> exec (k b) q -- already past the threshold; invoke the
                               -- continuation immediately

        Nothing -> do          -- /transiently/ not past the threshold; block

#if GET_ONCE
          execFlag <- newIORef False
#endif

          let onUpdate d = unblockWhen $ deltaThresh d
              onFreeze   = unblockWhen $ globalThresh state True

              unblockWhen thresh tok q = do
                let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
                logWith q 7$ " [dbg-lvish] getLV (active): callback: check thresh"++uniqsuf
                tripped <- thresh
                whenJust tripped $ \b -> do
                  B.remove tok
#if GET_ONCE
                  logWith q 8$ " [dbg-lvish] getLV (active): read execFlag for dedup"++uniqsuf
                  ticket <- readForCAS execFlag
                  unless (peekTicket ticket) $ do
                    (winner, _) <- do
                      logWith q 8$ " [dbg-lvish] getLV (active): CAS execFlag dedup"++uniqsuf
                      casIORef execFlag ticket True
                    when winner $ Sched.pushWork q (k b)
#else
                  Sched.pushWork q (k b)
#endif

          logWith q 4$ " [dbg-lvish] getLV: blocking on LVar, registering listeners"++uniqsuf
          -- add listener, i.e., move the continuation to the waiting bag
          tok <- B.put listeners $ Listener onUpdate onFreeze

          -- but there's a race: the threshold might be passed (or the LVar
          -- frozen) between our check and the enrollment as a listener, so we
          -- must poll again
          logWith q 8$ " [dbg-lvish] getLV (active): second frozen check"++uniqsuf
          frozen <- isFrozen lv
          logWith q 7$ " [dbg-lvish] getLV (active): second globalThresh check"++uniqsuf
          tripped' <- globalThresh state frozen
          case tripped' of
            Just b -> do
              logWith q 7$ " [dbg-lvish] getLV (active): second globalThresh tripped, remove tok"++uniqsuf
              B.remove tok  -- remove the listener we just added, and
              exec (k b) q  -- execute the continuation. this work might be
                            -- redundant, but by idempotence that's OK
            Nothing -> sched q

    -- Freezing or Frozen:
    _ -> do
      logWith q 7$ " [dbg-lvish] getLV (frozen): about to check globalThresh"++uniqsuf
      tripped <- globalThresh state True
      case tripped of
        Just b -> do
          -- logWith q 9$ " [dbg-lvish] getLV (frozen): thresh met, invoking continuation "++uniqsuf
          exec (k b) q -- already past the threshold; invoke the
                       -- continuation immediately
        Nothing -> sched q     -- We'll NEVER be above the threshold.
                               -- Shouldn't this be an ERROR? (blocked-indefinitely)
                               -- Depends on our semantics for runPar quiescence / errors states.


-- | Update an LVar.
putLV_ :: LVar a d                 -- ^ the LVar
       -> (a -> Par (Maybe d, b))  -- ^ how to do the put, and whether the LVar's
                                   -- value changed
       -> Par b
putLV_ LVar {state, status, name} doPut = mkPar $ \k q -> do
  let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
      -- throwIO (rather than throw) raises exactly when this IO action is
      -- sequenced, which is how it is used below.
      putAfterFrzExn = E.throwIO $ PutAfterFreezeExn "Attempt to change a frozen LVar"
  logWith q 8 $ " [dbg-lvish] putLV: initial lvar status read"++uniqsuf
  fstStatus <- readIORef status
  case fstStatus of
    Freezing -> putAfterFrzExn
    Frozen   -> putAfterFrzExn
    Active listeners -> do
      logWith q 8 $ " [dbg-lvish] putLV: setStatus,"++uniqsuf
      Sched.setStatus q name         -- publish our intent to modify the LVar
      let cont (delta, ret) = ClosedPar $ \q -> do
            logWith q 8 $ " [dbg-lvish] putLV: read final status before unsetting"++uniqsuf
            sndStatus <- readIORef status  -- read the frozen bit *while q's status is marked*
            logWith q 8 $ " [dbg-lvish] putLV: UN-setStatus"++uniqsuf
            Sched.setStatus q noName       -- retract our modification intent
            -- AFTER the retraction, freezeLV is allowed to set the state to Frozen.
            whenJust delta $ \d -> do
              case sndStatus of
                Frozen -> putAfterFrzExn
                _ -> do
                  logWith q 9 $ " [dbg-lvish] putLV: calling each listener's onUpdate"++uniqsuf
                  B.foreach listeners $ \(Listener onUpdate _) tok -> onUpdate d tok q
            -- The caller's continuation runs whether or not the put produced a delta.
            exec (k ret) q
      logWith q 5 $ " [dbg-lvish] putLV: about to mutate lvar"++uniqsuf
      exec (close (doPut state) cont) q


-- | Update an LVar without generating a result.
putLV :: LVar a d             -- ^ the LVar
      -> (a -> IO (Maybe d))  -- ^ how to do the put, and whether the LVar's
                              -- value changed
      -> Par ()
putLV lv doPut = putLV_ lv doPut'
  where doPut' a = do r <- liftIO (doPut a); return (r, ())

-- | Freeze an LVar (introducing quasi-determinism).
--   It is the data structure implementor's responsibility to expose this as quasi-deterministic.
freezeLV :: LVar a d -> Par ()
-- 'state' must be bound here: otherwise the name below would refer to the
-- record selector function and the log suffix would not identify this LVar.
freezeLV LVar {state, name, status} = mkPar $ \k q -> do
  let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
  logWith q 5 $ " [dbg-lvish] freezeLV: atomic modify status to Freezing"++uniqsuf
  -- Only an Active LVar moves to Freezing.  An LVar that is already Freezing
  -- or Frozen keeps its status, so a repeated freeze cannot demote Frozen
  -- back to Freezing (which putLV_'s second status read treats differently).
  oldStatus <- atomicModifyIORef' status $ \s ->
    case s of
      Active _ -> (Freezing, s)
      _        -> (s, s)
  case oldStatus of
    Frozen   -> return ()
    Freezing -> return ()
    Active listeners -> do
      logWith q 7 $ " [dbg-lvish] freezeLV: begin busy-wait for putter status"++uniqsuf
      Sched.await q (name /=)  -- wait until all currently-running puts have
                               -- snapshotted the active status
      logWith q 7 $ " [dbg-lvish] freezeLV: calling each listener's onFreeze"++uniqsuf
      B.foreach listeners $ \Listener {onFreeze} tok -> onFreeze tok q
      logWith q 7 $ " [dbg-lvish] freezeLV: finalizing status as Frozen"++uniqsuf
      writeIORef status Frozen
  exec (k ()) q

------------------------------------------------------------------------------
-- Handler pool operations
------------------------------------------------------------------------------

-- | Create a handler pool.
newPool :: Par HandlerPool
newPool = mkPar $ \k q -> do
  cnt <- C.new
  bag <- B.new
  let hp = HandlerPool cnt bag
  hpMsg q " [dbg-lvish] Created new pool" hp
  exec (k hp) q

-- | Convenience function.  Execute a @Par@ computation in the context of a fresh handler pool.
withNewPool :: (HandlerPool -> Par a) -> Par (a, HandlerPool)
withNewPool f = do
  hp <- newPool
  a  <- f hp
  return (a, hp)

-- | Convenience function.  Execute a @Par@ computation in the context of a fresh
-- handler pool, while ignoring the result of the computation.
withNewPool_ :: (HandlerPool -> Par ()) -> Par HandlerPool
withNewPool_ f = do
  hp <- newPool
  f hp
  return hp

-- | Whether a pooled task has already decremented the pool's handler counter.
data DecStatus = HasDec | HasNotDec

-- | Close a @Par@ task so that it is properly registered with a handler pool.
closeInPool :: Maybe HandlerPool -> Par () -> IO ClosedPar
closeInPool Nothing c = return $ close c $ const (ClosedPar sched)
closeInPool (Just hp) c = do
  decRef <- newIORef HasNotDec      -- in case the thread is duplicated, ensure
                                    -- that the counter is decremented only once
                                    -- on termination
  let cnt = numHandlers hp

      tryDecRef = do                -- attempt to claim the role of decrementer
        ticket <- readForCAS decRef
        case peekTicket ticket of
          HasDec    -> return False
          HasNotDec -> do
            (firstToDec, _) <- casIORef decRef ticket HasDec
            return firstToDec

      onFinishHandler _ = ClosedPar $ \q -> do
        shouldDec <- tryDecRef      -- are we the first copy of the thread to
                                    -- terminate?
        when shouldDec $ do
          C.dec cnt                 -- record handler completion in pool
          quiescent <- C.poll cnt   -- check for (transient) quiescence
          when quiescent $ do       -- wake any threads waiting on quiescence
            hpMsg q " [dbg-lvish] -> Quiescent now.. waking conts" hp
            let invoke t tok = do
                  B.remove tok
                  Sched.pushWork q t
            B.foreach (blockedOnQuiesce hp) invoke
        sched q

  C.inc $ numHandlers hp            -- record handler invocation in pool
  return $ close c onFinishHandler  -- close the task with a special "done"
                                    -- continuation that clears it from the
                                    -- handler pool

-- | Add a handler to an existing pool.
{-# INLINE addHandler #-}
addHandler :: Maybe HandlerPool           -- ^ pool to enroll in, if any
           -> LVar a d                    -- ^ LVar to listen to
           -> (a -> Par ())               -- ^ initial snapshot callback on handler registration
           -> (d -> IO (Maybe (Par ())))  -- ^ subsequent callbacks: updates
           -> Par ()
addHandler hp LVar {state, status} globalCB updateThresh =
  let -- Evaluate a delta threshold; if it yields a callback, close it in the
      -- pool and push it as new work on the notifying worker's queue.
      spawnWhen thresh q = do
        tripped <- thresh
        whenJust tripped $ \cb -> do
          logWith q 5 " [dbg-lvish] addHandler: Delta threshold triggered, pushing work.."
          closed <- closeInPool hp cb
          Sched.pushWork q closed
      onUpdate d _ q = spawnWhen (updateThresh d) q
      onFreeze   _ _ = return ()   -- handlers take no action on freeze
  in mkPar $ \k q -> do
    curStatus <- readIORef status
    case curStatus of
      Active listeners ->             -- enroll the handler as a listener
        do B.put listeners $ Listener onUpdate onFreeze; return ()
      Frozen   -> return ()           -- frozen, so no need to enroll
      Freezing -> return ()           -- frozen, so no need to enroll

    logWith q 4 " [dbg-lvish] addHandler: calling globalCB.."
    -- At registration time, traverse (globally) over the previously inserted items
    -- to launch any required callbacks.
    exec (close (globalCB state) nullCont) q
    exec (k ()) q

-- | A continuation that does nothing: it returns without calling 'sched'.
nullCont :: () -> ClosedPar
nullCont = (\() -> ClosedPar (\_ -> return ()))

-- | Block until a handler pool is quiescent.
quiesce :: HandlerPool -> Par ()
quiesce hp@(HandlerPool cnt bag) = mkPar $ \k q -> do
  hpMsg q " [dbg-lvish] Begin quiescing pool, identity= " hp
  -- tradeoff: we assume that the pool is not yet quiescent, and thus enroll as
  -- a blocked thread prior to checking for quiescence
  tok <- B.put bag (k ())
  quiescent <- C.poll cnt
  if quiescent
    then do
      B.remove tok
      hpMsg q " [dbg-lvish] -> Quiescent already!" hp
      exec (k ()) q
    else do
      hpMsg q " [dbg-lvish] -> Not quiescent yet, back to sched" hp
      sched q

-- | A global barrier.
quiesceAll :: Par ()
quiesceAll = mkPar $ \k q -> do
  sched q
  logWith q 1 " [dbg-lvish] Return from global barrier."
  exec (k ()) q

-- | Freeze an LVar after a given handler quiesces.
--   This is quasi-deterministic.
freezeLVAfter :: LVar a d                    -- ^ the LVar of interest
              -> (a -> Par ())               -- ^ initial snapshot callback on handler registration
              -> (d -> IO (Maybe (Par ())))  -- ^ subsequent callbacks: updates
              -> Par ()
freezeLVAfter lv globalCB updateCB = do
  hp <- newPool
  addHandler (Just hp) lv globalCB updateCB
  quiesce hp
  freezeLV lv


------------------------------------------------------------------------------
-- Par monad operations
------------------------------------------------------------------------------

-- | Fork a child thread, optionally in the context of a handler pool.
forkHP :: Maybe HandlerPool -> Par () -> Par ()
forkHP mh child = mkPar $ \k q -> do
  closed <- closeInPool mh child
  Sched.pushWork q (k ()) -- "Work-first" policy.
  -- hpMsg q " [dbg-lvish] incremented and pushed work in forkInPool, now running cont" hp
  exec closed q

-- | Fork a child thread.
fork :: Par () -> Par ()
fork f = forkHP Nothing f

-- | Perform an @IO@ action.
liftIO :: IO a -> Par a
liftIO io = mkPar $ \k q -> do
  r <- io
  exec (k r) q

-- | IF compiled with debugging support, this will return the Logger used by the
-- current Par session, otherwise it will simply throw an exception.
getLogger :: Par L.Logger
getLogger = mkPar $ \k q -> do
  -- A 'Nothing' here fails the pattern match, which is the exception
  -- mentioned above.
  Just lgr <- readIORef (Sched.logger q)
  exec (k lgr) q

-- | Return the worker that we happen to be running on.  (NONDETERMINISTIC.)
getWorkerNum :: Par Int
getWorkerNum = mkPar $ \k q -> exec (k (Sched.no q)) q

-- | Generate a random boolean in a core-local way.  Fully nondeterministic!
instance MonadToss Par where
  toss = mkPar $ \k q -> do
    g <- readIORef $ Sched.prng q
    let (b, g') = random g
    writeIORef (Sched.prng q) g'
    exec (k b) q

-- | Cooperatively schedule other threads.
yield :: Par ()
yield = mkPar $ \k q -> do
  Sched.yieldWork q (k ())
  sched q

{-# INLINE sched #-}
-- | Contract: This scheduler function only returns when ALL worker threads have
-- completed their work and idled.
sched :: SchedState -> IO ()
sched q = do
  n <- Sched.next q
  case n of
    Just t  -> exec t q
    Nothing -> return ()

-- Forcing evaluation of a LVar is fruitless.
instance NFData (LVar a d) where
  rnf _ = ()


-- | A variant with full control over the relevant knobs.
--
--   Returns a list of flushed debug messages at the end (if in-memory logging was
--   enabled, otherwise the list is empty).
--
--   This version of runPar catches ALL exceptions that occur within the runPar, and
--   returns them via an Either.  The reason for this is that even if an error
--   occurs, it is still useful to observe the log messages that lead to the failure.
--
runParDetailed :: DbgCfg  -- ^ Debugging config
               -> Int     -- ^ How many worker threads to use.
               -> Par a   -- ^ The computation to run.
               -> IO ([String], Either E.SomeException a)
runParDetailed DbgCfg {dbgRange, dbgDests, dbgScheduling } numWrkrs comp = do
  queues <- Sched.new numWrkrs noName

  -- We create a thread on each CPU with forkOn.  The CPU on which
  -- the current thread is running will host the main thread; the
  -- other CPUs will host worker threads.
  -- NOTE(review): if Sched.currentCPU can return a value >= numWrkrs, no
  -- forked worker takes the main role below and answerMV is never filled --
  -- confirm against Sched.currentCPU.
  main_cpu <- Sched.currentCPU
  answerMV <- newEmptyMVar
  wrkrtids <- newIORef []

  -- Debugging: spin the main thread (not beginning work) until we can fully
  -- initialize the logging data structure.
  --
  -- TODO: This would be easier to deal with if we used the current thread directly
  -- as the main worker thread...
  let setLogger = do
        ls <- readIORef wrkrtids
        if length ls == numWrkrs
          then Sched.initLogger queues ls (minLvl,maxLvl) dbgDests dbgScheduling
          else do
            Conc.yield
            setLogger
      (minLvl, maxLvl) = case dbgRange of
                           Just b  -> b
                           Nothing -> (0,dbgLvl)

  -- Option 1: forkWithExceptions version:
  ----------------------------------------------------------------------------------
#if 1
  let forkit = forM_ (zip [0..] queues) $ \(cpu, q) -> do
        tid <- L.forkWithExceptions (forkOn cpu) "worker thread" $ do
          if cpu == main_cpu
            then
              let k x = ClosedPar $ \q -> do
                    sched q            -- ensure any remaining, enabled threads run to
                    putMVar answerMV x -- completion prior to returning the result
                    -- [TODO: ^ perhaps better to use a binary notification tree to signal the workers to stop...]
              in do
#ifdef DEBUG_LVAR
                -- This is painful, we may need to spin and wait for everybody to be forked:
                when (maxLvl >= 1) setLogger
#endif
                exec (close comp k) q
            -- Note: The above is important: it is sketchy to leave any workers running after
            -- the main thread exits.  Subsequent exceptions on child threads, even if
            -- forwarded asynchronously, can arrive much later at the main thread
            -- (e.g. after it has exited, or set up a new handler, etc).
            else sched q
        atomicModifyIORef_ wrkrtids (tid:)
  -- logWith (Prelude.head queues) " [dbg-lvish] About to fork workers..."
  ans <- E.catch (forkit >> fmap Right (takeMVar answerMV))
    (\ (e :: E.SomeException) -> do
        tids <- readIORef wrkrtids
        logWith (Prelude.head queues) 1 $ " [dbg-lvish] Killing off workers due to exception: "++show tids
        mapM_ killThread tids
        -- if length tids < length queues then do -- TODO: we could try to chase these down in the idle list.
        -- when (maxLvl >= 1) printLog -- Unfortunately this races with the log printing thread.
        -- E.throw$ LVarSpecificExn ("EXCEPTION in runPar("++show mytid++"): "++show e)
        -- The exception is not rethrown: it is handed back to the caller as a Left.
        return $! Left e
    )
  logWith (Prelude.head queues) 1 " [dbg-lvish] parent thread escaped unscathed"
  mlgr <- readIORef (Sched.logger (Prelude.head queues))
  logs <- case mlgr of
            Nothing  -> return []
            Just lgr -> do
              L.closeIt lgr
              L.flushLogs lgr -- If in-memory logging is off, this will be empty.
  return $! (logs,ans)
#else
  -- Option 2: This was an experiment to use Control.Concurrent.Async to deal with exceptions:
  ----------------------------------------------------------------------------------
  let runWorker (cpu, q) = do
        if (cpu /= main_cpu)
          then sched q
          else
            let k x = ClosedPar $ \q -> do
                  sched q            -- ensure any remaining, enabled threads run to
                  putMVar answerMV x -- completion prior to returning the result
            in exec (close comp k) q

  -- Here we want a traditional, fork-join parallel loop with proper exception handling:
  let loop [] asyncs = mapM_ wait asyncs
      loop ((cpu,q):tl) asyncs =
        -- withAsync (runWorker state)
        withAsyncOn cpu (runWorker (cpu,q))
                    (\a -> loop tl (a:asyncs))

  ----------------------------------------
  -- (1) There is a BUG in 'loop' presently:
  --    "thread blocked indefinitely in an STM transaction"
  -- loop (zip [0..] queues) []
  ----------------------------------------
  -- (2) This has the same problem as 'loop':
  -- ls <- mapM (\ pr@(cpu,_) -> Async.asyncOn cpu (runWorker pr)) (zip [0..] queues)
  -- mapM_ wait ls
  ----------------------------------------
  -- (3) Using this FOR NOW, but it does NOT pin to the right processors:
  mapConcurrently runWorker (zip [0..] queues)
  ----------------------------------------

  -- Now that child threads are done, it's safe for the main thread
  -- to call it quits.
  takeMVar answerMV
#endif

-- | Run with the default debug configuration on 'numCapabilities' workers,
-- rethrowing any exception that 'runParDetailed' captured.
defaultRun :: Par b -> IO b
defaultRun = fmap (fromRight . snd) .
             runParDetailed cfg numCapabilities
  where
    cfg = DbgCfg { dbgRange = Just (0,dbgLvl)
                 , dbgDests = [L.OutputTo stderr, L.OutputEvents]
                 , dbgScheduling = False }

-- | Run a deterministic parallel computation as pure.
runPar :: Par a -> a
runPar = unsafePerformIO . defaultRun

-- | A version that avoids an internal `unsafePerformIO` for calling
-- contexts that are already in the `IO` monad.
runParIO :: Par a -> IO a
runParIO = defaultRun

-- | Debugging aid.  Return debugging logs, in realtime order, in addition to the
-- final result.  This is like `runParDetailed` but uses the default settings.
runParLogged :: Par a -> IO ([String],a)
runParLogged comp = do
  (logs,ans) <- runParDetailed
                   DbgCfg { dbgRange = (Just (0,dbgLvl))
                          , dbgDests = [L.OutputEvents, L.OutputInMemory]
                          , dbgScheduling = False }
                   numCapabilities comp
  return $! (logs,fromRight ans)

-- | Unwrap a 'Right'; rethrow the exception carried by a 'Left'.
fromRight :: Either E.SomeException a -> a
fromRight (Right x) = x
fromRight (Left e)  = E.throw e

-- | Atomically and strictly apply a function to the contents of an 'IORef',
-- discarding any result.
{-# INLINE atomicModifyIORef_ #-}
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref fn = atomicModifyIORef' ref (\ x -> (fn x,()))

-- | Hash of a stable name for the argument; used only to label LVars in
-- debug messages.
{-# NOINLINE unsafeName #-}
unsafeName :: a -> Int
unsafeName x = unsafePerformIO $ do
  sn <- makeStableName x
  return (hashStableName sn)

-- | Log a message tagged with the identity of a handler pool.  A no-op
-- unless compiled with @DEBUG_LVAR@.
{-# INLINE hpMsg #-}
hpMsg :: Sched.State a s -> String -> HandlerPool -> IO ()
#ifdef DEBUG_LVAR
hpMsg q msg hp = do
  s <- hpId_ hp
  logWith q 3 $ msg++", pool identity= " ++s
#else
hpMsg _ _ _ = return ()
#endif

-- | Pure wrapper around 'hpId_', for debugging only.
{-# NOINLINE hpId #-}
hpId :: HandlerPool -> String
hpId hp = unsafePerformIO (hpId_ hp)

-- | Debugging tool for printing which HandlerPool
hpId_ :: HandlerPool -> IO String
hpId_ (HandlerPool cnt bag) = do
  sn1 <- makeStableName cnt
  sn2 <- makeStableName bag
  c   <- readIORef cnt
  return $ show (hashStableName sn1) ++"/"++ show (hashStableName sn2) ++
           " transient cnt "++show c

-- For debugging purposes.  This can help us figure out (by an ugly
--   process of elimination) which MVar reads are leading to a "Thread
--   blocked indefinitely" exception.
{-
busyTakeMVar :: String -> MVar a -> IO a
busyTakeMVar msg mv = try (10 * 1000 * 1000)
 where
 try 0 = do
   when dbg $ do
     tid <- myThreadId
     -- After we've failed enough times, start complaining:
     printf "%s not getting anywhere, msg: %s\n" (show tid) msg
   try (100 * 1000)
 try n = do
   x <- tryTakeMVar mv
   case x of
     Just y  -> return y
     Nothing -> do yield; try (n-1)
-}