module Control.LVish.SchedIdempotent
(
LVar(..), state, HandlerPool(),
Par(..), ClosedPar(..),
yield, newPool, fork, forkHP,
runPar, runParIO,
runParDetailed, runParLogged,
withNewPool, withNewPool_,
forkWithExceptions,
quiesce, quiesceAll,
logStrLn, dbgLvl, getLogger,
newLV, getLV, putLV, putLV_, freezeLV, freezeLVAfter,
addHandler, liftIO, toss,
mkPar, Status(..), sched, Listener(..)
) where
import Control.Monad hiding (sequence, join)
import Control.Concurrent hiding (yield)
import qualified Control.Concurrent as Conc
import qualified Control.Exception as E
import Control.DeepSeq
import Control.Applicative
import Control.LVish.MonadToss
import Control.LVish.Logging as L
import Debug.Trace(trace)
import Data.IORef
import Data.Atomics
import Data.Typeable
import qualified Data.Concurrent.Counter as C
import qualified Data.Concurrent.Bag as B
import GHC.Conc hiding (yield)
import System.IO
import System.IO.Unsafe (unsafePerformIO)
import System.Environment(getEnvironment)
import System.Mem.StableName (makeStableName, hashStableName)
import Prelude hiding (mapM, sequence, head, tail)
import qualified Prelude
import System.Random (random)
#ifdef DEBUG_LVAR
import Text.Printf (printf)
#endif
import Data.Traversable
import Control.LVish.Types
import qualified Control.LVish.SchedIdempotentInternal as Sched
-- | An LVar as seen by the scheduler: the user-visible contents plus the
-- bookkeeping needed to block readers, notify listeners and freeze.
--
-- The type parameter @a@ is the type of the contents and @d@ the type of the
-- deltas handed to listeners when a put changes the LVar.
data LVar a d = LVar {
  -- | The contents handed to threshold and put functions.  This field is
  -- lazy, unlike the other two.
  state :: a,
  -- | Whether the LVar is active (with its bag of listeners), being frozen,
  -- or frozen.
  status :: !(IORef (Status d)),
  -- | Identity of this LVar.  A worker publishes it through
  -- @Sched.setStatus@ while it is inside a put, and 'freezeLV' waits on it.
  name :: !LVarID
  }
-- | Identity token for an LVar.  Only the identity of the reference matters
-- (it is compared with @\/=@ in 'freezeLV'); the contents are always @()@.
type LVarID = IORef ()

-- | Allocate a fresh identity, distinct from every other 'LVarID'.
newLVID :: IO LVarID
newLVID = newIORef ()

-- | The identity a worker advertises when it is not in the middle of a put.
--
-- This is a global created with 'unsafePerformIO', so it must never be
-- inlined: an inlined copy would allocate a second, different reference at
-- the use site and the value would stop being a single shared sentinel.
noName :: LVarID
noName = unsafePerformIO newLVID
{-# NOINLINE noName #-}
-- | Life-cycle state of an LVar.
data Status d
  -- | A freeze has started; 'freezeLV' sets this before it waits for
  -- in-flight puts and runs the freeze callbacks.
  = Freezing
  -- | The freeze has completed; puts that carry a delta are rejected.
  | Frozen
  -- | Still mutable; carries the bag of currently registered listeners.
  | Active (B.Bag (Listener d))
-- | A pair of callbacks registered on an active LVar.  Each callback
-- receives the bag token of its own registration (so that it can remove
-- itself) and the scheduler state of the worker that invokes it.
data Listener d = Listener {
  -- | Called by 'putLV_' with the delta produced by a put.
  onUpdate :: d -> B.Token (Listener d) -> SchedState -> IO (),
  -- | Called by 'freezeLV' while the LVar is being frozen.
  onFreeze :: B.Token (Listener d) -> SchedState -> IO ()
  }
-- | A group of handler invocations that can be waited on as a unit.
data HandlerPool = HandlerPool {
  -- | Incremented by 'closeInPool' when a computation is attached to the
  -- pool and decremented once when that computation finishes.
  numHandlers :: C.Counter,
  -- | Continuations parked by 'quiesce', woken when the counter polls as
  -- quiescent.
  blockedOnQuiesce :: B.Bag ClosedPar
  }
-- | A parallel computation in continuation-passing style: given what to do
-- with the result, it yields a runnable 'ClosedPar'.
newtype Par a = Par {
  -- | Supply the continuation for the result of type @a@.
  close :: (a -> ClosedPar) -> ClosedPar
  }
-- | A computation whose continuation has already been supplied; all that is
-- left is to run it against the scheduler state of some worker.
newtype ClosedPar = ClosedPar {
  -- | Run the computation on the worker that owns the given state.
  exec :: SchedState -> IO ()
  }
-- | Per-worker scheduler state, instantiated with 'ClosedPar' as the work
-- item type and 'LVarID' as the published status type.
type SchedState = Sched.State ClosedPar LVarID
-- | Mapping over a 'Par' post-composes the function onto the continuation.
instance Functor Par where
  fmap g (Par run) = Par (\cont -> run (\x -> cont (g x)))
-- | Standard continuation-monad sequencing: 'return' feeds the value
-- straight to the continuation, and bind runs the first computation with a
-- continuation that builds and runs the second.
instance Monad Par where
  return x = Par ($ x)
  Par run >>= next = Par (\cont -> run (\x -> close (next x) cont))
-- | The applicative structure is derived from the monad.
instance Applicative Par where
  pure x = return x
  pf <*> px = ap pf px
-- | Build a 'Par' from a function that is handed both the continuation and
-- the scheduler state of the executing worker.
mkPar :: ((a -> ClosedPar) -> SchedState -> IO ()) -> Par a
mkPar body = Par (\cont -> ClosedPar (\st -> body cont st))
-- | Run the action on the payload of a 'Just'; do nothing for 'Nothing'.
whenJust :: Maybe a -> (a -> IO ()) -> IO ()
whenJust mval act = maybe (return ()) act mval
-- | Report whether the LVar has left the 'Active' state.  Both 'Freezing'
-- and 'Frozen' count as frozen here.
isFrozen :: LVar a d -> IO Bool
isFrozen lv = do
  current <- readIORef (status lv)
  return $! notActive current
  where
    notActive (Active _) = False
    notActive _          = True
-- | Emit a debug message at the given level from inside a 'Par'
-- computation, tagged with the number of the executing worker.  Compiles to
-- a no-op unless @DEBUG_LVAR@ is defined.
logStrLn :: Int -> String -> Par ()
#ifdef DEBUG_LVAR
logStrLn level msg
  | dbgLvl >= 1 = do
      logger <- getLogger
      wid <- getWorkerNum
      let tagged = "(wrkr" ++ show wid ++ ") " ++ msg
      liftIO (L.logOn logger (L.StrMsg level tagged))
  | otherwise = return ()
#else
logStrLn _ _ = return ()
#endif
-- | Emit a debug message at the given level through the logger stored in a
-- worker state.  Compiles to a no-op unless @DEBUG_LVAR@ is defined.
--
-- The logger slot is optional ('runParDetailed' itself handles the case
-- where it is empty), so a missing logger drops the message instead of
-- failing with a pattern-match error inside a logging call.
logWith :: Sched.State a s -> Int -> String -> IO ()
#ifdef DEBUG_LVAR
logWith q lvl str = when (dbgLvl >= 1) $ do
  mlgr <- readIORef (Sched.logger q)
  case mlgr of
    Just lgr -> L.logOn lgr (L.StrMsg lvl str)
    Nothing  -> return ()
#else
logWith _ _ _ = return ()
#endif
-- | Create a new LVar.  The supplied action allocates the contents; the
-- LVar starts out 'Active' with an empty listener bag and a fresh identity.
newLV :: IO a -> Par (LVar a d)
newLV initialise = mkPar $ \cont st -> do
  contents  <- initialise
  bag       <- B.new
  statusRef <- newIORef (Active bag)
  ident     <- newLVID
  exec (cont (LVar contents statusRef ident)) st
-- | Threshold read: continue with a value once one of the threshold
-- functions produces a 'Just'.
--
-- * The second argument (@globalThresh@) is run against the LVar state and a
--   flag telling it whether the LVar is frozen.
--
-- * The third argument (@deltaThresh@) is run against each delta delivered
--   through the @onUpdate@ callback.
--
-- If neither trips right away the continuation is parked in a 'Listener' and
-- control goes back to the scheduler.
getLV :: (LVar a d)
      -> (a -> Bool -> IO (Maybe b))
      -> (d -> IO (Maybe b))
      -> Par b
getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do
  -- Suffix used to tell LVars and workers apart in debug output.
  let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
  logWith q 7$ " [dbg-lvish] getLV: first readIORef "++uniqsuf
  curStatus <- readIORef status
  case curStatus of
    Active listeners -> do
      logWith q 7$ " [dbg-lvish] getLV (active): check globalThresh"++uniqsuf
      -- Fast path: the threshold may already be met.
      tripped <- globalThresh state False
      case tripped of
        Just b -> exec (k b) q
        Nothing -> do
#if GET_ONCE
          -- Shared flag: only the callback that wins the CAS below pushes
          -- the continuation.
          execFlag <- newIORef False
#endif
          let onUpdate d = unblockWhen $ deltaThresh d
              onFreeze = unblockWhen $ globalThresh state True
              -- Shared callback body.  The q here is the state of the worker
              -- invoking the callback and shadows the outer q.
              unblockWhen thresh tok q = do
                let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
                logWith q 7$ " [dbg-lvish] getLV (active): callback: check thresh"++uniqsuf
                tripped <- thresh
                whenJust tripped $ \b -> do
                  -- Deregister this listener, then schedule the continuation.
                  B.remove tok
#if GET_ONCE
                  logWith q 8$ " [dbg-lvish] getLV (active): read execFlag for dedup"++uniqsuf
                  ticket <- readForCAS execFlag
                  unless (peekTicket ticket) $ do
                    (winner, _) <- do logWith q 8$ " [dbg-lvish] getLV (active): CAS execFlag dedup"++uniqsuf
                                      casIORef execFlag ticket True
                    when winner $ Sched.pushWork q (k b)
#else
                  Sched.pushWork q (k b)
#endif
          logWith q 4$ " [dbg-lvish] getLV: blocking on LVar, registering listeners"++uniqsuf
          tok <- B.put listeners $ Listener onUpdate onFreeze
          -- Re-check after registering: a put or freeze may have landed
          -- between the first threshold check and the registration.
          -- NOTE(review): this re-check runs the continuation directly and
          -- does not consult execFlag, so a callback may also have pushed
          -- it -- confirm that callers tolerate duplicate execution.
          logWith q 8$ " [dbg-lvish] getLV (active): second frozen check"++uniqsuf
          frozen <- isFrozen lv
          logWith q 7$ " [dbg-lvish] getLV (active): second globalThresh check"++uniqsuf
          tripped' <- globalThresh state frozen
          case tripped' of
            Just b -> do
              logWith q 7$ " [dbg-lvish] getLV (active): second globalThresh tripped, remove tok"++uniqsuf
              B.remove tok
              exec (k b) q
            -- Still below threshold: stay parked and run other work.
            Nothing -> sched q
    -- Freezing or Frozen: no listener is registered, so this is the only
    -- threshold check; if it does not trip the continuation is never run.
    _ -> do
      logWith q 7$ " [dbg-lvish] getLV (frozen): about to check globalThresh"++uniqsuf
      tripped <- globalThresh state True
      case tripped of
        Just b -> do
          exec (k b) q
        Nothing -> sched q
-- | Update an LVar and return an extra result.
--
-- The supplied function is run on the LVar state and returns an optional
-- delta together with the value to continue with.  When a delta is present
-- every registered listener has its @onUpdate@ callback invoked with it.
--
-- Throws 'PutAfterFreezeExn' if the LVar is not 'Active' on entry, or if it
-- is found 'Frozen' after a put that produced a delta.
putLV_ :: LVar a d
       -> (a -> Par (Maybe d, b))
       -> Par b
putLV_ LVar {state, status, name} doPut = mkPar $ \k q -> do
  let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
      putAfterFrzExn = E.throw$ PutAfterFreezeExn "Attempt to change a frozen LVar"
  logWith q 8 $ " [dbg-lvish] putLV: initial lvar status read"++uniqsuf
  fstStatus <- readIORef status
  case fstStatus of
    Freezing -> putAfterFrzExn
    Frozen -> putAfterFrzExn
    Active listeners -> do
      logWith q 8 $ " [dbg-lvish] putLV: setStatus,"++uniqsuf
      -- Publish which LVar this worker is mutating; 'freezeLV' waits on it.
      Sched.setStatus q name
      -- Continuation run once the user put has produced its result.  The q
      -- bound here shadows the outer one.
      let cont (delta, ret) = ClosedPar $ \q -> do
            logWith q 8 $ " [dbg-lvish] putLV: read final status before unsetting"++uniqsuf
            -- Read the status before withdrawing the published name.
            sndStatus <- readIORef status
            logWith q 8 $ " [dbg-lvish] putLV: UN-setStatus"++uniqsuf
            Sched.setStatus q noName
            whenJust delta $ \d -> do
              case sndStatus of
                Frozen -> putAfterFrzExn
                -- Active or Freezing: deliver the delta to the listeners
                -- captured when the put started.
                _ -> do
                  logWith q 9 $ " [dbg-lvish] putLV: calling each listener's onUpdate"++uniqsuf
                  B.foreach listeners $ \(Listener onUpdate _) tok -> onUpdate d tok q
            exec (k ret) q
      logWith q 5 $ " [dbg-lvish] putLV: about to mutate lvar"++uniqsuf
      exec (close (doPut state) cont) q
-- | Update an LVar with an 'IO' action that reports an optional delta.
-- This is 'putLV_' specialised to a put that returns no extra result.
putLV :: LVar a d
      -> (a -> IO (Maybe d))
      -> Par ()
putLV lv doPut = putLV_ lv (\contents -> fmap withUnit (liftIO (doPut contents)))
  where
    withUnit delta = (delta, ())
-- | Freeze an LVar so that later puts which change it are rejected.
--
-- The status is moved to 'Freezing' atomically.  If this call is the one
-- that performed the transition from 'Active', it waits until no worker
-- advertises this LVar's name (i.e. no put is in flight), runs every
-- listener's @onFreeze@ callback and finally marks the LVar 'Frozen'.  If
-- the LVar was already 'Freezing' or 'Frozen' the call just continues.
--
-- Two fixes relative to the previous version:
--
-- * @state@ is now bound by the record pattern.  Previously the name
--   referred to the record selector function, so the identifier shown in
--   debug output was the same for every LVar.
--
-- * An LVar that is already 'Frozen' keeps that status instead of being
--   reset to 'Freezing' forever.
freezeLV :: LVar a d -> Par ()
freezeLV LVar {name, status, state} = mkPar $ \k q -> do
  let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
      -- Only a non-frozen status moves to Freezing; the old status is
      -- returned so that we know who won the transition.
      beginFreeze s = case s of
        Frozen -> (Frozen, s)
        _      -> (Freezing, s)
  logWith q 5 $ " [dbg-lvish] freezeLV: atomic modify status to Freezing"++uniqsuf
  oldStatus <- atomicModifyIORef' status beginFreeze
  case oldStatus of
    Frozen -> return ()
    Freezing -> return ()
    Active listeners -> do
      logWith q 7 $ " [dbg-lvish] freezeLV: begin busy-wait for putter status"++uniqsuf
      -- Wait until no worker has this LVar's name published as its status.
      Sched.await q (name /=)
      logWith q 7 $ " [dbg-lvish] freezeLV: calling each listener's onFreeze"++uniqsuf
      B.foreach listeners $ \Listener {onFreeze} tok -> onFreeze tok q
      logWith q 7 $ " [dbg-lvish] freezeLV: finalizing status as Frozen"++uniqsuf
      writeIORef status Frozen
  exec (k ()) q
-- | Create an empty handler pool: a fresh counter and an empty bag of
-- continuations waiting for quiescence.
newPool :: Par HandlerPool
newPool = mkPar $ \cont st -> do
  counter <- C.new
  waiters <- B.new
  let pool = HandlerPool { numHandlers = counter, blockedOnQuiesce = waiters }
  hpMsg st " [dbg-lvish] Created new pool" pool
  exec (cont pool) st
-- | Run a computation with a freshly created pool and return its result
-- together with the pool.
withNewPool :: (HandlerPool -> Par a) -> Par (a, HandlerPool)
withNewPool f =
  newPool >>= \pool ->
  f pool  >>= \result ->
  return (result, pool)
-- | Like 'withNewPool' for computations without a result: only the pool is
-- returned.
withNewPool_ :: (HandlerPool -> Par ()) -> Par HandlerPool
withNewPool_ f = newPool >>= \pool -> f pool >> return pool
-- | Whether the pool counter has already been decremented for one
-- particular closed computation.
data DecStatus = HasDec | HasNotDec
-- | Close a @Par ()@ with a final continuation that returns to the
-- scheduler, optionally accounting for it in a handler pool.
--
-- With a pool, the counter is incremented here, and the final continuation
-- decrements it at most once (guarded by a CAS on a private flag) however
-- many times that continuation runs.  When the counter then polls as
-- quiescent, every continuation parked in the pool is removed from the bag
-- and pushed as work.
closeInPool :: Maybe HandlerPool -> Par () -> IO ClosedPar
-- No pool: when the computation finishes, just go back to the scheduler.
closeInPool Nothing c = return $ close c $ const (ClosedPar sched)
closeInPool (Just hp) c = do
  decRef <- newIORef HasNotDec
  let cnt = numHandlers hp
      -- True for exactly one caller: the one whose CAS flips the flag.
      tryDecRef = do
        ticket <- readForCAS decRef
        case peekTicket ticket of
          HasDec -> return False
          HasNotDec -> do
            (firstToDec, _) <- casIORef decRef ticket HasDec
            return firstToDec
      onFinishHandler _ = ClosedPar $ \q -> do
        shouldDec <- tryDecRef
        when shouldDec $ do
          C.dec cnt
          quiescent <- C.poll cnt
          when quiescent $ do
            hpMsg q " [dbg-lvish] -> Quiescent now.. waking conts" hp
            let invoke t tok = do
                  B.remove tok
                  Sched.pushWork q t
            B.foreach (blockedOnQuiesce hp) invoke
        -- In every case, hand control back to the scheduler.
        sched q
  -- Register the computation with the pool before handing it out.
  C.inc $ numHandlers hp
  return $ close c onFinishHandler
-- | Register a handler on an LVar.
--
-- * The first argument is the pool (if any) in which callbacks triggered by
--   updates are accounted for.
--
-- * @globalCB@ is run once, at registration time, against the LVar state.
--
-- * @updateThresh@ is run on every delta delivered to the listener; when it
--   returns a computation, that computation is closed in the pool and
--   pushed as work.
--
-- If the LVar is already 'Freezing' or 'Frozen' no listener is enrolled, but
-- @globalCB@ is still run.
addHandler :: Maybe HandlerPool
           -> LVar a d
           -> (a -> Par ())
           -> (d -> IO (Maybe (Par ())))
           -> Par ()
addHandler hp LVar {state, status} globalCB updateThresh =
  let -- Run a threshold and, if it trips, schedule the resulting callback.
      spawnWhen thresh q = do
        tripped <- thresh
        whenJust tripped $ \cb -> do
          logWith q 5 " [dbg-lvish] addHandler: Delta threshold triggered, pushing work.."
          closed <- closeInPool hp cb
          Sched.pushWork q closed
      onUpdate d _ q = spawnWhen (updateThresh d) q
      -- Handlers do not react to freezing.
      onFreeze _ _ = return ()
  in mkPar $ \k q -> do
    curStatus <- readIORef status
    case curStatus of
      Active listeners -> do
        -- Enroll the listener; its bag token is not needed afterwards.
        _ <- B.put listeners $ Listener onUpdate onFreeze
        return ()
      Frozen -> return ()
      Freezing -> return ()
    logWith q 4 " [dbg-lvish] addHandler: calling globalCB.."
    -- Run the registration-time callback with a do-nothing continuation,
    -- then carry on with the caller.
    exec (close (globalCB state) nullCont) q
    exec (k ()) q
-- | A continuation that ignores the scheduler state and does nothing.
nullCont :: () -> ClosedPar
nullCont () = ClosedPar (\_ -> return ())
-- | Block until the pool's handler counter polls as quiescent.
--
-- The continuation is parked in the pool first and the counter is polled
-- afterwards.  If the pool is already quiescent the parked entry is
-- withdrawn and the continuation runs immediately; otherwise the worker
-- returns to the scheduler.
quiesce :: HandlerPool -> Par ()
quiesce pool@(HandlerPool pending waiters) = mkPar $ \cont st -> do
  let resume = cont ()
  hpMsg st " [dbg-lvish] Begin quiescing pool, identity= " pool
  ticket <- B.put waiters resume
  idle   <- C.poll pending
  case idle of
    True -> do
      B.remove ticket
      hpMsg st " [dbg-lvish] -> Quiescent already!" pool
      exec resume st
    False -> do
      hpMsg st " [dbg-lvish] -> Not quiescent yet, back to sched" pool
      sched st
-- | Global barrier: run the scheduler loop on this worker until it returns,
-- then continue.
quiesceAll :: Par ()
quiesceAll = mkPar $ \cont st ->
  sched st
    >> logWith st 1 " [dbg-lvish] Return from global barrier."
    >> exec (cont ()) st
-- | Register a handler in a private pool, wait for that pool to quiesce,
-- then freeze the LVar.
freezeLVAfter :: LVar a d
              -> (a -> Par ())
              -> (d -> IO (Maybe (Par ())))
              -> Par ()
freezeLVAfter lv globalCB updateCB =
  newPool >>= \pool ->
       addHandler (Just pool) lv globalCB updateCB
    >> quiesce pool
    >> freezeLV lv
-- | Fork a child computation, optionally accounting for it in a handler
-- pool.  The parent continuation is pushed as work and the child is run
-- directly on the current worker.
forkHP :: Maybe HandlerPool -> Par () -> Par ()
forkHP mh child = mkPar $ \parent st -> do
  childTask <- closeInPool mh child
  Sched.pushWork st (parent ())
  exec childTask st
-- | Fork a child computation that belongs to no handler pool.
fork :: Par () -> Par ()
fork = forkHP Nothing
-- | Run an 'IO' action on the current worker and continue with its result.
liftIO :: IO a -> Par a
liftIO action = mkPar $ \cont st ->
  action >>= \result -> exec (cont result) st
-- | Fetch the logger stored in the current worker state.  The pattern match
-- fails at run time if no logger has been installed.
getLogger :: Par L.Logger
getLogger = mkPar $ \cont st -> do
  Just logger <- readIORef (Sched.logger st)
  exec (cont logger) st
-- | The number of the worker that is executing this computation.
getWorkerNum :: Par Int
getWorkerNum = mkPar $ \cont st ->
  let me = Sched.no st
  in exec (cont me) st
-- | Coin tosses draw from the random generator stored in the current worker
-- state; the advanced generator is written back before continuing.
instance MonadToss Par where
  toss = mkPar $ \cont st -> do
    let genRef = Sched.prng st
    gen <- readIORef genRef
    let (coin, gen') = random gen
    writeIORef genRef gen'
    exec (cont coin) st
-- | Cooperatively give up the worker: hand the rest of this computation to
-- @Sched.yieldWork@ and return to the scheduler loop.
yield :: Par ()
yield = mkPar $ \cont st ->
  Sched.yieldWork st (cont ()) >> sched st
-- | Scheduler step: ask for the next work item and run it, or return when
-- @Sched.next@ reports that there is none.
sched :: SchedState -> IO ()
sched st = Sched.next st >>= maybe (return ()) (\task -> exec task st)
-- | Forcing an LVar does nothing: the argument is not inspected at all.
instance NFData (LVar a d) where
  rnf = const ()
-- | Run a 'Par' computation on the given number of worker threads and
-- return the collected log messages together with either the result or the
-- exception that aborted the run.
--
-- One thread is forked per scheduler queue and pinned with 'forkOn'.  The
-- worker whose index equals the capability of the calling thread runs the
-- computation; the others go straight into the scheduler loop.  On an
-- exception all registered worker threads are killed.
--
-- NOTE(review): if the capability of the calling thread is not below
-- @numWrkrs@, no worker matches it and the computation is never started --
-- confirm that callers always pass at least that many workers.
--
-- NOTE(review): @Prelude.head queues@ assumes that @Sched.new@ returns at
-- least one queue -- confirm for @numWrkrs == 0@.
--
-- The previously unused binding of @myThreadId@ in the exception handler
-- has been removed.
runParDetailed :: DbgCfg
               -> Int
               -> Par a
               -> IO ([String], Either E.SomeException a)
runParDetailed DbgCfg {dbgRange, dbgDests, dbgScheduling } numWrkrs comp = do
  queues <- Sched.new numWrkrs noName
  main_cpu <- Sched.currentCPU
  answerMV <- newEmptyMVar
  wrkrtids <- newIORef []
  let -- Spin until every worker has recorded its thread id, then install
      -- the logger.
      setLogger = do
        ls <- readIORef wrkrtids
        if length ls == numWrkrs
          then Sched.initLogger queues ls (minLvl,maxLvl) dbgDests dbgScheduling
          else do Conc.yield
                  setLogger
      (minLvl, maxLvl) = case dbgRange of
                           Just b -> b
                           Nothing -> (0,dbgLvl)
#if 1
  let forkit = forM_ (zip [0..] queues) $ \(cpu, q) -> do
        tid <- L.forkWithExceptions (forkOn cpu) "worker thread" $ do
                 if cpu == main_cpu
                   then let -- Final continuation: drain remaining work,
                            -- then publish the answer.
                            k x = ClosedPar $ \q -> do
                              sched q
                              putMVar answerMV x
                        in do
#ifdef DEBUG_LVAR
                              when (maxLvl >= 1) setLogger
#endif
                              exec (close comp k) q
                   else sched q
        atomicModifyIORef_ wrkrtids (tid:)
  ans <- E.catch (forkit >> fmap Right (takeMVar answerMV))
    (\ (e :: E.SomeException) -> do
        tids <- readIORef wrkrtids
        logWith (Prelude.head queues) 1 $ " [dbg-lvish] Killing off workers due to exception: "++show tids
        mapM_ killThread tids
        return $! Left e
    )
  logWith (Prelude.head queues) 1 " [dbg-lvish] parent thread escaped unscathed"
  -- The logger is optional; without one there are no logs to return.
  mlgr <- readIORef (Sched.logger (Prelude.head queues))
  logs <- case mlgr of
            Nothing -> return []
            Just lgr -> do L.closeIt lgr
                           L.flushLogs lgr
  return $! (logs,ans)
#else
  let runWorker (cpu, q) = do
        if (cpu /= main_cpu)
          then sched q
          else let k x = ClosedPar $ \q -> do
                     sched q
                     putMVar answerMV x
               in exec (close comp k) q
  let loop [] asyncs = mapM_ wait asyncs
      loop ((cpu,q):tl) asyncs =
        withAsyncOn cpu (runWorker (cpu,q))
                    (\a -> loop tl (a:asyncs))
  mapConcurrently runWorker (zip [0..] queues)
  takeMVar answerMV
#endif
-- | Run a computation on 'numCapabilities' workers with the default debug
-- configuration, discarding the logs.  An exception captured during the run
-- is rethrown when the result is demanded.
defaultRun :: Par b -> IO b
defaultRun comp = do
  (_, outcome) <- runParDetailed defaultCfg numCapabilities comp
  return (fromRight outcome)
  where
    defaultCfg = DbgCfg
      { dbgRange      = Just (0,dbgLvl)
      , dbgDests      = [L.OutputTo stderr, L.OutputEvents]
      , dbgScheduling = False
      }
-- | Run a computation and expose its result as a pure value, via
-- 'unsafePerformIO' over 'defaultRun'.
runPar :: Par a -> a
runPar comp = unsafePerformIO (defaultRun comp)
-- | Run a computation in 'IO' with the default configuration.
runParIO :: Par a -> IO a
runParIO comp = defaultRun comp
-- | Run a computation on 'numCapabilities' workers, keeping log messages in
-- memory, and return them together with the result.  An exception captured
-- during the run is rethrown when the result is demanded.
runParLogged :: Par a -> IO ([String],a)
runParLogged comp = do
  (messages, outcome) <- runParDetailed loggedCfg numCapabilities comp
  return $! (messages, fromRight outcome)
  where
    loggedCfg = DbgCfg
      { dbgRange      = Just (0,dbgLvl)
      , dbgDests      = [L.OutputEvents, L.OutputInMemory]
      , dbgScheduling = False
      }
-- | Extract a successful result, rethrowing the captured exception
-- otherwise.
fromRight :: Either E.SomeException a -> a
fromRight = either E.throw id
-- | Atomically and strictly apply a function to the contents of a
-- reference, returning nothing.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref update = atomicModifyIORef' ref step
  where
    step old = (update old, ())
-- | Hash of the stable name of a value, used only to tell objects apart in
-- debug output.
unsafeName :: a -> Int
unsafeName x = unsafePerformIO (hashStableName <$> makeStableName x)
-- | Log a level-3 debug message about a handler pool, with the pool's
-- identity string appended.  Compiles to a no-op unless @DEBUG_LVAR@ is
-- defined.
hpMsg :: Sched.State a s -> String -> HandlerPool -> IO ()
#ifdef DEBUG_LVAR
hpMsg st prefix pool =
  hpId_ pool >>= \ident ->
    logWith st 3 (prefix ++ ", pool identity= " ++ ident)
#else
hpMsg _ _ _ = return ()
#endif
-- | Pure wrapper around 'hpId_', via 'unsafePerformIO'.
hpId :: HandlerPool -> String
hpId = unsafePerformIO . hpId_
-- | Describe a pool for debug output: the stable-name hashes of its counter
-- and its bag, followed by the value currently read from the counter.
hpId_ :: HandlerPool -> IO String
hpId_ (HandlerPool cnt bag) = do
  cntHash <- hashStableName <$> makeStableName cnt
  bagHash <- hashStableName <$> makeStableName bag
  current <- readIORef cnt
  return (concat [show cntHash, "/", show bagHash, " transient cnt ", show current])