-- | The 'Logger' type of logging back-ends.
module Log.Logger
  ( LoggerEnv(..)
  , Logger
  , mkLogger
  , mkLogger'
  , mkBulkLogger
  , mkBulkLogger'
  , execLogger
  , waitForLogger
  , shutdownLogger
  ) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import qualified Data.Aeson.Types as A
import qualified Data.Text as T
import qualified Data.Text.IO as T

import Log.Data
import Log.Internal.Logger

-- | The environment threaded through every 'LogT' computation.
data LoggerEnv = LoggerEnv
  { -- | The 'Logger' to use.
    leLogger :: !Logger
    -- | Current application component.
  , leComponent :: !T.Text
    -- | Current application domain.
  , leDomain :: ![T.Text]
    -- | Additional data to be merged with the log message\'s data.
  , leData :: ![A.Pair]
    -- | The maximum log level allowed to be logged. Unlike the other
    -- fields, this one carries no strictness annotation.
  , leMaxLogLevel :: LogLevel
  }

-- | Start a logger thread that takes queued messages off the queue one at a
-- time and hands each of them to the supplied write action.
--
-- /Note:/ the internal queue is bounded (size 1000000) so that memory
-- consumption stays restricted.
mkLogger :: T.Text -> (LogMessage -> IO ()) -> IO Logger
mkLogger name exec = mkLogger' defaultQueueCapacity name exec

-- | Like 'mkLogger', but with configurable queue size.
--
-- The first argument is the capacity of the internal bounded 'TBQueue', the
-- second is the logger name and the third is the action that writes a single
-- message. Messages are processed one at a time; no delay is inserted between
-- messages and the flush action is a no-op.
--
-- @since 0.9.0.0
mkLogger' :: Int -> T.Text -> (LogMessage -> IO ()) -> IO Logger
mkLogger' cap name exec =
  mkLoggerImpl
    newQueue
    isEmptyTBQueue
    readTBQueue
    writeTBQueue
    noop -- nothing to do between two messages
    name
    exec
    noop -- nothing to flush
  where
    -- 'newTBQueueIO' takes its capacity as a 'Natural', hence the conversion.
    newQueue = newTBQueueIO (fromIntegral cap)

    noop :: IO ()
    noop = return ()

-- | Start an asynchronous logger thread that consumes all queued messages once
-- per second.
--
-- /Note:/ a bounded queue of size 1000000 is used internally to avoid
-- unrestricted memory consumption.
--
-- To make sure that the messages get written out in the presence of exceptions,
-- use high-level wrappers like 'withLogger',
-- 'Log.Backend.ElasticSearch.withElasticSearchLogger' or
-- 'Log.Backend.StandardOutput.Bulk.withBulkStdOutLogger' instead of this
-- function directly.
--
-- /Note:/ some messages can be lost when the main thread shuts down
-- without making sure that all logger threads have written out all
-- messages, because in that case child threads are not given a chance
-- to clean up by the RTS. This is apparently a feature:
-- <https://mail.haskell.org/pipermail/haskell-cafe/2014-February/112754.html>
--
-- To work around this issue, make sure that the main thread doesn't
-- exit until all its children have terminated. The 'async' package
-- makes this easy.
--
-- Problematic example:
--
-- @
-- import Control.Concurrent.Async
--
-- main :: IO ()
-- main = do
--    logger \<- 'Log.Backend.ElasticSearch.elasticSearchLogger'
--    a \<- 'Control.Concurrent.Async.async' ('Log.Backend.ElasticSearch.withElasticSearchLogger' $ \\logger ->
--                'Log.Monad.runLogT' "main" logger defaultLogLevel $ 'Log.Class.logTrace_' "foo")
--    -- Main thread exits without waiting for the child
--    -- to finish and without giving the child a chance
--    -- to do proper cleanup.
-- @
--
-- Fixed example:
--
-- @
-- import Control.Concurrent.Async
--
-- main :: IO ()
-- main = do
--    logger \<- 'Log.Backend.ElasticSearch.elasticSearchLogger'
--    a \<- 'Control.Concurrent.Async.async' ('Log.Backend.ElasticSearch.withElasticSearchLogger' $ \\logger ->
--                'Log.Monad.runLogT' "main" logger defaultLogLevel $ 'Log.Class.logTrace_' "foo")
--    'Control.Concurrent.Async.wait' a
--    -- Main thread waits for the child to finish, giving
--    -- it a chance to shut down properly. This works even
--    -- in the presence of exceptions in the child thread.
-- @
mkBulkLogger :: T.Text -> ([LogMessage] -> IO ()) -> IO () -> IO Logger
mkBulkLogger name exec sync =
  mkBulkLogger' defaultQueueCapacity delayMicros name exec sync
  where
    -- Pause between two consecutive batches: one second, in microseconds.
    delayMicros :: Int
    delayMicros = 1000000

-- | Variant of 'mkBulkLogger' that lets the caller choose both the queue
-- capacity and the delay between two consecutive batches.
--
-- @since 0.7.4.0
mkBulkLogger'
    :: Int                      -- ^ queue capacity (default 1000000)
    -> Int                      -- ^ thread delay (microseconds, default 1000000)
    -> T.Text                   -- ^ logger name
    -> ([LogMessage] -> IO ())  -- ^ write
    -> IO ()                    -- ^ flush
    -> IO Logger
mkBulkLogger' cap dur name exec sync =
  mkLoggerImpl
    (newSBQueueIO cap)
    isEmptySBQueue
    readSBQueue
    writeSBQueue
    (threadDelay dur) -- pause after every processed batch
    name
    exec
    sync

----------------------------------------

-- | Default capacity of log queues ('TBQueue' for the regular logger,
-- 'SBQueue' for bulk loggers). This corresponds to approximately 200 MiB
-- memory residency when the queue is full.
defaultQueueCapacity :: Int
defaultQueueCapacity = 10 ^ (6 :: Int)

-- | A simple STM based bounded queue.
data SBQueue a = SBQueue
  !(TVar [a]) -- ^ Queued elements; new elements are consed onto the front.
  !(TVar Int) -- ^ Number of elements currently queued.
  !Int        -- ^ Capacity the element count is checked against on write.

-- | Build an empty 'SBQueue' with the given capacity: no queued elements
-- and an element count of zero.
newSBQueueIO :: Int -> IO (SBQueue a)
newSBQueueIO capacity = do
  elemsVar <- newTVarIO []
  countVar <- newTVarIO 0
  return (SBQueue elemsVar countVar capacity)

-- | Tell whether an 'SBQueue' currently holds no elements.
--
-- Also asserts that the stored element count agrees with the stored list:
-- zero for an empty list, positive otherwise.
isEmptySBQueue :: SBQueue a -> STM Bool
isEmptySBQueue (SBQueue elemsVar countVar _capacity) = do
  elems <- readTVar elemsVar
  n     <- readTVar countVar
  let noElems = null elems
      countAgrees
        | noElems   = n == 0
        | otherwise = n > 0
  assert countAgrees $ return noElems

-- | Take every value out of an 'SBQueue', oldest first, leaving it empty.
--
-- Retries the transaction while the queue holds no values.
readSBQueue :: SBQueue a -> STM [a]
readSBQueue (SBQueue elemsVar countVar _capacity) = do
  pending <- readTVar elemsVar
  case pending of
    [] -> retry
    _  -> do
      writeTVar elemsVar []
      writeTVar countVar 0
      -- Values are consed on write, so the stored list is newest-first.
      return (reverse pending)

-- | Add a value to an 'SBQueue'.
--
-- When the queue already holds as many elements as its capacity allows, the
-- value is dropped and the queue is left untouched.
writeSBQueue :: SBQueue a -> a -> STM ()
writeSBQueue (SBQueue elemsVar countVar capacity) x = do
  n <- readTVar countVar
  when (n < capacity) $ do
    modifyTVar elemsVar (x :)
    -- Strict modification of the queue size to avoid space leak
    modifyTVar' countVar (+ 1)

----------------------------------------

-- | Shared implementation behind every logger constructor in this module.
--
-- Creates a queue with the supplied constructor and forks a worker thread
-- that, forever, takes message(s) off the queue, passes them to the write
-- action and then runs the post-write action. The returned 'Logger'
--
-- * enqueues messages for that worker ('loggerWriteMessage'); once the logger
--   has been shut down this throws 'AssertionFailed' instead,
--
-- * can block until the queue is empty and no write is in progress, and then
--   run the flush action ('loggerWaitForWrite'),
--
-- * can stop the worker ('loggerShutdown'). The dying worker writes out
--   whatever is still queued, runs the flush action and prints
--   @\<name\>: logger thread terminated@ to standard output.
mkLoggerImpl :: IO queue
             -- ^ Create the (empty) message queue.
             -> (queue -> STM Bool)
             -- ^ Is the queue empty?
             -> (queue -> STM msgs)
             -- ^ Take message(s) off the queue. May 'retry' while the queue
             -- is empty (as 'readSBQueue' does).
             -> (queue -> LogMessage -> STM ())
             -- ^ Put a message on the queue.
             -> IO ()
             -- ^ Run by the worker after each write (e.g. a delay).
             -> T.Text
             -- ^ Logger name, used in the termination message.
             -> (msgs -> IO ())
             -- ^ Write the message(s) taken off the queue.
             -> IO ()
             -- ^ Flush.
             -> IO Logger
mkLoggerImpl newQueue isQueueEmpty readQueue writeQueue afterExecDo
  name exec sync = do
  queue      <- newQueue
  inProgress <- newTVarIO False
  isRunning  <- newTVarIO True
  tid <- forkFinally (forever $ loop queue inProgress)
                     (\_ -> cleanup queue inProgress)
  return Logger {
    loggerWriteMessage = \msg -> atomically $ do
        checkIsRunning isRunning
        writeQueue queue msg,
    loggerWaitForWrite = do
        atomically $ waitForWrite queue inProgress
        sync,
    loggerShutdown     = do
        -- Refuse new messages *before* killing the worker: every write that
        -- committed earlier is then already queued when the worker starts
        -- draining, and no write can slip in after the drain has finished.
        atomically $ writeTVar isRunning False
        killThread tid
    }
  where
    -- Abort the enclosing transaction if the logger has been shut down.
    checkIsRunning isRunning' = do
      isRunning <- readTVar isRunning'
      unless isRunning $
        throwSTM (AssertionFailed $ "Log.Logger.mkLoggerImpl: "
                   ++ "attempt to write to a shut down logger")

    -- One iteration of the worker: write what is queued, then run the
    -- post-write action.
    loop queue inProgress = do
      step queue inProgress
      afterExecDo

    -- Take message(s) off the queue and write them. 'inProgress' is raised
    -- in the same transaction that removes the messages, so 'waitForWrite'
    -- never observes "queue empty, nothing in progress" while a write is
    -- still pending. If 'readQueue' retries, the flag update is rolled back
    -- together with the rest of the transaction.
    step queue inProgress = do
      msgs <- atomically $ do
        writeTVar inProgress True
        readQueue queue
      exec msgs
      atomically $ writeTVar inProgress False

    -- Final action of the worker thread, run when 'loop' is interrupted.
    cleanup queue inProgress = do
      drain
      -- The worker may have been interrupted in the middle of 'exec', in
      -- which case 'inProgress' is still raised. Nothing is in progress any
      -- more, so lower it to let 'waitForWrite' callers through.
      atomically $ writeTVar inProgress False
      sync
      -- Don't call afterExecDo, since it's either a no-op or a
      -- threadDelay.
      printLoggerTerminated
      where
        -- Write out everything that is still queued. 'step' is only entered
        -- when the queue is known to be non-empty: 'readQueue' may block on
        -- an empty queue, which would keep this thread from ever
        -- terminating. This thread is the only consumer, so a queue seen as
        -- non-empty here is still non-empty when 'step' reads it.
        drain = do
          isEmpty <- atomically $ isQueueEmpty queue
          unless isEmpty $ step queue inProgress >> drain

    -- Block until the queue is empty and no write is in progress.
    waitForWrite queue inProgress = do
      isEmpty <- isQueueEmpty queue
      isInProgress <- readTVar inProgress
      when (not isEmpty || isInProgress) retry

    printLoggerTerminated = T.putStrLn $ name <> ": logger thread terminated"
-- Prevent GHC from inlining this function so its callers are small and
-- considered for inlining instead (as they will be generalized to MonadIO or
-- MonadUnliftIO).
{-# NOINLINE mkLoggerImpl #-}