module Control.LVish.SchedIdempotent
(
LVar(), state, HandlerPool(),
Par(..), ClosedPar(..),
yield, newPool, fork, forkHP,
runPar, runParIO, runParLogged,
withNewPool, withNewPool_,
forkWithExceptions,
quiesce, quiesceAll,
logStrLn, dbgLvl,
newLV, getLV, putLV, putLV_, freezeLV, freezeLVAfter,
addHandler, liftIO, toss
) where
import Control.Monad hiding (sequence, join)
import Control.Concurrent hiding (yield)
import qualified Control.Exception as E
import Control.DeepSeq
import Control.Applicative
import Control.LVish.MonadToss
import Data.IORef
import Data.Atomics
import Data.Typeable
import qualified Data.Concurrent.Counter as C
import qualified Data.Concurrent.Bag as B
import GHC.Conc hiding (yield)
import System.IO
import System.IO.Unsafe (unsafePerformIO)
import System.Environment(getEnvironment)
import System.Mem.StableName (makeStableName, hashStableName)
import Debug.Trace(trace)
import Prelude hiding (mapM, sequence, head, tail)
import System.Random (random)
#ifdef DEBUG_LVAR
import Text.Printf (printf)
#endif
import Data.Traversable
import Control.LVish.Types
import qualified Control.LVish.SchedIdempotentInternal as Sched
-- | Process-wide buffer of pending debug messages, most recent first
-- (writers cons onto the front, readers reverse before printing).
--
-- The NOINLINE pragma is required: the buffer is allocated through
-- 'unsafePerformIO', and if GHC inlined this binding at its use sites each
-- of them could end up allocating its own, separate 'IORef'.
{-# NOINLINE globalLog #-}
globalLog :: IORef [String]
globalLog = unsafePerformIO $ newIORef []
-- | Emit a debug message from inside a 'Par' computation.
-- A no-op unless the module is compiled with DEBUG_LVAR.
logStrLn :: String -> Par ()
-- | 'IO' counterpart of 'logStrLn'; in DEBUG_LVAR builds it logs at level 1.
logStrLn_ :: String -> IO ()
-- | Emit a debug message at the given verbosity level (first argument).
-- A no-op unless the module is compiled with DEBUG_LVAR.
logLnAt_ :: Int -> String -> IO ()
#ifdef DEBUG_LVAR
#warning "Compiling in LVish DEBUG mode."
logStrLn = liftIO . logStrLn_
logStrLn_ s = logLnAt_ 1 s
-- With dbgLvl >= 5 every message is printed immediately with putStrLn,
-- whatever its level.  Otherwise a message whose level does not exceed
-- dbgLvl is consed onto globalLog, and anything else is dropped.
logLnAt_ lvl s | dbgLvl >= 5 = putStrLn s
| dbgLvl >= lvl = atomicModifyIORef globalLog $ \ss -> (s:ss, ())
| otherwise = return ()
#else
-- Non-debug build: all logging entry points discard their arguments.
logStrLn _ = return ()
logStrLn_ _ = return ()
logLnAt_ _ _ = return ()
#endif
-- | Atomically empty the global log buffer and print the drained messages
-- to stdout in the order they were logged (the buffer holds them newest
-- first, hence the 'reverse').
printLog :: IO ()
printLog = do
  pending <- atomicModifyIORef globalLog (\msgs -> ([], msgs))
  forM_ (reverse pending) putStrLn
-- | Fork a background thread that prints the global log every 200ms.
--
-- The returned action shuts the printer down: it kills the thread (whose
-- exception handler prints whatever is still buffered plus a shutdown
-- notice) and then blocks until that thread has really terminated.
printLogThread :: IO (IO ())
printLogThread = do
  tid <- forkIO $
    E.catch loop (\ (_ :: E.AsyncException) -> do
      printLog
      putStrLn " [dbg-log-printer] Shutting down.")
  return $ do
    killThread tid
    waitUntilDone tid
  where
    -- Periodically flush the log; only ever left via an async exception.
    loop = do
      printLog
      threadDelay (200 * 1000)
      loop

    -- Poll until the printer thread is finished or dead.  Waiting only while
    -- the status is ThreadRunning is not enough: during its final flush the
    -- thread may be reported as ThreadBlocked (e.g. while performing the
    -- write), and returning at that point would let the caller race ahead of
    -- the last log output.
    waitUntilDone t = do
      stat <- threadStatus t
      case stat of
        ThreadFinished -> return ()
        ThreadDied     -> return ()
        _              -> threadDelay 1000 >> waitUntilDone t
-- | Snapshot of the process environment, read the first time it is demanded.
--
-- NOINLINE keeps GHC from duplicating the 'unsafePerformIO' call at each use
-- site, so the environment is read once and every caller sees the same
-- snapshot.
{-# NOINLINE theEnv #-}
theEnv :: [(String, String)]
theEnv = unsafePerformIO getEnvironment
-- | Debugging verbosity, taken from the @DEBUG@ environment variable.
--
-- An unset, empty or @"0"@ variable yields 'defaultDbg'.  Any other value
-- must start with something 'reads' accepts as an 'Int'; the parsed level is
-- announced through 'trace'.  An unparsable value is a fatal 'error'.
dbgLvl :: Int
dbgLvl = maybe defaultDbg interpret (lookup "DEBUG" theEnv)
  where
    interpret ""  = defaultDbg
    interpret "0" = defaultDbg
    interpret s   =
      case reads s of
        (n, _) : _ -> trace (" [!] Responding to env Var: DEBUG=" ++ show n) n
        []         -> error $ "Attempt to parse DEBUG env var as Int failed: " ++ show s
-- | Debug level used when the DEBUG environment variable is unset, empty
-- or "0" (see 'dbgLvl').
defaultDbg :: Int
defaultDbg = 0
-- | An LVar with underlying state of type @a@ and change notifications
-- ("deltas") of type @d@.
data LVar a d = LVar {
-- The underlying state value; this module only hands it to caller-supplied
-- functions (thresholds, put actions, handlers).
state :: a,
-- Mutable cell holding either 'Frozen' or 'Active' with the registered listeners.
status :: !(IORef (Status d)),
-- Identity of this LVar, passed to the scheduler by putLV_ and freezeLV.
name :: !LVarID
}
-- | LVar identifiers are 'IORef's to unit: nothing is stored in them, only
-- their identity (as compared by 'Eq' on 'IORef') matters.
type LVarID = IORef ()

-- | Allocate a fresh identifier.
newLVID :: IO LVarID
newLVID = newIORef ()

-- | Distinguished identifier standing for "no LVar".
--
-- It is compared by identity against real LVar names, so there must be
-- exactly one such reference in the program.  NOINLINE guarantees that:
-- without it GHC could inline the 'unsafePerformIO' call and allocate a
-- different 'IORef' at each use site.
{-# NOINLINE noName #-}
noName :: LVarID
noName = unsafePerformIO $ newLVID
-- | Lifecycle state of an LVar.
data Status d
-- Frozen: putLV_ throws if a put on a frozen LVar produces a delta.
= Frozen
-- Active: carries the bag of listeners currently registered on the LVar.
| Active (B.Bag (Listener d))
-- | Callbacks registered on an active LVar.  Both receive the listener's own
-- bag token and the scheduler state of the thread invoking them.
data Listener d = Listener {
-- Invoked by putLV_ with each delta produced by a put.
onUpdate :: d -> B.Token (Listener d) -> SchedState -> IO (),
-- Invoked by freezeLV when the LVar goes from active to frozen.
onFreeze :: B.Token (Listener d) -> SchedState -> IO ()
}
-- | A group of handler invocations that can be waited on with 'quiesce'.
data HandlerPool = HandlerPool {
-- Count of pool tasks still outstanding: incremented by closeInPool when a
-- task is created, decremented once when that task finishes.
numHandlers :: C.Counter,
-- Continuations parked by 'quiesce', woken when the pool becomes quiescent.
blockedOnQuiesce :: B.Bag ClosedPar
}
-- | A parallel computation in continuation-passing style: given the
-- continuation that consumes its result, it produces a closed task.
newtype Par a = Par {
close :: (a -> ClosedPar) -> ClosedPar
}
-- | A computation with its continuation already supplied; it can be run
-- against a scheduler state.
newtype ClosedPar = ClosedPar {
exec :: SchedState -> IO ()
}
-- | Scheduler state specialised to this module's task and LVar-id types.
type SchedState = Sched.State ClosedPar LVarID
-- | Mapping over a 'Par' applies the function just before the result is
-- handed to the continuation.
instance Functor Par where
  fmap f (Par run) = Par (\k -> run (\a -> k (f a)))
-- | Standard continuation-monad sequencing.
instance Monad Par where
  -- Feed the value directly to the continuation.
  return a = Par ($ a)
  -- Run the first computation with a continuation that runs the second one.
  Par run >>= c = Par $ \k -> run (\a -> close (c a) k)
-- | Applicative structure derived from the 'Monad' instance: the function
-- computation is sequenced before the argument computation.
instance Applicative Par where
  pure = return
  pf <*> px = do
    f <- pf
    x <- px
    return (f x)
-- | Build a 'Par' from a function that receives the continuation and the
-- scheduler state explicitly.
mkPar :: ((a -> ClosedPar) -> SchedState -> IO ()) -> Par a
mkPar body = Par (\cont -> ClosedPar (\st -> body cont st))
-- | Run the action on the contained value, or do nothing for 'Nothing'.
whenJust :: Maybe a -> (a -> IO ()) -> IO ()
whenJust mb act = maybe (return ()) act mb
-- | Read the LVar's status cell and report whether it is 'Frozen'.
isFrozen :: LVar a d -> IO Bool
isFrozen lv = frozen <$> readIORef (status lv)
  where
    frozen Frozen     = True
    frozen (Active _) = False
-- | Create a new LVar.  The given action produces the initial state; the
-- LVar starts out 'Active' with an empty listener bag and a fresh identity.
newLV :: IO a -> Par (LVar a d)
newLV mkState = mkPar $ \k q -> do
  initial   <- mkState
  listeners <- B.new
  statusRef <- newIORef (Active listeners)
  ident     <- newLVID
  let lv = LVar { state = initial, status = statusRef, name = ident }
  exec (k lv) q
-- | Threshold read of an LVar.
--
-- The second argument (the "global" threshold) is applied to the LVar's
-- state together with a flag telling whether the LVar was observed frozen.
-- The third argument (the "delta" threshold) is applied to each delta
-- delivered to the listener this function registers.  As soon as either
-- returns @Just b@, the continuation is resumed with @b@.
getLV :: (LVar a d)
-> (a -> Bool -> IO (Maybe b))
-> (d -> IO (Maybe b))
-> Par b
getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do
curStatus <- readIORef status
case curStatus of
-- Already frozen: test the global threshold once.  If it is not met the
-- continuation is not stored anywhere and control returns to the scheduler.
Frozen -> do
tripped <- globalThresh state True
case tripped of
Just b -> exec (k b) q
Nothing -> sched q
Active listeners -> do
-- Fast path: threshold already met, no listener needed.
tripped <- globalThresh state False
case tripped of
Just b -> exec (k b) q
Nothing -> do
#if GET_ONCE
-- With GET_ONCE, this flag lets at most one wake-up push the continuation.
execFlag <- newIORef False
#endif
-- unblockWhen runs a threshold check; when it trips, the listener removes
-- itself from the bag and the continuation is pushed onto the work queue of
-- the scheduler state that invoked the callback.
let onUpdate d = unblockWhen $ deltaThresh d
onFreeze = unblockWhen $ globalThresh state True
unblockWhen thresh tok q = do
tripped <- thresh
whenJust tripped $ \b -> do
B.remove tok
#if GET_ONCE
-- Only the caller that wins the CAS from False to True schedules k.
ticket <- readForCAS execFlag
unless (peekTicket ticket) $ do
(winner, _) <- casIORef execFlag ticket True
when winner $ Sched.pushWork q (k b)
#else
Sched.pushWork q (k b)
#endif
tok <- B.put listeners $ Listener onUpdate onFreeze
-- Re-check after registering; presumably this closes the window in which a
-- put or freeze could land between the first check and the registration.
frozen <- isFrozen lv
tripped' <- globalThresh state frozen
case tripped' of
Just b -> do
B.remove tok
exec (k b) q
Nothing -> sched q
-- | Update an LVar.  The supplied 'Par' action performs the actual change on
-- the state and reports an optional delta together with a result value.
--
-- The scheduler status is set to this LVar's name for the duration of the
-- put and reset to 'noName' afterwards.  If a delta was produced, every
-- registered listener's 'onUpdate' is invoked with it; producing a delta on
-- a frozen LVar throws 'PutAfterFreezeExn'.
putLV_ :: LVar a d
       -> (a -> Par (Maybe d, b))
       -> Par b
putLV_ LVar {state, status, name} doPut = mkPar $ \k q -> do
  Sched.setStatus q name
  exec (close (doPut state) (afterPut k)) q
  where
    -- Continuation of the user's put; runs on whichever scheduler state
    -- ends up executing it.
    afterPut k (delta, ret) = ClosedPar $ \q' -> do
      curStatus <- readIORef status
      Sched.setStatus q' noName
      whenJust delta $ \d ->
        case curStatus of
          Active listeners ->
            B.foreach listeners $ \(Listener notify _) tok -> notify d tok q'
          Frozen ->
            E.throw $ PutAfterFreezeExn "Attempt to change a frozen LVar"
      exec (k ret) q'
-- | Simplified put: the update is a plain 'IO' action returning an optional
-- delta, and the overall result is unit.
putLV :: LVar a d
      -> (a -> IO (Maybe d))
      -> Par ()
putLV lv doPut = putLV_ lv $ \st -> do
  delta <- liftIO (doPut st)
  return (delta, ())
-- | Freeze an LVar.  The status is atomically swapped to 'Frozen'; if it was
-- active before, the scheduler is asked to 'Sched.await' with a predicate
-- that holds for every status other than this LVar's name, and then each
-- listener's 'onFreeze' is run.  Freezing an already frozen LVar does
-- nothing further.
freezeLV :: LVar a d -> Par ()
freezeLV LVar {name, status} = mkPar $ \k q -> do
  previous <- atomicModifyIORef status (\old -> (Frozen, old))
  case previous of
    Active listeners -> do
      Sched.await q (name /=)
      B.foreach listeners $ \l tok -> onFreeze l tok q
    Frozen -> return ()
  exec (k ()) q
-- | Create a handler pool with a fresh counter and an empty bag of
-- quiescence waiters.
newPool :: Par HandlerPool
newPool = mkPar $ \k q -> do
  pool <- HandlerPool <$> C.new <*> B.new
  hpMsg " [dbg-lvish] Created new pool" pool
  exec (k pool) q
-- | Create a fresh pool, run the given computation with it, and return the
-- computation's result paired with the pool.
withNewPool :: (HandlerPool -> Par a) -> Par (a, HandlerPool)
withNewPool f =
  newPool >>= \pool ->
    f pool >>= \result ->
      return (result, pool)
-- | Like 'withNewPool' for computations without a result: only the pool is
-- returned.
withNewPool_ :: (HandlerPool -> Par ()) -> Par HandlerPool
withNewPool_ f =
  newPool >>= \pool ->
    f pool >> return pool
-- | Whether a pool task has already decremented its pool's handler counter
-- (see 'closeInPool').
data DecStatus = HasDec | HasNotDec
-- | Close a @Par ()@ into a runnable task, optionally tracked by a pool.
--
-- Without a pool, the task simply returns to the scheduler when it finishes.
-- With a pool, the pool's counter is incremented before the task is handed
-- back, and the task's final continuation decrements it again (at most
-- once) and wakes the pool's quiescence waiters when the counter polls as
-- quiescent.
closeInPool :: Maybe HandlerPool -> Par () -> IO ClosedPar
closeInPool Nothing c = return $ close c $ const (ClosedPar sched)
closeInPool (Just hp) c = do
-- Per-task flag guarding the decrement.
decRef <- newIORef HasNotDec
let cnt = numHandlers hp
-- Returns True only for the caller whose CAS flips the flag to HasDec, so
-- the counter is decremented once even if the continuation runs repeatedly.
tryDecRef = do
ticket <- readForCAS decRef
case peekTicket ticket of
HasDec -> return False
HasNotDec -> do
(firstToDec, _) <- casIORef decRef ticket HasDec
return firstToDec
-- Final continuation of the task: account for its completion, then go
-- back to the scheduler.
onFinishHandler _ = ClosedPar $ \q -> do
shouldDec <- tryDecRef
when shouldDec $ do
C.dec cnt
quiescent <- C.poll cnt
when quiescent $ do
hpMsg " [dbg-lvish] -> Quiescent now.. waking conts" hp
-- Each parked continuation is removed from the bag and pushed as work.
let invoke t tok = do
B.remove tok
Sched.pushWork q t
B.foreach (blockedOnQuiesce hp) invoke
sched q
-- Count the task as outstanding before it is returned to the caller.
C.inc $ numHandlers hp
return $ close c onFinishHandler
-- | Register a handler on an LVar.
--
-- If the LVar is still active, a listener is added whose 'onUpdate' feeds
-- each delta to the update threshold; its 'onFreeze' does nothing.  After
-- that, regardless of the status, the global threshold is applied once to
-- the current state.  Every callback a threshold yields is closed in the
-- given pool (if any) and pushed onto the work queue.
addHandler :: Maybe HandlerPool
           -> LVar a d
           -> (a -> IO (Maybe (Par ())))
           -> (d -> IO (Maybe (Par ())))
           -> Par ()
addHandler hp LVar {state, status} globalThresh updateThresh =
  mkPar $ \k q -> do
    curStatus <- readIORef status
    case curStatus of
      Frozen           -> return ()
      Active listeners ->
        B.put listeners (Listener deltaListener ignoreFreeze) >> return ()
    spawnWhen (globalThresh state) q
    exec (k ()) q
  where
    -- Run a threshold check and schedule the callback it yields, if any.
    spawnWhen thresh q = do
      tripped <- thresh
      whenJust tripped $ \cb ->
        closeInPool hp cb >>= Sched.pushWork q

    deltaListener d _ q = spawnWhen (updateThresh d) q

    ignoreFreeze _ _ = return ()
-- | Block until the pool's counter polls as quiescent.
--
-- The continuation is first parked in the pool's bag and only then is the
-- counter polled.  If the pool is already quiescent the continuation is
-- withdrawn and run directly; otherwise control returns to the scheduler
-- and the parked continuation is left for a finishing pool task to wake.
quiesce :: HandlerPool -> Par ()
quiesce hp@(HandlerPool cnt bag) = mkPar $ \k q -> do
  hpMsg " [dbg-lvish] Begin quiescing pool, identity= " hp
  let resume = k ()
  tok <- B.put bag resume
  alreadyQuiet <- C.poll cnt
  case alreadyQuiet of
    True -> do
      B.remove tok
      hpMsg " [dbg-lvish] -> Quiescent already!" hp
      exec resume q
    False -> do
      hpMsg " [dbg-lvish] -> Not quiescent yet, back to sched" hp
      sched q
-- | Global barrier: run the scheduler loop on the current scheduler state
-- until it returns, log that fact, then resume the continuation.
quiesceAll :: Par ()
quiesceAll = mkPar $ \cont st -> do
  sched st
  logStrLn_ " [dbg-lvish] Return from global barrier."
  let resume = cont ()
  exec resume st
-- | Register the given handlers on the LVar in a fresh, private pool, wait
-- for that pool to quiesce, then freeze the LVar.
freezeLVAfter :: LVar a d
              -> (a -> IO (Maybe (Par ())))
              -> (d -> IO (Maybe (Par ())))
              -> Par ()
freezeLVAfter lv globalCB updateCB = do
  pool <- newPool
  addHandler (Just pool) lv globalCB updateCB
  quiesce pool
  freezeLV lv
-- | Fork a child computation, optionally tracked by a handler pool.  The
-- parent's continuation is pushed onto the work queue and the current
-- thread goes on to execute the child.
forkHP :: Maybe HandlerPool -> Par () -> Par ()
forkHP mh child = mkPar $ \k q -> do
  childTask <- closeInPool mh child
  let parentCont = k ()
  Sched.pushWork q parentCont
  exec childTask q
-- | Fork a child computation that belongs to no handler pool.
fork :: Par () -> Par ()
fork = forkHP Nothing
-- | Run an 'IO' action inside 'Par' and pass its result to the continuation.
liftIO :: IO a -> Par a
liftIO io = mkPar $ \k q ->
  io >>= \result -> exec (k result) q
-- | Coin toss drawn from the generator stored in the scheduler state; the
-- advanced generator is written back before continuing.
instance MonadToss Par where
  toss = mkPar $ \k q -> do
    let genRef = Sched.prng q
    gen <- readIORef genRef
    let (coin, gen') = random gen
    writeIORef genRef gen'
    exec (k coin) q
-- | Give up the processor: hand the current continuation to
-- 'Sched.yieldWork' and return to the scheduler loop.
yield :: Par ()
yield = mkPar $ \k q ->
  Sched.yieldWork q (k ()) >> sched q
-- | Scheduler step: ask for the next task and execute it, or return if the
-- scheduler has nothing to offer.
sched :: SchedState -> IO ()
sched q = Sched.next q >>= maybe (return ()) (\task -> exec task q)
-- | Forcing an LVar to normal form is a no-op: nothing inside is evaluated.
instance NFData (LVar a d) where
  rnf = const ()
-- | Run a 'Par' computation, with the background log printer enabled when
-- the debug level is at least 1.
--
-- The logger shutdown and the final stdout flush are attached with
-- 'E.bracket', so they also happen when the computation throws; otherwise
-- the printer thread would be left running after a failed run.
runPar_internal :: Par a -> IO a
runPar_internal c =
  E.bracket startLogger stopLogger (\_ -> runPar_internal2 c)
  where
    -- Yields the action that shuts the logger down (a no-op without logging).
    startLogger
      | dbgLvl >= 1 = printLogThread
      | otherwise   = return (return ())

    stopLogger closeLogger = do
      closeLogger
      hFlush stdout
-- | Core runner: set up the scheduler, fork the worker threads, run the
-- computation on one of them and wait for its answer.
--
-- Any exception reaching the calling thread while forking or waiting kills
-- the workers and is rethrown wrapped in 'LVarSpecificExn'.
runPar_internal2 :: Par a -> IO a
runPar_internal2 c = do
-- Scheduler states, paired with capability numbers below via zip [0..].
queues <- Sched.new numCapabilities noName
main_cpu <- Sched.currentCPU
-- Filled with the final result by the worker that runs the computation.
answerMV <- newEmptyMVar
#if 1
-- Thread ids of the forked workers, kept so they can be killed on failure.
wrkrtids <- newIORef []
let forkit = forM_ (zip [0..] queues) $ \(cpu, q) -> do
tid <- forkWithExceptions (forkOn cpu) "worker thread" $
if cpu == main_cpu
-- This worker runs the user computation; its final continuation first
-- runs the scheduler loop and then publishes the answer.
then let k x = ClosedPar $ \q -> do
sched q
putMVar answerMV x
in exec (close c k) q
-- Every other worker just runs the scheduler loop.
else sched q
atomicModifyIORef_ wrkrtids (tid:)
logStrLn_ " [dbg-lvish] About to fork workers..."
ans <- E.catch (forkit >> takeMVar answerMV)
(\ (e :: E.SomeException) -> do
-- Failure path: stop all workers, dump the log when debugging, rethrow.
tids <- readIORef wrkrtids
logStrLn_$ " [dbg-lvish] Killing off workers due to exception: "++show tids
mapM_ killThread tids
mytid <- myThreadId
when (dbgLvl >= 1) printLog
E.throw$ LVarSpecificExn ("EXCEPTION in runPar("++show mytid++"): "++show e)
)
logStrLn_ " [dbg-lvish] parent thread escaped unscathed"
return ans
#else
-- Alternative implementation in terms of withAsyncOn / mapConcurrently;
-- not compiled, because the condition above is the literal 1.
let runWorker (cpu, q) = do
if (cpu /= main_cpu)
then sched q
else let k x = ClosedPar $ \q -> do
sched q
putMVar answerMV x
in exec (close c k) q
let loop [] asyncs = mapM_ wait asyncs
loop ((cpu,q):tl) asyncs =
withAsyncOn cpu (runWorker (cpu,q))
(\a -> loop tl (a:asyncs))
mapConcurrently runWorker (zip [0..] queues)
takeMVar answerMV
#endif
-- | Run a 'Par' computation and expose its result as a pure value, via
-- 'unsafePerformIO'.
runPar :: Par a -> a
runPar c = unsafePerformIO (runPar_internal c)
-- | Run a 'Par' computation in 'IO'.
runParIO :: Par a -> IO a
runParIO c = runPar_internal c
-- | Run a 'Par' computation without the background log printer, then drain
-- the global log and return its messages (oldest first) with the result.
runParLogged :: Par a -> IO ([String],a)
runParLogged c = do
  res    <- runPar_internal2 c
  logged <- atomicModifyIORef globalLog (\msgs -> ([], msgs))
  return (reverse logged, res)
-- | Atomically apply a function to the contents of an 'IORef', returning
-- nothing.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref fn = atomicModifyIORef ref step
  where
    step old = (fn old, ())
-- | Hash of the stable name of a value, obtained through 'unsafePerformIO'.
unsafeName :: a -> Int
unsafeName x = unsafePerformIO (hashStableName <$> makeStableName x)
-- | Log a message about a handler pool at level 3, with the pool's identity
-- string appended.  Does nothing when the debug level is below 3.
hpMsg msg hp
  | dbgLvl >= 3 = do
      ident <- hpId_ hp
      logLnAt_ 3 (msg ++ ", pool identity= " ++ ident)
  | otherwise = return ()
-- | Pure wrapper around 'hpId_', via 'unsafePerformIO'.
hpId pool = unsafePerformIO $ hpId_ pool
-- | Describe a handler pool for debugging: the stable-name hashes of its
-- counter and its bag, followed by the counter's value at the time of the
-- call.
hpId_ (HandlerPool cnt bag) = do
  cntName <- makeStableName cnt
  bagName <- makeStableName bag
  current <- readIORef cnt
  return $ concat
    [ show (hashStableName cntName)
    , "/"
    , show (hashStableName bagName)
    , " transient cnt "
    , show current
    ]
-- | Fork a thread whose exceptions are forwarded to the thread that forked it.
--
-- The first argument is the forking primitive to use, the second a
-- description used only in DEBUG_LVAR diagnostics, the third the action to
-- run in the child.  A 'ThreadKilled' in the child is swallowed; any other
-- exception is rethrown in the parent with 'E.throwTo'.
forkWithExceptions :: (IO () -> IO ThreadId) -> String -> IO () -> IO ThreadId
forkWithExceptions forkit descr action = do
-- Remember who forked, so the child knows where to report failures.
parent <- myThreadId
forkit $ do
-- The child's own id; only referenced by the debug messages below.
tid <- myThreadId
E.catch action
(\ e ->
case E.fromException e of
-- Being killed is treated as a normal shutdown and is not propagated.
Just E.ThreadKilled -> do
#ifdef DEBUG_LVAR
printf "\nThreadKilled exception inside child thread, %s (not propagating!): %s\n" (show tid) (show descr)
#endif
return ()
-- Everything else is forwarded to the parent thread.
_ -> do
#ifdef DEBUG_LVAR
printf "\nException inside child thread %s, %s: %s\n" (show descr) (show tid) (show e)
#endif
E.throwTo parent (e :: E.SomeException)
)