module Control.LVish.Logging
(
dbgLvl,
newLogger, logOn, Logger(closeIt, flushLogs),
WaitMode(..), LogMsg(..), mapMsg, OutDest(..),
forkWithExceptions,
Backoff(totalWait), newBackoff, backoff
)
where
import Control.Monad
import qualified Control.Exception as E
import qualified Control.Concurrent.Async as A
import Data.IORef
import qualified Data.Sequence as Seq
import Data.List (sortBy)
import GHC.Conc hiding (yield)
import Control.Concurrent
import System.IO.Unsafe (unsafePerformIO)
import System.IO (stderr, stdout, hFlush, hPutStrLn, Handle)
import System.Environment(getEnvironment)
import System.Random
import Text.Printf (printf, hPrintf)
import Debug.Trace (trace, traceEventIO)
import Control.LVish.Types
-- | A handle on a running logger: its configuration plus the coordinator
-- thread that drains the message channel and prints what it finds.
data Logger = Logger { coordinator :: A.Async () -- ^ The coordinator thread, started by 'newLogger'.
                     , minLvl :: Int -- ^ Lowest message level that 'logOn' accepts (inclusive).
                     , maxLvl :: Int -- ^ Highest message level that 'logOn' accepts (inclusive).
                     , checkPoint :: SmplChan Writer -- ^ Channel on which 'logOn' enqueues messages for the coordinator.
                     , closeIt :: IO () -- ^ Raise the shutdown flag and wait for the coordinator to finish.
                     , loutDests :: [OutDest] -- ^ Every destination each printed message is sent to.
                     , logged :: IORef [String] -- ^ In-memory log; new entries are prepended, so newest comes first.
                     , flushLogs :: IO [String] -- ^ Atomically empty 'logged' and return its contents oldest-first.
                     , waitWorkers :: WaitMode -- ^ Whether, and how, 'logOn' callers block until scheduled.
                     }
-- | One pending log request, as placed on the channel by 'logOn'.
data Writer = Writer { who :: String -- ^ Label for the writer; 'logOn' always passes the empty string.
                     , continue :: MVar () -- ^ Filled by the coordinator to release a writer that is blocked in 'logOn'.
                     , msg :: LogMsg -- ^ The message to print.
                     }
-- | How the coordinator decides when blocked writers may proceed.
data WaitMode = WaitDynamic -- ^ Not supported by the coordinator in this module: selecting it makes the coordinator thread fail.
              | WaitNum {
                numThreads :: Int, -- ^ Threshold that (waiting writers + 'downThreads') must reach before one writer is released.
                downThreads :: IO Int -- ^ Polled every scheduling round; its result is added to the number of waiting writers.
                }
              | DontWait -- ^ 'logOn' never blocks; messages are printed as the coordinator reads them.
  deriving Show
-- | Placeholder instance: an 'IO' action cannot be inspected, so it is
-- always rendered as the fixed tag @\<IO Bool\>@.
instance Show (IO Bool) where
  showsPrec _ _ = showString "<IO Bool>"
-- | Placeholder instance so that records holding an @IO Int@ (such as
-- 'WaitNum') can derive 'Show'; always rendered as @\<IO Int\>@.
instance Show (IO Int) where
  showsPrec _ _ = showString "<IO Int>"
-- | A log message together with its level.  'logOn' compares 'lvl' against
-- the logger's accepted range.
data LogMsg = StrMsg { lvl::Int, body::String } -- ^ Ordinary message; in blocking modes the coordinator queues it and releases writers one at a time.
            | OffTheRecord { lvl :: Int, obod :: String } -- ^ Printed by the coordinator as soon as it is read; never queued for ordering.
  deriving (Show,Eq,Ord,Read)
-- | Transform the text of a message, leaving its constructor and level
-- untouched.
mapMsg :: (String -> String) -> LogMsg -> LogMsg
mapMsg f message =
  case message of
    StrMsg level text       -> StrMsg level (f text)
    OffTheRecord level text -> OffTheRecord level (f text)
-- | Extract the text carried by a message, whichever constructor it uses.
toString :: LogMsg -> String
toString (StrMsg _ text)       = text
toString (OffTheRecord _ text) = text
-- | Cap handed to 'newBackoff' by the polling loops in this module.  Since
-- 'backoff' passes its delay to 'threadDelay', the unit is microseconds
-- (i.e. at most 10ms between polls).
maxWait :: Int
maxWait = 10*1000
-- | Short-circuiting monadic conjunction.  The tests are run left to right;
-- at the first one returning 'False' the failure action is run and the
-- remaining tests are skipped.  If every test passes (or there are none) the
-- success action is run.
andM :: [IO Bool] -> IO a -> IO a -> IO a
andM tests onSuccess onFailure = go tests
  where
    go []            = onSuccess
    go (test : rest) = test >>= \passed ->
      if passed then go rest else onFailure
-- | Exception handler for the coordinator thread.  'E.ThreadKilled' is
-- swallowed silently.  Anything else is reported on stderr, thrown to the
-- given parent thread, and then re-thrown in the current thread.
catchAll :: ThreadId -> E.SomeException -> IO ()
catchAll parent exn
  | Just E.ThreadKilled <- E.fromException exn = return ()
  | otherwise = do
      hPutStrLn stderr ("! Exception on Logger thread: "++show exn)
      hFlush stderr
      E.throwTo parent exn
      E.throwIO exn
-- | Create a logger and start its coordinator thread.
--
-- The first argument is the inclusive @(min, max)@ range of message levels
-- that 'logOn' will accept, the second lists the output destinations, and
-- the third selects whether logging calls block until scheduled.
--
-- Exceptions escaping the coordinator are routed through 'catchAll', using
-- the thread that called 'newLogger' as the parent.
newLogger :: (Int,Int)
          -> [OutDest]
          -> WaitMode
          -> IO Logger
newLogger (lo, hi) dests mode = do
  memLog   <- newIORef []
  chan     <- newSmplChan
  parent   <- myThreadId
  stopFlag <- newIORef False
  worker   <- A.async $
                runCoordinator mode stopFlag chan memLog dests
                  `E.catch` catchAll parent
  let -- Entries are stored newest-first, so reverse them on the way out.
      drain = atomicModifyIORef' memLog (\entries -> ([], reverse entries))
      -- Ask the coordinator to stop, then block until it has returned.
      shutdown = do
        atomicModifyIORef' stopFlag (const (True, ()))
        A.wait worker
  return $! Logger
    { coordinator = worker
    , minLvl      = lo
    , maxLvl      = hi
    , checkPoint  = chan
    , closeIt     = shutdown
    , loutDests   = dests
    , logged      = memLog
    , flushLogs   = drain
    , waitWorkers = mode
    }
-- | Main loop of the coordinator thread started by 'newLogger'.
--
-- In 'DontWait' mode it drains the channel and prints each message as it
-- arrives ('printLoop').  In every other mode it runs 'schedloop', which
-- collects blocked writers and releases them one at a time, each chosen at a
-- random position of their sorted order.
--
-- Both loops poll @shutdownFlag@; once it is set, whatever is still in the
-- channel is printed and the coordinator returns.
runCoordinator :: WaitMode -> IORef Bool -> IORef (Seq.Seq Writer) -> IORef [String] -> [OutDest] -> IO ()
runCoordinator waitWorkers shutdownFlag checkPoint logged loutDests =
  case waitWorkers of
    DontWait -> printLoop =<< newBackoff maxWait
    _        -> schedloop (0::Int) [] =<< newBackoff maxWait
  where
    -- Scheduling loop for the blocking modes.  Arguments: number of
    -- iterations spun without progress, the writers currently blocked, and
    -- the backoff state used between polls.
    schedloop :: Int
              -> [Writer]
              -> Backoff -> IO ()
    schedloop !iters !waiting !bkoff = do
      when (iters > 0 && iters `mod` 500 == 0) $
        putStrLn $ "Warning: logger has spun for "++show iters++" iterations, "++show (length waiting)++" are waiting."
      hFlush stdout
      fl <- readIORef shutdownFlag
      if fl
        -- NOTE(review): writers already collected in `waiting` are neither
        -- printed nor unblocked on this path -- confirm that is intended.
        then flushLoop
        else do
          let keepWaiting w = do b <- backoff bkoff
                                 schedloop (iters+1) w b
          case waitWorkers of
            DontWait -> error "newLogger: internal invariant broken."
            -- Previously a missing case (pattern-match failure at runtime);
            -- fail with a message that names the actual problem instead.
            WaitDynamic -> error "newLogger: WaitDynamic mode is not implemented."
            WaitNum target extra -> do
              waiting2 <- flushChan waiting
              let numWait = length waiting2
              n <- extra
              -- Proceed only when enough threads are accounted for AND at
              -- least one of them is actually waiting to be released.
              if numWait + n >= target && numWait > 0
                then pickAndProceed waiting2
                else keepWaiting waiting2

    -- Print a message with no scheduling context.
    printPlain wr = printAll (formatMessage "" wr)

    -- Shutdown path: print everything left in the channel, then return.
    flushLoop = do
      x <- tryReadSmplChan checkPoint
      case x of
        Just wr -> do printPlain wr
                      flushLoop
        Nothing -> return ()

    -- Drain the channel into the accumulator.  'StrMsg' writers join the
    -- waiting set; 'OffTheRecord' messages are printed on the spot and never
    -- join it.
    flushChan !acc = do
      x <- tryReadSmplChan checkPoint
      case x of
        Just h -> case msg h of
                    StrMsg {}       -> flushChan (h:acc)
                    OffTheRecord {} -> do printPlain h
                                          flushChan acc
        Nothing -> return acc

    -- Non-blocking mode: print messages as they arrive, backing off while
    -- the channel is empty and resetting the backoff after each message.
    printLoop bk = do
      fl <- readIORef shutdownFlag
      if fl
        then flushLoop
        else do
          mwr <- tryReadSmplChan checkPoint
          case mwr of
            Nothing -> printLoop =<< backoff bk
            Just wr -> do printPlain wr
                          printLoop =<< newBackoff (cap bk)

    -- Sort the waiting writers by message text, release one chosen uniformly
    -- at random, and resume scheduling with the rest.  Two writers with
    -- identical text are a fatal error, since the order would be ambiguous.
    pickAndProceed [] = error "pickAndProceed: this should only be called on a non-empty list"
    pickAndProceed waiting = do
      let order a b =
            let s1 = toString (msg a)
                s2 = toString (msg b) in
            case compare s1 s2 of
              GT -> GT
              LT -> LT
              EQ -> error $" [Logger] Need in-parallel log messages to have an ordering, got two equal:\n "++s1
          sorted = sortBy order waiting
          len    = length waiting
      -- randomRIO is inclusive on both ends, hence @len - 1@.
      pos <- randomRIO (0, len - 1)
      -- One split yields both the pick and the remainder, avoiding the
      -- partial (!!) and tail.
      case splitAt pos sorted of
        (pref, pick : suf) -> do
          unblockTask pos len pick
          yield
          bnew <- newBackoff maxWait
          schedloop 0 (pref ++ suf) bnew
        _ -> error "pickAndProceed: internal invariant broken, random index out of range."

    -- Print the chosen writer's message, then let that writer continue.
    unblockTask pos len wr@Writer{continue} = do
      printAll (messageInContext pos len wr)
      putMVar continue ()

    -- "|<level>| <extra><text>"
    formatMessage extra Writer{msg} = "|"++show (lvl msg)++ "| "++extra++ toString msg

    -- Like 'formatMessage', prefixed with the 1-based position of the pick
    -- among the writers that were waiting.
    messageInContext pos len wr = formatMessage ("#"++show (1+pos)++" of "++show len ++": ") wr

    -- Send one line to a single destination.
    printOne str (OutputTo h)   = hPrintf h "%s\n" str
    printOne str OutputEvents   = traceEventIO str
    printOne str OutputInMemory =
      atomicModifyIORef' logged $ \ ls -> (str:ls,())

    -- Send one line to every configured destination.
    printAll str = mapM_ (printOne str) loutDests
-- | Debug-output hook.  Currently a no-op: the argument is ignored.
chatter :: String -> IO ()
chatter = const (return ())
-- | Print a line to stdout, emit the same text as an eventlog event, then
-- flush stdout.
printNTrace :: String -> IO ()
printNTrace line = putStrLn line >> traceEventIO line >> hFlush stdout
-- | Unimplemented placeholder: evaluating it throws.
-- NOTE(review): presumably meant for the 'WaitDynamic' mode -- confirm.
incrTasks = undefined
-- | Unimplemented placeholder: evaluating it throws.
-- NOTE(review): presumably meant for the 'WaitDynamic' mode -- confirm.
decrTasks = undefined
-- | Submit a message to the logger.  Messages whose level lies outside the
-- logger's inclusive @[minLvl, maxLvl]@ range are dropped.
--
-- In 'DontWait' mode the call only enqueues the message and returns.  In the
-- other modes a 'StrMsg' blocks the caller until the coordinator releases
-- it.  An 'OffTheRecord' message never blocks: the coordinator prints such
-- messages directly and never fills their 'continue' variable, so waiting on
-- it would hang the caller forever.
logOn :: Logger -> LogMsg -> IO ()
logOn Logger{checkPoint,minLvl,maxLvl,waitWorkers} msg =
  when (minLvl <= lvl msg && lvl msg <= maxLvl) $
    case (waitWorkers, msg) of
      (DontWait, _)       -> enqueueOnly
      (_, OffTheRecord{}) -> enqueueOnly
      _ -> do continue <- newEmptyMVar
              writeSmplChan checkPoint Writer{who="",continue,msg}
              -- Block until the coordinator picks this writer.
              takeMVar continue
  where
    -- Fire-and-forget: nobody will wait on the MVar, so share the dummy one.
    enqueueOnly = writeSmplChan checkPoint Writer{who="",continue=dummyMVar,msg}
-- | A single shared, permanently empty 'MVar' used to fill the 'continue'
-- field of writers that never block.
--
-- It is created with 'unsafePerformIO', so it must not be inlined: NOINLINE
-- guarantees that every use site refers to the same 'MVar'.
{-# NOINLINE dummyMVar #-}
dummyMVar :: MVar ()
dummyMVar = unsafePerformIO newEmptyMVar
-- | State of an exponential backoff, threaded through calls to 'backoff'.
data Backoff = Backoff { current :: !Int -- ^ Delay used by the next 'backoff' call; below 1 means "just yield".
                       , cap :: !Int -- ^ Upper bound applied when the delay is doubled.
                       , totalWait :: !Int -- ^ Sum of all delays slept so far.
                       }
  deriving Show
-- | Create a fresh backoff state with the given cap on the delay.  It
-- starts with a zero delay and no accumulated waiting time.
newBackoff :: Int
           -> IO Backoff
newBackoff limit =
  return (Backoff { current = 0, cap = limit, totalWait = 0 })
-- | Perform one backoff step and return the updated state.
--
-- While the current delay is below 1 the thread only yields and the delay is
-- bumped by one.  Otherwise the thread sleeps for the current delay (via
-- 'threadDelay'), the delay doubles up to the cap, and the time slept is
-- added to 'totalWait'.
backoff :: Backoff -> IO Backoff
backoff bk@(Backoff delay limit waited)
  | delay < 1 = do
      yield
      return bk { current = delay + 1 }
  | otherwise = do
      threadDelay delay
      return bk { current = min limit (2 * delay), totalWait = waited + delay }
-- | A simple channel: a sequence held in an 'IORef'.  The operations in
-- this module append at the right end and remove from the left end.
type SmplChan a = IORef (Seq.Seq a)
-- | Create a new, empty channel.
newSmplChan :: IO (SmplChan a)
newSmplChan = newIORef mempty
-- | Non-blocking read.  Atomically removes and returns the oldest element,
-- or returns 'Nothing' when the channel is empty.
tryReadSmplChan :: SmplChan a -> IO (Maybe a)
tryReadSmplChan ch =
  atomicModifyIORef' ch $ \queue ->
    case Seq.viewl queue of
      oldest Seq.:< rest -> (rest, Just oldest)
      Seq.EmptyL         -> (Seq.empty, Nothing)
-- | Blocking read implemented by polling: retries 'tryReadSmplChan' with
-- exponential backoff (capped at 'maxWait') until an element is available.
readSmplChan :: SmplChan a -> IO a
readSmplChan ch = newBackoff maxWait >>= poll
  where
    poll bk = tryReadSmplChan ch >>= maybe (backoff bk >>= poll) return
-- | Atomically append an element at the right end of the channel.
writeSmplChan :: SmplChan a -> a -> IO ()
writeSmplChan ch x = atomicModifyIORef' ch enqueue
  where
    enqueue queue = (queue Seq.|> x, ())
-- | The process environment, read once via 'unsafePerformIO'.
--
-- NOINLINE keeps this a single shared thunk, so the environment is read at
-- most once instead of at every inlined use site.
{-# NOINLINE theEnv #-}
theEnv :: [(String, String)]
theEnv = unsafePerformIO getEnvironment
-- | Global debug level.
--
-- When compiled with @DEBUG_LVAR@ it is taken from the @DEBUG@ environment
-- variable: unset, empty, or @"0"@ gives 'defaultDbg'; a value that parses
-- as an 'Int' is used (and announced via 'trace'); anything else is a fatal
-- error.  Without @DEBUG_LVAR@ it is the constant 0.
dbgLvl :: Int
#ifdef DEBUG_LVAR
dbgLvl = maybe defaultDbg fromVar (lookup "DEBUG" theEnv)
  where
    fromVar raw
      | raw `elem` ["", "0"] = defaultDbg
      | otherwise =
          case reads raw of
            (n, _) : _ -> trace (" [!] LVish responding to env Var: DEBUG="++show n) n
            []         -> error$"Attempt to parse DEBUG env var as Int failed: "++show raw
#else
dbgLvl = 0
#endif
-- | Debug level used by 'dbgLvl' when the @DEBUG@ environment variable is
-- unset, empty, or @"0"@.
defaultDbg :: Int
defaultDbg = 0
-- | A fixed debug level of 100.  Not referenced by the code visible in this
-- module.
-- NOTE(review): presumably the level reserved for replay debugging -- confirm.
replayDbg :: Int
replayDbg = 100
-- | Fork a thread with the supplied forking function, forwarding any
-- exception raised by the action to the thread that called
-- 'forkWithExceptions'.
--
-- 'E.ThreadKilled' is the one exception that is not forwarded.  The
-- description string is only used in the diagnostic output that is compiled
-- in under @DEBUG_LVAR@.
forkWithExceptions :: (IO () -> IO ThreadId) -> String -> IO () -> IO ThreadId
forkWithExceptions forkit descr action = do
  parent <- myThreadId
  forkit $ do
    tid <- myThreadId
    action `E.catch` relay parent tid
  where
    -- Decide what to do with an exception that escaped the child action.
    relay :: ThreadId -> ThreadId -> E.SomeException -> IO ()
    relay parent tid e
      | Just E.ThreadKilled <- E.fromException e = do
#ifdef DEBUG_LVAR
          printf "\nThreadKilled exception inside child thread, %s (not propagating!): %s\n" (show tid) (show descr)
#endif
          return ()
      | otherwise = do
#ifdef DEBUG_LVAR
          printf "\nException inside child thread %s, %s: %s\n" (show descr) (show tid) (show e)
#endif
          E.throwTo parent e