module Control.LVish.SchedIdempotent
(
LVar(..), state, HandlerPool(),
Par(..), ClosedPar(..),
yield, newPool, fork, forkHP,
runPar, runParIO,
runParDetailed, runParLogged,
withNewPool, withNewPool_,
forkWithExceptions,
quiesce, quiesceAll,
logStrLn, dbgLvl, getLogger,
newLV, getLV, putLV, putLV_, freezeLV, freezeLVAfter,
addHandler, liftIO, toss,
mkPar, Status(..), sched, Listener(..)
) where
import Control.Monad hiding (sequence, join)
import Control.Concurrent hiding (yield)
import qualified Control.Concurrent as Conc
import qualified Control.Exception as E
import qualified Control.Concurrent.Async as A
import Control.DeepSeq
import Control.Applicative
import Control.LVish.MonadToss
import Control.LVish.Logging as L
import Debug.Trace(trace)
import Data.IORef
import Data.Atomics
import Data.Typeable
import qualified Data.Atomics.Counter as C2
import qualified Data.Concurrent.Counter as C
import qualified Data.Concurrent.Bag as B
import GHC.Conc hiding (yield)
import qualified GHC.Conc
import System.IO
import System.IO.Unsafe (unsafePerformIO)
import System.Environment(getEnvironment)
import System.Mem.StableName (makeStableName, hashStableName)
import Prelude hiding (mapM, sequence, head, tail)
import qualified Prelude
import System.Random (random)
import Text.Printf (printf, hPrintf)
import Data.Traversable hiding (forM)
import Control.LVish.Types
import qualified Control.LVish.SchedIdempotentInternal as Sched
-- | An LVar: a user-supplied state value bundled with the bookkeeping the
-- scheduler needs. @a@ is the type of the state, @d@ the type of the
-- "delta" values delivered to listeners when the state changes.
data LVar a d = LVar {
-- The underlying state value; passed to threshold functions and put callbacks.
state :: a,
-- Mutable lifecycle marker: 'Active' (carrying the listener bag), 'Freezing' or 'Frozen'.
status :: !(IORef (Status d)),
-- Identity token for this LVar; putters publish it via 'Sched.setStatus' and
-- 'freezeLV' awaits on it.
name :: !LVarID
}
-- | LVar identities are plain 'IORef's: only their pointer equality matters,
-- the contents are never inspected.
type LVarID = IORef ()

-- | Allocate a fresh identity, distinct from every previously allocated one.
newLVID :: IO LVarID
newLVID = newIORef ()

-- | A sentinel identity meaning "no LVar"; used as the scheduler's initial
-- status and to clear a worker's status after a put completes.
--
-- This is a global created with 'unsafePerformIO', so it must never be
-- inlined: an inlined copy would allocate a /different/ 'IORef' at each use
-- site and the sentinel would no longer be a single shared value.
noName :: LVarID
noName = unsafePerformIO newLVID
{-# NOINLINE noName #-}
-- | Lifecycle state of an LVar, stored in its 'status' reference.
data Status d
-- A freeze has been requested ('freezeLV' swapped this in) but is not yet finalized.
= Freezing
-- The freeze is complete; 'putLV_' raises a put-after-freeze exception in this state.
| Frozen
-- The LVar still accepts puts; carries the bag of currently registered listeners.
| Active (B.Bag (Listener d))
-- | A pair of callbacks registered on an active LVar. Each callback receives
-- the listener's own bag token (so it can remove itself) and the scheduler
-- state of the worker that is invoking it.
data Listener d = Listener {
-- Invoked by 'putLV_' with the delta produced by a successful put.
onUpdate :: d -> B.Token (Listener d) -> SchedState -> IO (),
-- Invoked by 'freezeLV' while the LVar is being frozen.
onFreeze :: B.Token (Listener d) -> SchedState -> IO ()
}
-- | A group of handler invocations that can be waited on collectively via 'quiesce'.
data HandlerPool = HandlerPool {
-- Count of outstanding handler invocations: incremented in 'closeInPool' when a
-- computation is closed in this pool, decremented when that computation finishes.
numHandlers :: C.Counter,
-- Continuations parked by 'quiesce'; pushed back to the work queue once the
-- counter polls as quiescent.
blockedOnQuiesce :: B.Bag ClosedPar
}
-- | A parallel computation in continuation-passing style: given a
-- continuation for its result, it yields a closed (runnable) computation.
newtype Par a = Par {
-- Supply the continuation, obtaining something the scheduler can execute.
close :: (a -> ClosedPar) -> ClosedPar
}
-- | A computation with no remaining continuation to supply: it only needs a
-- worker's scheduler state in order to run in 'IO'.
newtype ClosedPar = ClosedPar {
-- Run the closed computation on the worker owning the given scheduler state.
exec :: SchedState -> IO ()
}
-- | Per-worker scheduler state, specialized to tasks of type 'ClosedPar' and
-- worker status values of type 'LVarID'.
type SchedState = Sched.State ClosedPar LVarID
-- | Mapping over a 'Par' post-processes the value before it reaches the
-- continuation.
instance Functor Par where
  fmap g (Par run) = Par (\k -> run (\a -> k (g a)))
-- | Standard continuation-monad sequencing: 'return' feeds the value straight
-- to the continuation; '>>=' runs the first computation and hands its result
-- to the function producing the second.
instance Monad Par where
  return x = Par (\k -> k x)
  Par run >>= next = Par (\k -> run (\x -> close (next x) k))
-- | The applicative structure is derived from the monad: effects run left to
-- right, function first, then argument.
instance Applicative Par where
  pure = return
  pf <*> px = pf >>= \f -> px >>= \x -> return (f x)
-- | Build a 'Par' from a raw function that receives both the continuation and
-- the scheduler state of the executing worker.
mkPar :: ((a -> ClosedPar) -> SchedState -> IO ()) -> Par a
mkPar body = Par wrap
  where
    wrap k = ClosedPar (\q -> body k q)
-- | Run the action on the payload of a 'Just'; do nothing for 'Nothing'.
whenJust :: Maybe a -> (a -> IO ()) -> IO ()
whenJust mval act = maybe (return ()) act mval
-- | Report whether the LVar has left the 'Active' state. Note that both
-- 'Freezing' and 'Frozen' count as frozen here.
isFrozen :: LVar a d -> IO Bool
isFrozen lv = readIORef (status lv) >>= classify
  where
    classify (Active _) = return False
    classify _          = return True
-- | Log a message at the given level from within a 'Par' computation.
--
-- Only does anything when compiled with @DEBUG_LVAR@ and when 'dbgLvl' is at
-- least 1; otherwise it is a no-op.
logStrLn :: Int -> String -> Par ()
#ifdef DEBUG_LVAR
logStrLn lvl str = when (dbgLvl >= 1) $ do
lgr <- getLogger
num <- getWorkerNum
-- A negative level is sent as an off-the-record message rather than a regular one.
if lvl < 0
then liftIO$ logHelper (Just lgr) num (L.OffTheRecord (lvl) str)
else liftIO$ logHelper (Just lgr) num (L.StrMsg lvl str)
#else
-- Logging compiled out: ignore both arguments.
logStrLn _ _ = return ()
#endif
-- | Emit a log message tagged with the worker number. When no logger is
-- available the message is written to stderr with a warning prefix instead.
-- Nothing happens unless 'dbgLvl' is at least 1.
logHelper :: Maybe Logger -> Int -> LogMsg -> IO ()
logHelper mlogger workerNum msg =
  when (dbgLvl >= 1) $
    case mlogger of
      Just logger -> L.logOn logger tagged
      Nothing     -> hPutStrLn stderr ("WARNING/nologger:" ++ show tagged)
  where
    prefix = "wrkr" ++ show workerNum ++ " "
    tagged = L.mapMsg (prefix ++) msg
-- | 'logWith' logs a regular message and 'logOffRecord' an off-the-record
-- message, each at the given level, using the logger and worker number stored
-- in the scheduler state. Both are no-ops unless compiled with @DEBUG_LVAR@.
logWith :: Sched.State a s -> Int -> String -> IO ()
logOffRecord :: Sched.State a s -> Int -> String -> IO ()
#ifdef DEBUG_LVAR
logWith q lvl str = logHelper (Sched.logger q) (Sched.no q) (L.StrMsg lvl str)
logOffRecord q lvl str = logHelper (Sched.logger q) (Sched.no q) (L.OffTheRecord lvl str)
#else
-- Logging compiled out: arguments are ignored.
logWith _ _ _ = return ()
logOffRecord _ _ _ = return ()
#endif
-- | Create a new LVar. The supplied action allocates the initial state; the
-- LVar starts out 'Active' with an empty listener bag and a fresh identity.
newLV :: IO a -> Par (LVar a d)
newLV mkState = mkPar $ \k q -> do
  logOffRecord q 7 " [dbg-lvish] newLV: allocating... "
  initial   <- mkState
  lstnrs    <- B.new
  statusRef <- newIORef (Active lstnrs)
  ident     <- newLVID
  let lvar = LVar { state = initial, status = statusRef, name = ident }
  exec (k lvar) q
-- | Threshold read of an LVar.
--
-- Arguments:
--
--  * the LVar to read;
--
--  * a "global" threshold, run against the whole state together with a flag
--    saying whether the LVar is (being) frozen; a 'Just' result unblocks the read;
--
--  * a "delta" threshold, run against each delta reported by a put.
--
-- If the global threshold is not yet met on an active LVar, a listener is
-- registered and control returns to the scheduler; the continuation is
-- resumed later from a put or freeze callback.
getLV :: (LVar a d)
-> (a -> Bool -> IO (Maybe b))
-> (d -> IO (Maybe b))
-> Par b
getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do
-- Suffix identifying this LVar and worker in debug log lines.
let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
logWith q 7$ " [dbg-lvish] getLV: first readIORef "++uniqsuf
curStatus <- readIORef status
case curStatus of
Active listeners -> do
logWith q 7$ " [dbg-lvish] getLV (active): check globalThresh"++uniqsuf
-- Fast path: the threshold may already be met, in which case nothing is registered.
tripped <- globalThresh state False
case tripped of
Just b -> exec (k b) q
Nothing -> do
-- One dedup cell per blocked read; see 'winnerCheck' for how it is used.
execFlag <- newDedupCheck
let onUpdate d = unblockWhen $ deltaThresh d
onFreeze = unblockWhen $ globalThresh state True
-- Shared callback body: if the threshold trips, unregister this listener and
-- push the continuation onto the invoking worker's queue.
unblockWhen thresh tok q = do
let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
logWith q 7$ " [dbg-lvish] getLV (active): callback: check thresh"++uniqsuf
tripped <- thresh
whenJust tripped $ \b -> do
B.remove tok
winnerCheck execFlag q (Sched.pushWork q (k b)) (return ())
logWith q 8$ " [dbg-lvish] getLV "++show(unsafeName execFlag)++
": blocking on LVar, registering listeners..."
tok <- B.put listeners $ Listener onUpdate onFreeze
logWith q 8$ " [dbg-lvish] getLV (active): second frozen check"++uniqsuf
-- Re-check after registering: presumably this closes the window in which a put
-- or freeze happened between the first check and the registration.
frozen <- isFrozen lv
logWith q 7$ " [dbg-lvish] getLV (active): second globalThresh check"++uniqsuf
tripped' <- globalThresh state frozen
case tripped' of
Just b -> do
logWith q 7$ " [dbg-lvish] getLV (active): second globalThresh tripped, remove tok"++uniqsuf
B.remove tok
-- A callback may already have resumed the continuation; if we lose the
-- winner check, just go back to the scheduler.
winnerCheck execFlag q (exec (k b) q) (sched q)
Nothing -> sched q
-- Freezing or Frozen: no listener is registered; check once with the frozen flag set.
_ -> do
logWith q 7$ " [dbg-lvish] getLV (frozen): about to check globalThresh"++uniqsuf
tripped <- globalThresh state True
case tripped of
Just b -> do
exec (k b) q
-- Threshold not met and no listener registered: this continuation is dropped.
Nothing -> sched q
-- | 'winnerCheck cell q tru fal' runs @tru@ if the caller is the first to
-- claim the dedup cell and @fal@ otherwise. 'newDedupCheck' allocates a fresh,
-- unclaimed cell.
--
-- The representation is chosen by CPP: with @GET_ONCE@ enabled the cell is an
-- atomic counter (the first variant below, based on a CAS'd 'IORef Bool', is
-- disabled by @#if 0@); without @GET_ONCE@ the cell is @()@ and every caller
-- "wins", so duplicated execution is permitted.
winnerCheck :: DedupCell -> Sched.State a s -> IO () -> IO () -> IO ()
newDedupCheck :: IO DedupCell
#if GET_ONCE
# if 0
-- Disabled variant: boolean flag claimed with compare-and-swap.
type DedupCell = IORef Bool
newDedupCheck = newIORef False
winnerCheck execFlag q tru fal = do
ticket <- readForCAS execFlag
if (peekTicket ticket)
then do logWith q 8 $ " [dbg-lvish] getLV winnerCheck failed.."
fal
else do
(winner, _) <- casIORef execFlag ticket True
logWith q 8 $ " [dbg-lvish] getLV "++show(unsafeName execFlag)
++" on worker "++ (show$ Sched.no q) ++": winner check? " ++show winner
++ ", ticks " ++ show (ticket, peekTicket ticket)
if winner then tru else fal
# else
-- Active GET_ONCE variant: the caller whose increment yields 1 is the winner.
type DedupCell = C2.AtomicCounter
newDedupCheck = C2.newCounter 0
winnerCheck execFlag q tru fal = do
cnt <- C2.incrCounter 1 execFlag
logWith q 8 $ " [dbg-lvish] getLV "++show(unsafeName execFlag)
++" on worker "++ (show$ Sched.no q) ++": winner check? " ++show (cnt==1)
++ ", counter val " ++ show cnt
if cnt==1 then tru else fal
# endif
#else
-- No deduplication: always run the "winner" branch.
type DedupCell = ()
newDedupCheck = return ()
winnerCheck _ _ tr _ = tr
#endif
-- | Update an LVar. The callback runs (as a 'Par' computation) against the
-- LVar's state and returns an optional delta plus a result; if a delta is
-- reported, every registered listener's 'onUpdate' is invoked with it.
--
-- Throws 'PutAfterFreezeExn' if the LVar is already 'Freezing' or 'Frozen'
-- when the put starts, or if a delta was produced and the LVar is 'Frozen'
-- by the time the callback has finished.
putLV_ :: LVar a d
-> (a -> Par (Maybe d, b))
-> Par b
putLV_ LVar {state, status, name} doPut = mkPar $ \k q -> do
let uniqsuf = ", lv "++(show$ unsafeName state)++" on worker "++(show$ Sched.no q)
putAfterFrzExn = E.throw$ PutAfterFreezeExn "Attempt to change a frozen LVar"
logWith q 8 $ " [dbg-lvish] putLV: initial lvar status read"++uniqsuf
fstStatus <- readIORef status
case fstStatus of
Freezing -> putAfterFrzExn
Frozen -> putAfterFrzExn
Active listeners -> do
logWith q 8 $ " [dbg-lvish] putLV: setStatus,"++uniqsuf
-- Publish that this worker is putting to this LVar; 'freezeLV' calls
-- 'Sched.await' with this name (presumably to wait for in-flight puts).
Sched.setStatus q name
-- Continuation run after the user's callback has finished mutating the state.
let cont (delta, ret) = ClosedPar $ \q -> do
logWith q 8 $ " [dbg-lvish] putLV: read final status before unsetting"++uniqsuf
-- The status is read BEFORE the worker status is cleared.
sndStatus <- readIORef status
logWith q 8 $ " [dbg-lvish] putLV: UN-setStatus"++uniqsuf
Sched.setStatus q noName
-- Only a put that actually changed something (reported a delta) can fail or notify.
whenJust delta $ \d -> do
case sndStatus of
Frozen -> putAfterFrzExn
_ -> do
logWith q 9 $ " [dbg-lvish] putLV: calling each listener's onUpdate"++uniqsuf
B.foreach listeners $ \(Listener onUpdate _) tok -> onUpdate d tok q
exec (k ret) q
logWith q 5 $ " [dbg-lvish] putLV: about to mutate lvar"++uniqsuf
exec (close (doPut state) cont) q
-- | Simplified put: the callback runs in 'IO' and yields only an optional
-- delta. Delegates to 'putLV_' with a unit result.
putLV :: LVar a d
      -> (a -> IO (Maybe d))
      -> Par ()
putLV lv doPut = putLV_ lv wrapped
  where
    wrapped st = liftIO (doPut st) >>= \delta -> return (delta, ())
-- | Freeze an LVar, after which further changes raise a put-after-freeze
-- exception in 'putLV_'.
--
-- The status is atomically moved to 'Freezing'. If the LVar was 'Active',
-- this call then awaits on the scheduler until the LVar's name is no longer
-- published as a worker status, runs every listener's 'onFreeze' callback, and
-- finally records the status as 'Frozen'. If the LVar was already 'Freezing'
-- or 'Frozen', the call returns immediately.
--
-- An already 'Frozen' LVar keeps its 'Frozen' status: it is not reset to
-- 'Freezing', because nothing would ever move it back, and 'putLV_' only
-- raises its late put-after-freeze error when it observes 'Frozen'.
freezeLV :: LVar a d -> Par ()
freezeLV LVar {name, state, status} = mkPar $ \k q -> do
  -- 'state' is bound from the record so the debug suffix identifies this LVar
  -- (rather than hashing the record selector function).
  let uniqsuf = ", lv " ++ (show $ unsafeName state) ++ " on worker " ++ (show $ Sched.no q)
  logWith q 5 $ " [dbg-lvish] freezeLV: atomic modify status to Freezing" ++ uniqsuf
  oldStatus <- atomicModifyIORef' status $ \s ->
    case s of
      Frozen -> (Frozen, s)      -- already final; do not regress to Freezing
      _      -> (Freezing, s)
  case oldStatus of
    Frozen   -> return ()
    Freezing -> return ()
    Active listeners -> do
      logWith q 7 $ " [dbg-lvish] freezeLV: begin busy-wait for putter status" ++ uniqsuf
      Sched.await q (name /=)
      logWith q 7 $ " [dbg-lvish] freezeLV: calling each listener's onFreeze" ++ uniqsuf
      B.foreach listeners $ \Listener {onFreeze} tok -> onFreeze tok q
      logWith q 7 $ " [dbg-lvish] freezeLV: finalizing status as Frozen" ++ uniqsuf
      writeIORef status Frozen
  exec (k ()) q
-- | Create a fresh handler pool with a new counter and an empty bag of
-- blocked continuations.
newPool :: Par HandlerPool
newPool = mkPar allocate
  where
    allocate k q = do
      cnt <- C.new
      bag <- B.new
      let hp = HandlerPool cnt bag
      hpMsg q " [dbg-lvish] Created new pool" hp
      exec (k hp) q
-- | Run a computation with a freshly created handler pool, returning both the
-- computation's result and the pool.
withNewPool :: (HandlerPool -> Par a) -> Par (a, HandlerPool)
withNewPool body =
  newPool >>= \hp ->
    body hp >>= \result ->
      return (result, hp)
-- | Like 'withNewPool' for computations with no interesting result: only the
-- new pool is returned.
withNewPool_ :: (HandlerPool -> Par ()) -> Par HandlerPool
withNewPool_ body =
  newPool >>= \hp ->
    body hp >>= \_ ->
      return hp
-- | Whether a handler invocation has already decremented its pool's counter.
data DecStatus = HasDec | HasNotDec
-- | Close a unit computation so it can be scheduled, optionally accounting
-- for it in a handler pool.
--
-- With no pool, the computation's final continuation simply re-enters the
-- scheduler. With a pool, the pool's counter is incremented now and
-- decremented (at most once) when the computation finishes; if the pool then
-- polls as quiescent, all continuations parked in 'blockedOnQuiesce' are
-- moved onto the current worker's queue.
closeInPool :: Maybe HandlerPool -> Par () -> IO ClosedPar
closeInPool Nothing c = return $ close c $ const (ClosedPar sched)
closeInPool (Just hp) c = do
-- Per-invocation flag guarding the decrement.
decRef <- newIORef HasNotDec
let cnt = numHandlers hp
-- Returns True only for the caller that flips the flag via CAS, so the
-- decrement happens once even if the final continuation runs more than once.
tryDecRef = do
ticket <- readForCAS decRef
case peekTicket ticket of
HasDec -> return False
HasNotDec -> do
(firstToDec, _) <- casIORef decRef ticket HasDec
return firstToDec
-- Final continuation of the handler: update the pool, then return to the scheduler.
onFinishHandler _ = ClosedPar $ \q -> do
shouldDec <- tryDecRef
when shouldDec $ do
C.dec cnt
quiescent <- C.poll cnt
when quiescent $ do
hpMsg q " [dbg-lvish] -> Quiescent now.. waking conts" hp
-- Remove each blocked continuation from the bag and enqueue it as work.
let invoke t tok = do
B.remove tok
Sched.pushWork q t
B.foreach (blockedOnQuiesce hp) invoke
sched q
-- Count this invocation as outstanding before handing back the closed computation.
C.inc $ numHandlers hp
return $ close c onFinishHandler
-- | Register a handler on an LVar.
--
-- The delta callback is consulted on every subsequent update; whenever it
-- yields a computation, that computation is closed in the given pool (if
-- any) and pushed as new work. The global callback is run once, immediately
-- after registration, against the LVar's current state. No listener is
-- registered if the LVar is already 'Freezing' or 'Frozen', but the global
-- callback still runs.
addHandler :: Maybe HandlerPool
           -> LVar a d
           -> (a -> Par ())
           -> (d -> IO (Maybe (Par ())))
           -> Par ()
addHandler hp LVar {state, status} globalCB updateThresh = mkPar $ \k q -> do
    curStatus <- readIORef status
    case curStatus of
      Active listeners -> void (B.put listeners (Listener deltaHook freezeHook))
      _                -> return ()
    logWith q 4 " [dbg-lvish] addHandler: calling globalCB.."
    exec (close (globalCB state) k) q
  where
    -- On update: spawn the handler body if the delta threshold produces one.
    deltaHook d _tok wq = do
      tripped <- updateThresh d
      case tripped of
        Nothing -> return ()
        Just cb -> do
          logWith wq 5 " [dbg-lvish] addHandler: Delta threshold triggered, pushing work.."
          closed <- closeInPool hp cb
          Sched.pushWork wq closed
    -- Handlers do not react to freezing.
    freezeHook _ _ = return ()
-- | Block until the handler pool has no outstanding handler invocations.
--
-- The continuation is parked in the pool's bag first and the counter is
-- polled afterwards; if the pool is already quiescent the continuation is
-- withdrawn from the bag and run directly, otherwise control returns to the
-- scheduler.
quiesce :: HandlerPool -> Par ()
quiesce hp@(HandlerPool cnt bag) = mkPar $ \k q -> do
  let resume = k ()
  hpMsg q " [dbg-lvish] Begin quiescing pool, identity= " hp
  tok <- B.put bag resume
  hpMsg q " [dbg-lvish] quiesce: poll count" hp
  quiescent <- C.poll cnt
  case quiescent of
    True -> do
      hpMsg q " [dbg-lvish] already quiesced, remove token from bag" hp
      B.remove tok
      exec resume q
    False -> do
      logOffRecord q 4 " [dbg-lvish] -> Not quiescent yet, back to sched"
      sched q
-- | Global barrier: run the scheduler loop until it returns, then continue.
quiesceAll :: Par ()
quiesceAll = mkPar barrier
  where
    barrier k q = do
      logWith q 1 " [dbg-lvish] quiesceAll: initiating global barrier."
      sched q
      logWith q 1 " [dbg-lvish] quiesceAll: Past global barrier."
      exec (k ()) q
-- | Attach a handler to the LVar in a private, freshly created pool, wait for
-- that pool to quiesce, and then freeze the LVar.
freezeLVAfter :: LVar a d
              -> (a -> Par ())
              -> (d -> IO (Maybe (Par ())))
              -> Par ()
freezeLVAfter lv globalCB updateCB =
  newPool >>= \hp -> do
    addHandler (Just hp) lv globalCB updateCB
    quiesce hp
    freezeLV lv
-- | Fork a child computation, optionally accounted for in a handler pool.
-- The parent's continuation is pushed onto the work queue and the child is
-- executed immediately on the current worker.
forkHP :: Maybe HandlerPool -> Par () -> Par ()
forkHP mh child = mkPar $ \k q ->
  closeInPool mh child >>= \closedChild -> do
    Sched.pushWork q (k ())
    exec closedChild q
-- | Fork a child computation that belongs to no handler pool.
fork :: Par () -> Par ()
fork child = forkHP Nothing child
-- | Run an 'IO' action inside 'Par' and pass its result to the continuation.
liftIO :: IO a -> Par a
liftIO io = mkPar $ \k q -> io >>= \result -> exec (k result) q
-- | Retrieve the logger stored in the current worker's scheduler state.
--
-- The scheduler state holds a @Maybe Logger@. If it is 'Nothing', using the
-- returned logger raises an error that names this function, instead of an
-- anonymous irrefutable-pattern failure. As before, the failure is lazy: it
-- surfaces only when the logger value is actually demanded.
getLogger :: Par L.Logger
getLogger = mkPar $ \k q ->
  let lgr = case Sched.logger q of
        Just l  -> l
        Nothing -> error "getLogger: no logger is attached to this scheduler state"
  in exec (k lgr) q
-- | The number of the worker currently executing this computation, as
-- recorded in its scheduler state.
getWorkerNum :: Par Int
getWorkerNum = mkPar report
  where
    report k q = exec (k (Sched.no q)) q
-- | Random choice drawn from the per-worker generator held in the scheduler
-- state; the generator is advanced after each draw.
instance MonadToss Par where
  toss = mkPar $ \k q -> do
    let rngRef = Sched.prng q
    gen <- readIORef rngRef
    let drawn = random gen
    writeIORef rngRef (snd drawn)
    exec (k (fst drawn)) q
-- | Cooperatively give up the worker: hand the rest of this computation to
-- the scheduler via 'Sched.yieldWork' and go pick up other work.
yield :: Par ()
yield = mkPar giveUp
  where
    giveUp k q = Sched.yieldWork q (k ()) >> sched q
-- | Scheduler step: ask for the next task and run it on this worker; return
-- when 'Sched.next' reports that there is none.
sched :: SchedState -> IO ()
sched q = Sched.next q >>= maybe (return ()) runTask
  where
    runTask task = exec task q
-- | Forcing an LVar to normal form does nothing: its contents are not traversed.
instance NFData (LVar a d) where
  rnf = const ()
-- | Run a 'Par' computation with full control over debugging and the number
-- of workers.
--
-- Arguments: the debug configuration, the number of workers, and the
-- computation. Returns the logs flushed from the logger (empty when there is
-- no logger) together with either the exception raised by a worker or the
-- final answer.
runParDetailed :: DbgCfg
-> Int
-> Par a
-> IO ([String], Either E.SomeException a)
runParDetailed cfg@DbgCfg{dbgRange, dbgDests, dbgScheduling } numWrkrs comp = do
(lgr,queues) <- Sched.new cfg numWrkrs noName
main_cpu <- Sched.currentCPU
-- Filled exactly once by the main worker's final continuation.
answerMV <- newEmptyMVar
-- Close the logger (if any) and collect whatever it has buffered.
-- NOTE(review): uses 'Prelude.head' on the queue list, so this assumes at
-- least one worker queue was created.
let grabLogs = do
logOffRecord (Prelude.head queues) 1 " [dbg-lvish] parent thread escaped unscathed. Optionally closing logger."
case lgr of
Nothing -> return []
Just lgr -> do L.closeIt lgr
L.flushLogs lgr
-- Off-the-record logging from the coordinating thread, which has no scheduler state.
mlog s = case lgr of
Nothing -> return ()
Just l -> L.logOn l (L.OffTheRecord 4 s)
-- Body of each worker thread. The worker whose index equals the current CPU
-- runs the user computation; all others go straight into the scheduler loop.
let runWorker :: (Int,Sched.State ClosedPar LVarID) -> IO ()
runWorker (cpu, q) = do
if (cpu /= main_cpu)
then do logOffRecord q 3 $ " [dbg-lvish] Auxillary worker #"++show cpu++" starting."
sched q
logOffRecord q 3 $ " [dbg-lvish] Auxillary worker #"++show cpu++" exitting."
-- Final continuation: drain remaining work via the scheduler, then publish the answer.
else let k x = ClosedPar $ \q -> do
logOffRecord q 3 " [dbg-lvish] Final continuation of main worker: reenter sched to cleanup."
sched q
logOffRecord q 3 " [dbg-lvish] Main worker: past global barrier, putting answer."
b <- tryPutMVar answerMV x
-- NOTE(review): this uses #ifdef while 'winnerCheck' is guarded by #if GET_ONCE;
-- the two differ when GET_ONCE is defined as 0.
#ifdef GET_ONCE
unless b $ error "Final continuation of Par computation was duplicated, in spite of GET_ONCE!"
#endif
return ()
in do logOffRecord q 3 " [dbg-lvish] Main worker thread starting."
exec (close comp k) q
-- Spawn one async per queue (nested 'withAsyncOn' scopes), then wait: first for
-- any single async, returning its exception if it failed, then for all of them.
let loop [] asyncs = do tid <- myThreadId
mlog $ " [dbg-lvish] (tid "++show tid++") Wait on at least one async to complete.."
(_,x) <- A.waitAnyCatch asyncs
case x of
Left e -> return $! Left e
Right () -> waitloop asyncs
loop ((cpu,q):tl) asyncs =
A.withAsyncOn cpu (runWorker (cpu,q))
(\a -> loop tl (a:asyncs))
-- Every async has finished successfully: the answer can now be read.
waitloop [] = do
mlog " [dbg-lvish] All asyncs complete, read final answer MVar."
fmap Right (dbgTakeMVar [] "runPar/final answer" answerMV)
waitloop (hd:tl) = do mlog " [dbg-lvish] Waiting for one async.."
me <- A.waitCatch hd
case me of
Left e -> return $! Left e
Right () -> waitloop tl
ans <- loop (zip [0..] queues) []
logs <- grabLogs
return $! (logs,ans)
-- | Run a computation with the default debug configuration on
-- 'numCapabilities' workers, discarding logs. A worker exception is
-- re-thrown (lazily, when the result is demanded).
defaultRun :: Par b -> IO b
defaultRun comp = do
  (_logs, outcome) <- runParDetailed cfg numCapabilities comp
  return (fromRight outcome)
  where
    cfg = DbgCfg { dbgRange      = Just (0, dbgLvl)
                 , dbgDests      = [L.OutputTo stderr, L.OutputEvents]
                 , dbgScheduling = False }
-- | Run a computation and return its result as a pure value (the 'IO' run is
-- hidden behind 'unsafePerformIO').
runPar :: Par a -> a
runPar comp = unsafePerformIO (defaultRun comp)
-- | Run a computation in 'IO' with the default configuration.
runParIO :: Par a -> IO a
runParIO comp = defaultRun comp
-- | Run a computation while collecting log messages in memory, returning the
-- logs alongside the result. A worker exception is re-thrown when the result
-- is demanded.
runParLogged :: Par a -> IO ([String], a)
runParLogged comp = do
  let cfg = DbgCfg { dbgRange      = Just (0, dbgLvl)
                   , dbgDests      = [L.OutputEvents, L.OutputInMemory]
                   , dbgScheduling = False }
  (logs, outcome) <- runParDetailed cfg numCapabilities comp
  return $! (logs, fromRight outcome)
-- | Extract a successful result, or re-throw the captured exception.
fromRight :: Either E.SomeException a -> a
fromRight = either E.throw id
-- | Atomically and strictly apply a function to the contents of an 'IORef',
-- returning nothing.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref fn = atomicModifyIORef' ref step
  where
    step old = (fn old, ())
-- | Debugging aid: an integer derived from the stable name of a heap object.
-- Uses 'unsafePerformIO', so the result is only meaningful for diagnostics.
unsafeName :: a -> Int
unsafeName x = unsafePerformIO (hashStableName <$> makeStableName x)
-- | Log a message (at level 3) suffixed with the identity of a handler pool.
-- A no-op unless compiled with @DEBUG_LVAR@.
hpMsg :: Sched.State a s -> String -> HandlerPool -> IO ()
hpMsg q msg hp = do
#ifdef DEBUG_LVAR
s <- hpId_ hp
logWith q 3 $ msg++", pool identity= " ++s
#else
-- Logging compiled out.
return ()
#endif
-- | Pure wrapper around 'hpId_' for use in debugging output.
hpId :: HandlerPool -> String
hpId = unsafePerformIO . hpId_
-- | Describe a handler pool for debugging: the stable-name hashes of its
-- counter and bag, plus the counter's value at the moment of the call.
hpId_ :: HandlerPool -> IO String
hpId_ (HandlerPool cnt bag) = do
  cntHash <- hashStableName <$> makeStableName cnt
  bagHash <- hashStableName <$> makeStableName bag
  current <- readIORef cnt
  return (concat [show cntHash, "/", show bagHash, " transient cnt ", show current])
-- | Debugging replacement for 'takeMVar': polls the 'MVar' with backoff
-- instead of blocking, and gives up once the accumulated wait reaches the
-- timeout.
--
-- Arguments: the worker threads whose status should be reported on timeout, a
-- description of what is being waited for, and the 'MVar' itself.
--
-- On timeout the diagnostics (current thread, the description, and each
-- worker's 'threadStatus') are written to stderr /before/ the
-- @"OVER WAIT"@ error is raised, so the failure comes with the information
-- needed to investigate it.
busyTakeMVar :: [ThreadId] -> String -> MVar a -> IO a
busyTakeMVar tids msg mv =
  do b <- L.newBackoff maxWait
     try b
  where
    maxWait = 10000
    timeOut = (3 * 1000 * 1000)
    try bkoff
      | totalWait bkoff >= timeOut = do
          tid <- myThreadId
          hPrintf stderr "%s not unblocked yet, for: %s\n" (show tid) msg
          stats <- Prelude.mapM threadStatus tids
          -- Plain output rather than hPrintf: the shown data is not a format string.
          hPutStrLn stderr $ "Worker statuses: " ++ show (zip tids stats)
          error "OVER WAIT"
      | otherwise = do
          x <- tryTakeMVar mv
          case x of
            Just y  -> return y
            Nothing -> try =<< L.backoff bkoff
-- | Take from an 'MVar'. With @DEBUG_LVAR@ this is 'busyTakeMVar' (polling
-- with a timeout and diagnostics); otherwise the thread list and description
-- are ignored and a plain blocking 'takeMVar' is used.
#ifdef DEBUG_LVAR
dbgTakeMVar = busyTakeMVar
#else
dbgTakeMVar _ _ = takeMVar
#endif