-- | The 'Logger' type of logging back-ends. module Log.Logger ( Logger , mkLogger , mkBulkLogger , execLogger , waitForLogger , shutdownLogger ) where import Control.Applicative import Control.Concurrent import Control.Concurrent.STM import Control.Exception import Control.Monad import Data.Semigroup import Prelude import qualified Data.Text as T import qualified Data.Text.IO as T import Log.Data import Log.Internal.Logger -- | Start a logger thread that consumes one queued message at a time. mkLogger :: T.Text -> (LogMessage -> IO ()) -> IO Logger mkLogger name exec = mkLoggerImpl newTQueueIO isEmptyTQueue readTQueue writeTQueue (return ()) name exec (return ()) -- | Start an asynchronous logger thread that consumes all queued -- messages once per second. Uses a bounded queue internally to avoid -- space leaks. To make sure that the messages get written out in the -- presence of exceptions, use high-level wrappers like 'withLogger', -- 'Log.Backend.ElasticSearch.withElasticSearchLogger' or -- 'Log.Backend.StandardOutput.Bulk.withBulkStdOutLogger' instead of -- this function directly. -- -- Note: some messages can be lost when the main thread shuts down -- without making sure that all logger threads have written out all -- messages, because in that case child threads are not given a chance -- to clean up by the RTS. This is apparently a feature: -- -- -- To work around this issue, make sure that the main thread doesn't -- exit until all its children have terminated. The 'async' package -- makes this easy. -- -- Problematic example: -- -- @ -- import Control.Concurrent.Async -- -- main :: IO () -- main = do -- logger \<- 'Log.Backend.ElasticSearch.elasticSearchLogger' -- a \<- 'Control.Concurrent.Async.async' ('Log.Backend.ElasticSearch.withElasticSearchLogger' $ \\logger -> -- 'Log.Monad.runLogT' "main" logger $ 'Log.Class.logTrace_' "foo") -- -- Main thread exits without waiting for the child -- -- to finish and without giving the child a chance -- -- to do proper cleanup. -- @ -- -- Fixed example: -- -- @ -- import Control.Concurrent.Async -- -- main :: IO () -- main = do -- logger \<- 'Log.Backend.ElasticSearch.elasticSearchLogger' -- a \<- 'Control.Concurrent.Async.async' ('Log.Backend.ElasticSearch.withElasticSearchLogger' $ \\logger -> -- 'Log.Monad.runLogT' "main" logger $ 'Log.Class.logTrace_' "foo") -- 'Control.Concurrent.Async.wait' a -- -- Main thread waits for the child to finish, giving -- -- it a chance to shut down properly. This works even -- -- in the presence of exceptions in the child thread. -- @ mkBulkLogger :: T.Text -> ([LogMessage] -> IO ()) -> IO () -> IO Logger mkBulkLogger = mkLoggerImpl (newSBQueueIO sbDefaultCapacity) isEmptySBQueue readSBQueue writeSBQueue (threadDelay 1000000) ---------------------------------------- -- | A simple STM based bounded queue. data SBQueue a = SBQueue !(TVar [a]) !(TVar Int) !Int -- | Default capacity of a 'SBQueue'. This corresponds to -- approximately 200 MiB memory residency when the queue is full. sbDefaultCapacity :: Int sbDefaultCapacity = 1000000 -- | Create an instance of 'SBQueue' with a given capacity. newSBQueueIO :: Int -> IO (SBQueue a) newSBQueueIO capacity = SBQueue <$> newTVarIO [] <*> newTVarIO 0 <*> pure capacity -- | Check if an 'SBQueue' is empty. isEmptySBQueue :: SBQueue a -> STM Bool isEmptySBQueue (SBQueue queue count _capacity) = do isEmpty <- null <$> readTVar queue numElems <- readTVar count assert (if isEmpty then numElems == 0 else numElems > 0) $ return isEmpty -- | Read all the values stored in an 'SBQueue'. readSBQueue :: SBQueue a -> STM [a] readSBQueue (SBQueue queue count _capacity) = do elems <- readTVar queue when (null elems) retry writeTVar queue [] writeTVar count 0 return $ reverse elems -- | Write a value to an 'SBQueue'. writeSBQueue :: SBQueue a -> a -> STM () writeSBQueue (SBQueue queue count capacity) a = do numElems <- readTVar count if numElems < capacity then do modifyTVar queue (a :) modifyTVar count (+1) else return () ---------------------------------------- mkLoggerImpl :: IO queue -> (queue -> STM Bool) -> (queue -> STM msgs) -> (queue -> LogMessage -> STM ()) -> IO () -> T.Text -> (msgs -> IO ()) -> IO () -> IO Logger mkLoggerImpl newQueue isQueueEmpty readQueue writeQueue afterExecDo name exec sync = do queue <- newQueue inProgress <- newTVarIO False isRunning <- newTVarIO True tid <- forkFinally (forever $ loop queue inProgress) (\_ -> cleanup queue inProgress) return Logger { loggerWriteMessage = \msg -> atomically $ do checkIsRunning isRunning writeQueue queue msg, loggerWaitForWrite = do atomically $ waitForWrite queue inProgress sync, loggerShutdown = do killThread tid atomically $ writeTVar isRunning False } where checkIsRunning isRunning' = do isRunning <- readTVar isRunning' when (not isRunning) $ throwSTM (AssertionFailed $ "Log.Logger.mkLoggerImpl: " ++ "attempt to write to a shut down logger") loop queue inProgress = do step queue inProgress afterExecDo step queue inProgress = do msgs <- atomically $ do writeTVar inProgress True readQueue queue exec msgs atomically $ writeTVar inProgress False cleanup queue inProgress = do step queue inProgress sync -- Don't call afterExecDo, since it's either a no-op or a -- threadDelay. printLoggerTerminated waitForWrite queue inProgress = do isEmpty <- isQueueEmpty queue isInProgress <- readTVar inProgress when (not isEmpty || isInProgress) retry printLoggerTerminated = T.putStrLn $ name <> ": logger thread terminated"