{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE NamedFieldPuns, BangPatterns #-}

{-|

Thread-safe Logging with bonus controlled-schedule debugging capabilities.

This module supports logging to memory, serializing messages and deferring the work
of actually printing them.  Another thread can flush the logged messages at its
leisure.

The second capability of this infrastructure is to use the debugging print messages
as points at which to gate the execution of the program.  That is, each `logStrLn_`
call becomes a place where the program blocks and checks in with a central
coordinator, which only allows one thread to unblock at a time.  Thus, if there are
sufficient debug logging messages in the program, this can enable a form of
deterministic replay (and quickcheck-style testing of different interleavings).

 -}

module Control.LVish.Logging
       (

         -- * Global variables
         dbgLvl, 

         -- * New logger interface
         newLogger, logOn, Logger(closeIt, flushLogs),
         WaitMode(..), LogMsg(..), mapMsg, OutDest(..),

         -- * General utilities
         forkWithExceptions,

         Backoff(totalWait), newBackoff, backoff
       )
       where

import           Control.Monad
import qualified Control.Exception as E
import qualified Control.Concurrent.Async as A
import           Data.IORef
import qualified Data.Sequence as Seq
import           Data.List (sortBy)
import           GHC.Conc hiding (yield)
import           Control.Concurrent
import           System.IO.Unsafe (unsafePerformIO)
import           System.IO (stderr, stdout, hFlush, hPutStrLn, Handle)
import           System.Environment(getEnvironment)
import           System.Random
import           Text.Printf (printf, hPrintf)
import           Debug.Trace (trace, traceEventIO)

import Control.LVish.Types
-- import qualified Control.LVish.SchedIdempotentInternal as Sched

----------------------------------------------------------------------------------------------------

-- | A Logger coordinates a set of threads that print debug logging messages.
--
--   This are abstract objects supporting only the operations provided by this module
--   and the non-hidden fields of the Logger.
data Logger = Logger { coordinator :: A.Async () -- ThreadId
                                      -- ^ (private) The thread that chooses which action to unblock next
                                      -- and handles printing to the screen as well.
                     , minLvl :: Int  -- ^ The minimum level of messages accepted by this logger (usually 0).
                     , maxLvl :: Int  -- ^ The maximum level of messages accepted by this logger.
                     , checkPoint :: SmplChan Writer -- ^ The serialized queue of writers attempting to log dbg messages.
                     , closeIt :: IO () -- ^ (public) A method to complete flushing, close down the helper thread,
                                        -- and generally wrap up.
                     , loutDests :: [OutDest] -- ^ Where to send output.  If empty, messages dropped entirely.
                     , logged   :: IORef [String] -- ^ (private) In-memory buffer of messages, if OutputInMemory is selected.
                                                  -- This is stored in reverse-temporal order during execution.
                     , flushLogs :: IO [String] -- ^ Clear buffered log messages and return in the order they occurred.
                     , waitWorkers :: WaitMode
                     }


-- | A single thread attempting to log a message.  It only unblocks when the attached
-- MVar is filled.
data Writer = Writer { who :: String
                     , continue :: MVar ()
                     , msg :: LogMsg
                       -- TODO: Indicate whether this writer has useful work to do or
                       -- is about to block... this provides a simple notion of
                       -- priority.
                     }

-- | Several different ways we know to wait for quiescence in the concurrent mutator
-- before proceeding.
data WaitMode = WaitDynamic -- ^ UNFINISHED: Dynamically track tasks/workers.  The
                            -- num workers starts at 1 and then is modified
                            -- with `incrTasks` and `decrTasks`.
              | WaitNum {
                numThreads  :: Int,   -- ^ How many threads total must check in?
                downThreads :: IO Int -- ^ Poll how many threads won't participate this round.
                                      --   After all productive threads have checked in 
                                      --   this number must grow to eventually include all other threads.
                } -- ^ A fixed set of threads must check-in each round before proceeding.
              | DontWait -- ^ In this mode, logging calls are non-blocking and return
                         -- immediately, rather than waiting on a central coordinator.
                         -- This is what we want if we're simply printing debugging output,
                         -- not controlling the schedule for stress testing.
  deriving Show

instance Show (IO Bool) where
  show _ = "<IO Bool>"
  
instance Show (IO Int) where
  show _ = "<IO Int>"

-- | We allow logging in O(1) time in String or ByteString format.  In practice the
-- distinction is not that important, because only *thunks* should be logged; the
-- thread printing the logs should deal with forcing those thunks.
data LogMsg = StrMsg { lvl::Int, body::String }
            | OffTheRecord { lvl :: Int, obod :: String }
                -- ^ This sort of message is chatter and NOT meant 
                --   to participate in the scheduler-testing framework.
--          | ByteStrMsg { lvl::Int,  }
  deriving (Show,Eq,Ord,Read)

mapMsg :: (String -> String) -> LogMsg -> LogMsg
mapMsg f (StrMsg l s)       = StrMsg       l (f s)
mapMsg f (OffTheRecord l s) = OffTheRecord l (f s)

toString :: LogMsg -> String
toString x = case x of 
               StrMsg {body} -> body
               OffTheRecord _ s -> s

maxWait :: Int
maxWait = 10*1000 -- 10ms

andM :: [IO Bool] -> IO a -> IO a -> IO a
andM [] t _f = t
andM (hd:tl) t f = do
  b <- hd
  if b then andM tl t f
       else f

catchAll :: ThreadId -> E.SomeException -> IO ()
catchAll parent exn =
  case E.fromException exn of 
    Just E.ThreadKilled -> return ()
    _ -> do
     hPutStrLn stderr ("! Exception on Logger thread: "++show exn)
     hFlush stderr
     E.throwTo parent exn
     E.throwIO exn

--------------------------------------------------------------------------------

-- | Create a new logger, which includes forking a coordinator thread.
--   Takes as argument the number of worker threads participating in the computation.
newLogger :: (Int,Int) -- ^ What inclusive range of messages do we accept?  Defaults to `(0,dbgLvl)`.
          -> [OutDest]
          -> WaitMode
          -> IO Logger
newLogger (minLvl, maxLvl) loutDests waitWorkers = do
  logged      <- newIORef []  
  checkPoint  <- newSmplChan
  parent      <- myThreadId
  let flushLogs = atomicModifyIORef' logged $ \ ls -> ([],reverse ls)

  shutdownFlag     <- newIORef False -- When true, time to start shutdown.
  
  -- Here's the new thread that corresponds to this logger:
  coordinator <- A.async $ E.handle (catchAll parent) $ do
                   runCoordinator waitWorkers shutdownFlag checkPoint logged loutDests
  let closeIt = do
        atomicModifyIORef' shutdownFlag (\_ -> (True,())) -- Declare that it's time to shutdown:
        A.wait coordinator -- Gently wait for it to be done.
  return $! Logger { coordinator, checkPoint, closeIt, loutDests,
                     logged, flushLogs,
                     waitWorkers, minLvl, maxLvl }

--------------------------------------------------------------------------------

-- | Run a logging coordinator thread until completion/shutdown.
runCoordinator :: WaitMode -> IORef Bool -> IORef (Seq.Seq Writer) -> IORef [String] -> [OutDest] -> IO ()
runCoordinator waitWorkers shutdownFlag checkPoint logged loutDests = 
       case waitWorkers of
         DontWait -> printLoop =<< newBackoff maxWait
         _ -> schedloop (0::Int) [] =<< newBackoff maxWait -- Kick things off.
  where
          -- Proceed in rounds, gather the set of actions that may happen in parallel, then
          -- pick one.  We log the series of decisions we make for reproducability.
          schedloop :: Int 
                    -> [Writer]   -- ^ Waiting threads, reverse chronological (newest first)
                    -> Backoff -> IO ()
          schedloop !iters !waiting !bkoff = do
            when (iters > 0 && iters `mod` 500 == 0) $
              putStrLn $ "Warning: logger has spun for "++show iters++" iterations, "++show (length waiting)++" are waiting."
            hFlush stdout
            fl <- readIORef shutdownFlag
            if fl then flushLoop
             else do 
              let keepWaiting w = do b <- backoff bkoff
                                     schedloop (iters+1) w b
              case waitWorkers of
                DontWait -> error "newLogger: internal invariant broken."
                WaitNum target extra -> do
                  waiting2 <- flushChan waiting
                  let numWait = length waiting2
                  n <- extra -- Atomically check how many extra workers are blocked.
                  -- putStrLn $ "TEMP: schedloop/WaitNum: polled for waiting/extra workers: "
                  --            ++show (numWait,n)++" target "++show target
                  if (numWait + n >= target)
                    then if numWait > 0 
                         then pickAndProceed waiting2
                         else keepWaiting waiting2 -- This sounds like a shutdown is happening, all are idle.
                    else keepWaiting waiting2 -- We don't know if we're waiting for idles to arrive or blocked waiters.

          -- | Keep printing messages until there is (transiently) nothing left.
          flushLoop = do 
              x <- tryReadSmplChan checkPoint
              case x of
                Just wr -> do printAll (formatMessage "" wr)
                              flushLoop
                Nothing -> return ()

          flushChan !acc = do
            x <- tryReadSmplChan checkPoint
            case x of
              Just h  -> case msg h of 
                          StrMsg {}       -> flushChan (h:acc)
                          OffTheRecord {} -> do printAll (formatMessage "" h) 
                                                flushChan acc
              Nothing -> return acc

          -- | A simpler alternative schedloop that only does printing (e.g. for DontWait mode).
          printLoop bk = do
            fl <- readIORef shutdownFlag
            if fl then flushLoop
                  else do mwr <- tryReadSmplChan checkPoint
                          case mwr of 
                            Nothing -> do printLoop =<< backoff bk 
                            Just wr -> do printAll (formatMessage "" wr)
                                          printLoop =<< newBackoff (cap bk)

          -- Take the set of logically-in-parallel tasks, choose one, execute it, and
          -- then return to the main scheduler loop.
          pickAndProceed [] = error "pickAndProceed: this should only be called on a non-empty list"
          pickAndProceed waiting = do
            let order a b =
                  let s1 = toString (msg a)
                      s2 = toString (msg b) in
                  case compare s1 s2 of
                    GT -> GT
                    LT -> LT
                    EQ -> error $" [Logger] Need in-parallel log messages to have an ordering, got two equal:\n "++s1
                sorted = sortBy order waiting
                len = length waiting
            -- For now let's randomly pick an action:
            pos <- randomRIO (0,len-1)
            let pick = sorted !! pos
                (pref,suf) = splitAt pos sorted
                rst = pref ++ tail suf
            -- putStrLn$ "TEMP: pickAndProceed, unblocking "++show (pos,len,msg pick)
            unblockTask pos len pick -- The task will asynchronously run when it can.
            yield -- If running on one thread, give it a chance to run.
            -- Return to the scheduler to wait for the next quiescent point:
            bnew <- newBackoff maxWait
            schedloop 0 rst bnew

          unblockTask pos len wr@Writer{continue} = do
            printAll (messageInContext pos len wr)
            putMVar continue () -- Signal that the thread may continue.

          -- This is the format we use for debugging messages
          formatMessage extra Writer{msg} = "|"++show (lvl msg)++ "| "++extra++ toString msg
          -- One of these message reports how many tasks are in parallel with it:
          messageInContext pos len wr = formatMessage ("#"++show (1+pos)++" of "++show len ++": ") wr
          printOne str (OutputTo h)   = hPrintf h "%s\n" str
          printOne str OutputEvents = traceEventIO str
          printOne str OutputInMemory =
            -- This needs to be atomic because other messages might be calling "flush"
            -- at the same time.
            atomicModifyIORef' logged $ \ ls -> (str:ls,())
          printAll str = mapM_ (printOne str) loutDests




chatter :: String -> IO ()
-- chatter = hPrintf stderr
-- chatter = printf "%s\n"
chatter _ = return ()

printNTrace s = do putStrLn s; traceEventIO s; hFlush stdout

-- UNFINISHED:
incrTasks = undefined
decrTasks = undefined

-- | Write a log message from the current thread, IF the level of the
-- message falls into the range accepted by the given `Logger`,
-- otherwise, the message is ignored.
logOn :: Logger -> LogMsg -> IO ()
logOn Logger{checkPoint,minLvl,maxLvl,waitWorkers} msg = do   
  
  if (minLvl <= lvl msg) && (lvl msg <= maxLvl) then do     
    -- putStrLn$ "TEMP: "++show (minLvl,maxLvl)++" attempt to log msg: "++show msg
    case waitWorkers of
      -- In this mode we are non-blocking:
      DontWait -> writeSmplChan checkPoint Writer{who="",continue=dummyMVar,msg}
      _ -> do continue <- newEmptyMVar
              writeSmplChan checkPoint Writer{who="",continue,msg}
              takeMVar continue -- Block until we're given permission to proceed.
   else return ()

{-# NOINLINE dummyMVar #-}
dummyMVar :: MVar ()
dummyMVar = unsafePerformIO newEmptyMVar

----------------------------------------------------------------------------------------------------
-- Simple back-off strategy.

-- | The state for an exponential backoff.
data Backoff = Backoff { current :: !Int
                       , cap :: !Int  -- ^ Maximum nanoseconds to wait.
                       , totalWait :: !Int
                       }
  deriving Show

-- | Create an object used for exponentential backoff; see `backoff`.
newBackoff :: Int -- ^ Maximum delay, nanoseconds
           -> IO Backoff
newBackoff cap = return Backoff{cap,current=0,totalWait=0}

-- | Perform the backoff, possibly delaying the thread.
backoff :: Backoff -> IO Backoff
backoff Backoff{current,cap,totalWait} = do
  if current < 1 then 
    -- Yield before we start delaying:
    do yield
       return Backoff{cap,current=current+1,totalWait}
   else
    do let nxt = min cap (2*current)
       threadDelay current
       return Backoff{cap,current=nxt,totalWait=totalWait+current}

----------------------------------------------------------------------------------------------------
-- Simple channels: we need non-blocking reads so we can't use
-- Control.Concurrent.Chan.  We could use TChan, but I don't want to bring STM into
-- it right now.

-- type MyChan a = Chan a

-- -- | A simple channel.  Take-before-put is the protocol.
-- type SmplChan a = MVar [a]

-- | Simple channels that don't support real blocking.
type SmplChan a = IORef (Seq.Seq a) -- New elements pushed on right.

newSmplChan :: IO (SmplChan a)
newSmplChan = newIORef Seq.empty

-- | Non-blocking read.
tryReadSmplChan :: SmplChan a -> IO (Maybe a)
tryReadSmplChan ch = do
  x <- atomicModifyIORef' ch $ \ sq -> 
       case Seq.viewl sq of
         Seq.EmptyL -> (Seq.empty, Nothing)
         h Seq.:< t -> (t, Just h)
  return x

-- | A synchronous read that must block or busy-wait until a value is available.
readSmplChan :: SmplChan a -> IO a
readSmplChan ch = loop =<< newBackoff maxWait
 where
   loop bk = do
     x <- tryReadSmplChan ch
     case x of
       Nothing -> do b2 <- backoff bk
                     loop b2
       Just h  -> return h

-- | Always succeeds.  Asynchronous write to channel.
writeSmplChan :: SmplChan a -> a -> IO ()
writeSmplChan ch x = do
  atomicModifyIORef' ch $ \ s -> (s Seq.|> x,())

----------------------------------------------------------------------------------------------------

{-# NOINLINE theEnv #-}
theEnv :: [(String, String)]
theEnv = unsafePerformIO getEnvironment

-- | Debugging flag shared by several modules.
--   This is activated by setting the environment variable @DEBUG=1..5@.
-- 
--   By convention @DEBUG=100@ turns on full sequentialization of the program and
--   control over the interleavings in concurrent code, enabling systematic debugging
--   of concurrency problems.
dbgLvl :: Int
#ifdef DEBUG_LVAR
{-# NOINLINE dbgLvl #-}
dbgLvl = case lookup "DEBUG" theEnv of
       Nothing  -> defaultDbg
       Just ""  -> defaultDbg
       Just "0" -> defaultDbg
       Just s   ->
         case reads s of
           ((n,_):_) -> trace (" [!] LVish responding to env Var: DEBUG="++show n) n
           [] -> error$"Attempt to parse DEBUG env var as Int failed: "++show s
#else 
{-# INLINE dbgLvl #-}
dbgLvl = 0
#endif

defaultDbg :: Int
defaultDbg = 0

replayDbg :: Int
replayDbg = 100


-- | Exceptions that walk up the fork-tree of threads.
--   
--   WARNING: By holding onto the ThreadId we keep the parent thread from being
--   garbage collected (at least as of GHC 7.6).  This means that even if it was
--   complete, it will still be hanging around to accept the exception below.
forkWithExceptions :: (IO () -> IO ThreadId) -> String -> IO () -> IO ThreadId
forkWithExceptions forkit descr action = do 
   parent <- myThreadId
   forkit $ do
      tid <- myThreadId
      E.catch action 
	 (\ e -> 
           case E.fromException e of 
             Just E.ThreadKilled -> do
-- Killing worker threads is normal now when exception handling, so this chatter is restricted to debug mode:
#ifdef DEBUG_LVAR
               printf "\nThreadKilled exception inside child thread, %s (not propagating!): %s\n" (show tid) (show descr)
#endif
               return ()
	     _  -> do
#ifdef DEBUG_LVAR               
                      printf "\nException inside child thread %s, %s: %s\n" (show descr) (show tid) (show e)
#endif
                      E.throwTo parent (e :: E.SomeException)
	 )