module Logstash (
module Logstash.Connection,
LogstashContext(..),
runLogstashConn,
runLogstashPool,
LogstashQueueCfg(..),
defaultLogstashQueueCfg,
withLogstashQueue,
stash,
stashJsonLine
) where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Timeout
import Control.Exception
import Control.Monad.Catch (MonadMask)
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Retry
import Data.Aeson
import Data.Acquire
import qualified Data.ByteString.Lazy as BSL
import Data.Either (isRight)
import Data.Pool
import UnliftIO (MonadUnliftIO(..))
import Logstash.Connection
stash
:: MonadIO m
=> BSL.ByteString
-> ReaderT LogstashConnection m ()
stash :: ByteString -> ReaderT LogstashConnection m ()
stash ByteString
msg = ReaderT LogstashConnection m LogstashConnection
forall r (m :: * -> *). MonadReader r m => m r
ask ReaderT LogstashConnection m LogstashConnection
-> (LogstashConnection -> ReaderT LogstashConnection m ())
-> ReaderT LogstashConnection m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \LogstashConnection
con -> IO () -> ReaderT LogstashConnection m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ReaderT LogstashConnection m ())
-> IO () -> ReaderT LogstashConnection m ()
forall a b. (a -> b) -> a -> b
$ LogstashConnection -> ByteString -> IO ()
writeData LogstashConnection
con ByteString
msg
stashJsonLine
:: (MonadIO m, ToJSON a)
=> a
-> ReaderT LogstashConnection m ()
stashJsonLine :: a -> ReaderT LogstashConnection m ()
stashJsonLine = ByteString -> ReaderT LogstashConnection m ()
forall (m :: * -> *).
MonadIO m =>
ByteString -> ReaderT LogstashConnection m ()
stash (ByteString -> ReaderT LogstashConnection m ())
-> (a -> ByteString) -> a -> ReaderT LogstashConnection m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"\n") (ByteString -> ByteString) -> (a -> ByteString) -> a -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ByteString
forall a. ToJSON a => a -> ByteString
encode
data LogstashException = LogstashTimeout
deriving (LogstashException -> LogstashException -> Bool
(LogstashException -> LogstashException -> Bool)
-> (LogstashException -> LogstashException -> Bool)
-> Eq LogstashException
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: LogstashException -> LogstashException -> Bool
$c/= :: LogstashException -> LogstashException -> Bool
== :: LogstashException -> LogstashException -> Bool
$c== :: LogstashException -> LogstashException -> Bool
Eq, Int -> LogstashException -> ShowS
[LogstashException] -> ShowS
LogstashException -> String
(Int -> LogstashException -> ShowS)
-> (LogstashException -> String)
-> ([LogstashException] -> ShowS)
-> Show LogstashException
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [LogstashException] -> ShowS
$cshowList :: [LogstashException] -> ShowS
show :: LogstashException -> String
$cshow :: LogstashException -> String
showsPrec :: Int -> LogstashException -> ShowS
$cshowsPrec :: Int -> LogstashException -> ShowS
Show)
instance Exception LogstashException where
displayException :: LogstashException -> String
displayException LogstashException
_ = String
"Writing to Logstash timed out."
class LogstashContext ctx where
runLogstash
:: (MonadMask m, MonadUnliftIO m)
=> ctx
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
instance LogstashContext (Acquire LogstashConnection) where
runLogstash :: Acquire LogstashConnection
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstash = Acquire LogstashConnection
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
forall (m :: * -> *) a.
(MonadMask m, MonadUnliftIO m) =>
Acquire LogstashConnection
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstashConn
instance LogstashContext LogstashPool where
runLogstash :: LogstashPool
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstash = LogstashPool
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
forall (m :: * -> *) a.
(MonadMask m, MonadUnliftIO m) =>
LogstashPool
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstashPool
runLogstashConn
:: (MonadMask m, MonadUnliftIO m)
=> Acquire LogstashConnection
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstashConn :: Acquire LogstashConnection
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstashConn Acquire LogstashConnection
acq RetryPolicyM m
policy Integer
time RetryStatus -> ReaderT LogstashConnection m a
action =
RetryPolicyM m -> (RetryStatus -> m a) -> m a
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RetryPolicyM m -> (RetryStatus -> m a) -> m a
recoverAll RetryPolicyM m
policy ((RetryStatus -> m a) -> m a) -> (RetryStatus -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \RetryStatus
s ->
((forall a. m a -> IO a) -> IO a) -> m a
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO a) -> m a)
-> ((forall a. m a -> IO a) -> IO a) -> m a
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
runInIO -> do
Maybe a
mr <- Integer -> IO a -> IO (Maybe a)
forall α. Integer -> IO α -> IO (Maybe α)
timeout Integer
time (IO a -> IO (Maybe a)) -> IO a -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ m a -> IO a
forall a. m a -> IO a
runInIO (m a -> IO a) -> m a -> IO a
forall a b. (a -> b) -> a -> b
$ Acquire LogstashConnection -> (LogstashConnection -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Acquire a -> (a -> m b) -> m b
withAcquire Acquire LogstashConnection
acq ((LogstashConnection -> m a) -> m a)
-> (LogstashConnection -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ ReaderT LogstashConnection m a -> LogstashConnection -> m a
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (RetryStatus -> ReaderT LogstashConnection m a
action RetryStatus
s)
IO a -> (a -> IO a) -> Maybe a -> IO a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (LogstashException -> IO a
forall a e. Exception e => e -> a
throw LogstashException
LogstashTimeout) a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
mr
runLogstashPool
:: (MonadMask m, MonadUnliftIO m)
=> LogstashPool
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstashPool :: LogstashPool
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstashPool LogstashPool
pool RetryPolicyM m
policy Integer
time RetryStatus -> ReaderT LogstashConnection m a
action =
RetryPolicyM m -> (RetryStatus -> m a) -> m a
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RetryPolicyM m -> (RetryStatus -> m a) -> m a
recoverAll RetryPolicyM m
policy ((RetryStatus -> m a) -> m a) -> (RetryStatus -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \RetryStatus
s ->
((forall a. m a -> IO a) -> IO a) -> m a
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO a) -> m a)
-> ((forall a. m a -> IO a) -> IO a) -> m a
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
runInIO ->
((forall a. IO a -> IO a) -> IO a) -> IO a
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO a) -> IO a)
-> ((forall a. IO a -> IO a) -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
(LogstashConnection
resource, LocalPool LogstashConnection
local) <- LogstashPool
-> IO (LogstashConnection, LocalPool LogstashConnection)
forall a. Pool a -> IO (a, LocalPool a)
takeResource LogstashPool
pool
Maybe a
mr <- IO (Maybe a) -> IO (Maybe a)
forall a. IO a -> IO a
restore (Integer -> IO a -> IO (Maybe a)
forall α. Integer -> IO α -> IO (Maybe α)
timeout Integer
time (IO a -> IO (Maybe a)) -> IO a -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ m a -> IO a
forall a. m a -> IO a
runInIO (m a -> IO a) -> m a -> IO a
forall a b. (a -> b) -> a -> b
$ ReaderT LogstashConnection m a -> LogstashConnection -> m a
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (RetryStatus -> ReaderT LogstashConnection m a
action RetryStatus
s) LogstashConnection
resource)
IO (Maybe a) -> IO () -> IO (Maybe a)
forall a b. IO a -> IO b -> IO a
`onException` LogstashPool
-> LocalPool LogstashConnection -> LogstashConnection -> IO ()
forall a. Pool a -> LocalPool a -> a -> IO ()
destroyResource LogstashPool
pool LocalPool LogstashConnection
local LogstashConnection
resource
LocalPool LogstashConnection -> LogstashConnection -> IO ()
forall a. LocalPool a -> a -> IO ()
putResource LocalPool LogstashConnection
local LogstashConnection
resource
IO a -> (a -> IO a) -> Maybe a -> IO a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (LogstashException -> IO a
forall a e. Exception e => e -> a
throw LogstashException
LogstashTimeout) a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
mr
data LogstashQueueCfg ctx = MkLogstashQueueCfg {
LogstashQueueCfg ctx -> ctx
logstashQueueContext :: ctx,
LogstashQueueCfg ctx -> Int
logstashQueueSize :: Int,
LogstashQueueCfg ctx -> Int
logstashQueueWorkers :: Int,
LogstashQueueCfg ctx -> RetryPolicyM IO
logstashQueueRetryPolicy :: RetryPolicyM IO,
LogstashQueueCfg ctx -> Integer
logstashQueueTimeout :: Integer
}
defaultLogstashQueueCfg :: LogstashContext ctx => ctx -> LogstashQueueCfg ctx
defaultLogstashQueueCfg :: ctx -> LogstashQueueCfg ctx
defaultLogstashQueueCfg ctx
ctx = MkLogstashQueueCfg :: forall ctx.
ctx
-> Int -> Int -> RetryPolicyM IO -> Integer -> LogstashQueueCfg ctx
MkLogstashQueueCfg{
logstashQueueContext :: ctx
logstashQueueContext = ctx
ctx,
logstashQueueSize :: Int
logstashQueueSize = Int
1000,
logstashQueueWorkers :: Int
logstashQueueWorkers = Int
2,
logstashQueueRetryPolicy :: RetryPolicyM IO
logstashQueueRetryPolicy = Int -> RetryPolicy
exponentialBackoff Int
25000 RetryPolicyM IO -> RetryPolicyM IO -> RetryPolicyM IO
forall a. Semigroup a => a -> a -> a
<> Int -> RetryPolicy
limitRetries Int
5,
logstashQueueTimeout :: Integer
logstashQueueTimeout = Integer
1000000
}
withLogstashQueue
:: (LogstashContext ctx, MonadUnliftIO m)
=> LogstashQueueCfg ctx
-> (RetryStatus -> item -> ReaderT LogstashConnection IO ())
-> [item -> Handler ()]
-> (TBMQueue item -> m a)
-> m a
withLogstashQueue :: LogstashQueueCfg ctx
-> (RetryStatus -> item -> ReaderT LogstashConnection IO ())
-> [item -> Handler ()]
-> (TBMQueue item -> m a)
-> m a
withLogstashQueue MkLogstashQueueCfg{ctx
Int
Integer
RetryPolicyM IO
logstashQueueTimeout :: Integer
logstashQueueRetryPolicy :: RetryPolicyM IO
logstashQueueWorkers :: Int
logstashQueueSize :: Int
logstashQueueContext :: ctx
logstashQueueTimeout :: forall ctx. LogstashQueueCfg ctx -> Integer
logstashQueueRetryPolicy :: forall ctx. LogstashQueueCfg ctx -> RetryPolicyM IO
logstashQueueWorkers :: forall ctx. LogstashQueueCfg ctx -> Int
logstashQueueSize :: forall ctx. LogstashQueueCfg ctx -> Int
logstashQueueContext :: forall ctx. LogstashQueueCfg ctx -> ctx
..} RetryStatus -> item -> ReaderT LogstashConnection IO ()
dispatch [item -> Handler ()]
handlers TBMQueue item -> m a
action =
((forall a. m a -> IO a) -> IO a) -> m a
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO a) -> m a)
-> ((forall a. m a -> IO a) -> IO a) -> m a
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
runInIO -> do
TBMQueue item
queue <- Int -> IO (TBMQueue item)
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
logstashQueueSize
let worker :: IO ()
worker = do
Maybe item
mitem <- STM (Maybe item) -> IO (Maybe item)
forall a. STM a -> IO a
atomically (STM (Maybe item) -> IO (Maybe item))
-> STM (Maybe item) -> IO (Maybe item)
forall a b. (a -> b) -> a -> b
$ TBMQueue item -> STM (Maybe item)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue item
queue
case Maybe item
mitem of
Maybe item
Nothing -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just item
item -> do
ctx
-> RetryPolicyM IO
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection IO ())
-> IO ()
forall ctx (m :: * -> *) a.
(LogstashContext ctx, MonadMask m, MonadUnliftIO m) =>
ctx
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstash ctx
logstashQueueContext
RetryPolicyM IO
logstashQueueRetryPolicy
Integer
logstashQueueTimeout
(\RetryStatus
status -> RetryStatus -> item -> ReaderT LogstashConnection IO ()
dispatch RetryStatus
status item
item)
IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
`catches` (((item -> Handler ()) -> Handler ())
-> [item -> Handler ()] -> [Handler ()]
forall a b. (a -> b) -> [a] -> [b]
map ((item -> Handler ()) -> item -> Handler ()
forall a b. (a -> b) -> a -> b
$ item
item) [item -> Handler ()]
handlers)
IO ()
worker
[Async ()]
workers <- Int -> IO (Async ()) -> IO [Async ()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
logstashQueueWorkers (IO (Async ()) -> IO [Async ()]) -> IO (Async ()) -> IO [Async ()]
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async IO ()
worker
m a -> IO a
forall a. m a -> IO a
runInIO (TBMQueue item -> m a
action TBMQueue item
queue) IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
`finally` do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue item -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue item
queue
(Async () -> IO (Either SomeException ())) -> [Async ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Async () -> IO (Either SomeException ())
forall a. Async a -> IO (Either SomeException a)
waitCatch [Async ()]
workers