--------------------------------------------------------------------------------
-- Logstash client for Haskell                                                --
--------------------------------------------------------------------------------
-- This source code is licensed under the MIT license found in the LICENSE    --
-- file in the root directory of this source tree.                            --
--------------------------------------------------------------------------------

-- | A simple Logstash client.
module Logstash (
    module Logstash.Connection,

    -- * Running Logstash actions
    LogstashContext(..),
    runLogstashConn,
    runLogstashPool,

    LogstashQueueCfg(..),
    defaultLogstashQueueCfg,
    withLogstashQueue,

    -- * Exceptions
    -- | Exported so that callers can match on `LogstashTimeout`, e.g. in the
    -- exception handlers given to `withLogstashQueue`.
    LogstashException(..),

    -- * Codecs
    stash,
    stashJsonLine
) where

--------------------------------------------------------------------------------

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Timeout
import Control.Exception
import Control.Monad.Catch (MonadMask)
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Retry

import Data.Aeson
import Data.Acquire
import qualified Data.ByteString.Lazy as BSL
import Data.Either (isRight)
import Data.Pool

import UnliftIO (MonadUnliftIO(..))

import Logstash.Connection

--------------------------------------------------------------------------------

-- | `stash` @message@ is a computation which sends @message@ to the Logstash
-- server. The `LogstashConnection` is taken from the reader environment and
-- @message@ is handed to `writeData` unchanged, so any delimiter required by
-- the codec configured on the server must already be part of @message@.
stash
    :: MonadIO m
    => BSL.ByteString
    -> ReaderT LogstashConnection m ()
stash msg = do
    con <- ask
    liftIO $ writeData con msg

-- | `stashJsonLine` @document@ is a computation which serialises @document@
-- to JSON, appends a single newline character, and sends the result to the
-- Logstash server with `stash`. This function is intended to be used with the
-- `json_lines` codec.
stashJsonLine
    :: (MonadIO m, ToJSON a)
    => a
    -> ReaderT LogstashConnection m ()
stashJsonLine document = stash (encode document <> "\n")

--------------------------------------------------------------------------------

-- | A type of exceptions that can occur related to Logstash connections.
data LogstashException
    = LogstashTimeout
    -- ^ Raised by `runLogstashConn` and `runLogstashPool` when an attempt
    -- does not complete within the given timeout.
    deriving (Eq, Show)

-- | `displayException` renders the same human-readable message for every
-- value of the type.
instance Exception LogstashException where
    displayException = const "Writing to Logstash timed out."

-- | A type class of types that provide Logstash connections.
--
-- The instances in this module cover a plain @`Acquire` `LogstashConnection`@
-- (implemented by `runLogstashConn`) and a `LogstashPool` (implemented by
-- `runLogstashPool`).
class LogstashContext ctx where 
    -- | `runLogstash` @ctx policy timeout action@ runs @action@ with a
    -- connection obtained from @ctx@. In the instances defined in this
    -- module, @action@ is retried according to @policy@ and every attempt is
    -- limited to @timeout@.
    runLogstash 
        :: (MonadMask m, MonadUnliftIO m) 
        => ctx -- ^ The source of Logstash connections.
        -> RetryPolicyM m -- ^ The retry policy.
        -> Integer -- ^ The timeout for each attempt (the default configuration uses @1000000@ for one second).
        -> (RetryStatus -> ReaderT LogstashConnection m a) -- ^ The action to run; it is given the current `RetryStatus`.
        -> m a

-- | A connection is acquired for, and released after, every attempt; see
-- `runLogstashConn`.
instance LogstashContext (Acquire LogstashConnection) where
    runLogstash = runLogstashConn

-- | Connections are taken from, and returned to, the pool; see
-- `runLogstashPool`.
instance LogstashContext LogstashPool where
    runLogstash = runLogstashPool

-- | `runLogstashConn` @connectionAcquire policy timeout computation@ runs
-- @computation@ using a Logstash connection produced by @connectionAcquire@.
--
-- Every attempt acquires its own connection with `withAcquire` and is limited
-- to @timeout@ (as interpreted by `timeout`; the default configuration uses
-- @1000000@ for one second). Because the time limit wraps the acquisition as
-- well as the computation, time spent establishing the connection counts
-- towards the limit. If the limit is exceeded, `LogstashTimeout` is raised.
--
-- Failed attempts, including timeouts, are retried with `recoverAll`
-- according to @policy@. The `RetryStatus` of the current attempt is passed
-- to @computation@.
runLogstashConn
    :: (MonadMask m, MonadUnliftIO m)
    => Acquire LogstashConnection
    -> RetryPolicyM m
    -> Integer
    -> (RetryStatus -> ReaderT LogstashConnection m a)
    -> m a
runLogstashConn acq policy time action =
    recoverAll policy $ \status ->
    withRunInIO $ \runInIO -> do
        -- run the Logstash action with the specified timeout
        result <- timeout time $ runInIO $
            withAcquire acq $ runReaderT (action status)

        -- raise an exception if a timeout occurred; `throwIO` is used rather
        -- than `throw` so that the exception is raised exactly when this IO
        -- action is executed
        maybe (throwIO LogstashTimeout) pure result

-- | `runLogstashPool` @pool policy timeout computation@ takes a
-- `LogstashConnection` from @pool@ and runs @computation@ with it.
--
-- Every attempt is limited to @timeout@ (as interpreted by `timeout`; the
-- default configuration uses @1000000@ for one second). Waiting for a
-- connection from the pool happens before the time limit starts and
-- therefore does not count towards it.
--
-- The connection is returned to the pool only if the attempt completes
-- successfully. If the attempt raises an exception or exceeds the time
-- limit, the connection is destroyed instead, since an interrupted write may
-- leave it in an unknown state. On a timeout, `LogstashTimeout` is raised.
--
-- Failed attempts, including timeouts, are retried with `recoverAll`
-- according to @policy@. The `RetryStatus` of the current attempt is passed
-- to @computation@.
runLogstashPool
    :: (MonadMask m, MonadUnliftIO m)
    => LogstashPool
    -> RetryPolicyM m
    -> Integer
    -> (RetryStatus -> ReaderT LogstashConnection m a)
    -> m a
runLogstashPool pool policy time action =
    recoverAll policy $ \status ->
    withRunInIO $ \runInIO ->
    -- asynchronous exceptions are masked while the resource is being taken
    -- and handed back so that it cannot be lost in between; they are
    -- restored only while the action itself is running
    mask $ \restore -> do
        -- acquire a connection from the resource pool
        (resource, local) <- takeResource pool

        -- run the action with the specified timeout; if it fails with an
        -- exception, the connection is not returned to the pool
        result <- restore (timeout time $ runInIO $
                runReaderT (action status) resource)
            `onException` destroyResource pool local resource

        case result of
            -- success: return the resource to the pool
            Just value -> do
                putResource local resource
                pure value
            -- timeout: the action was interrupted part-way through, so the
            -- connection is discarded rather than reused before the
            -- exception is raised
            Nothing -> do
                destroyResource pool local resource
                throwIO LogstashTimeout

--------------------------------------------------------------------------------

-- | Configurations for `withLogstashQueue` which control the general Logstash
-- configuration as well as the size of the bounded queue and the number of
-- worker threads.
data LogstashQueueCfg ctx = MkLogstashQueueCfg {
    -- | The connection context for the worker threads.
    logstashQueueContext :: ctx,
    -- | The size of the queue, i.e. the maximum number of items that can be
    -- waiting to be sent at any one time.
    logstashQueueSize :: Int,
    -- | The number of workers. This must be at least 1.
    logstashQueueWorkers :: Int,
    -- | The retry policy.
    logstashQueueRetryPolicy :: RetryPolicyM IO,
    -- | The timeout for each attempt at sending data to the Logstash server
    -- (`defaultLogstashQueueCfg` uses @1000000@ for one second).
    logstashQueueTimeout :: Integer
}

-- | `defaultLogstashQueueCfg` @ctx@ constructs a `LogstashQueueCfg` with
-- some default values for the Logstash context given by @ctx@:
--
-- - A queue size limited to 1000 entries
-- - Two worker threads
-- - 25ms exponential backoff with a maximum of five retries as retry policy
-- - 1s timeout for each logging attempt
--
-- You may wish to customise these settings to suit your needs.
defaultLogstashQueueCfg :: LogstashContext ctx => ctx -> LogstashQueueCfg ctx
defaultLogstashQueueCfg ctx = MkLogstashQueueCfg{
    logstashQueueContext = ctx,
    logstashQueueSize = 1000,
    logstashQueueWorkers = 2,
    -- 25000 corresponds to the 25ms base delay listed above
    logstashQueueRetryPolicy = exponentialBackoff 25000 <> limitRetries 5,
    -- 1000000 corresponds to the 1s timeout listed above
    logstashQueueTimeout = 1000000
}

-- | `withLogstashQueue` @cfg codec exceptionHandlers action@ initialises a
-- queue with space for a finite number of log messages given by @cfg@ to allow
-- for log messages to be sent to Logstash asynchronously. This addresses the
-- following issues with synchronous logging:
--
-- - Since writing log messages to Logstash involves network I/O, this may
--   be slower than queueing up messages in memory and therefore synchronously
--   sending messages may delay the computation that originated the log
--   message.
-- - With a finite number of Logstash connections, synchronous logging may also
--   get blocked until a connection is available.
--
-- The queue is read from by a configurable number of worker threads which
-- use Logstash connections from a `LogstashContext`. The queue is given
-- to @action@ as an argument. The `logstashQueueRetryPolicy` and
-- `logstashQueueTimeout` fields of @cfg@ serve the same purpose as the
-- corresponding arguments of `runLogstash`. We recommend that, if the
-- `LogstashContext` is a `LogstashPool`, it should contain at least as many
-- connections as the number of workers to avoid contention between worker
-- threads.
--
-- @codec@ is the handler for how messages should be sent to the Logstash
-- server, this is typically a codec like `stashJsonLine`. The `RetryStatus`
-- is provided as an additional argument to @codec@ in case a handler wishes
-- to inspect this.
--
-- If sending an item ultimately fails, every element of @exceptionHandlers@
-- is applied to that item and the resulting handlers are used to try and
-- catch the exception. An exception that none of them catches terminates the
-- worker thread in which it occurred; it is not re-raised in the calling
-- thread.
--
-- When @action@ finishes, normally or with an exception, the queue is closed
-- and this function waits for all worker threads to terminate before it
-- returns.
withLogstashQueue
    :: (LogstashContext ctx, MonadUnliftIO m)
    => LogstashQueueCfg ctx
    -> (RetryStatus -> item -> ReaderT LogstashConnection IO ())
    -> [item -> Handler ()]
    -> (TBMQueue item -> m a)
    -> m a
withLogstashQueue cfg dispatch handlers action =
    withRunInIO $ \runInIO -> do
        -- initialise a bounded queue with the specified size
        queue <- newTBMQueueIO (logstashQueueSize cfg)

        -- initialise the requested number of worker threads
        workers <- replicateM (logstashQueueWorkers cfg) $
            async (worker queue)

        -- run the main computation in the current thread with its original
        -- monad stack and give it the queue to write log messages to
        runInIO (action queue) `finally` shutdown queue workers
  where
    -- the loop that is run by every worker thread
    worker queue = do
        -- [blocking] read the next item from the queue
        mitem <- atomically $ readTBMQueue queue

        -- the item will be Nothing if the queue is empty and has been
        -- closed; otherwise we have an item that should be dispatched to
        -- Logstash
        case mitem of
            Nothing -> pure ()
            Just item -> do
                -- this may fail if the retry policy is exhausted, in which
                -- case we use the provided exception handlers to try and
                -- catch the exception
                send item `catches` map ($ item) handlers

                -- loop
                worker queue

    -- dispatch a single item using the policies from the configuration and
    -- the provided dispatch handler
    send item =
        runLogstash
            (logstashQueueContext cfg)
            (logstashQueueRetryPolicy cfg)
            (logstashQueueTimeout cfg)
            (\status -> dispatch status item)

    shutdown queue workers = do
        -- close the queue to allow the worker threads to gracefully shut
        -- down
        atomically $ closeTBMQueue queue

        -- wait for the worker threads to terminate; we use `waitCatch` here
        -- instead of `wait` because `waitCatch` does not raise any
        -- exceptions that may occur in the worker threads in this thread
        mapM_ waitCatch workers

--------------------------------------------------------------------------------