module Logstash (
module Logstash.Connection,
LogstashContext(..),
runLogstashConn,
runLogstashPool,
LogstashQueueCfg(..),
defaultLogstashQueueCfg,
withLogstashQueue,
stash,
stashJsonLine
) where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Timeout
import Control.Exception
import Control.Monad (replicateM)
import Control.Monad.Catch (MonadMask)
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Retry
import Data.Aeson
import Data.Acquire
import qualified Data.ByteString.Lazy as BSL
import Data.Either (isRight)
import Data.Pool
import UnliftIO (MonadUnliftIO(..))
import Logstash.Connection
-- | Write a raw lazy 'BSL.ByteString' to the Logstash connection held in the
-- reader environment. The bytes are handed to 'writeData' unchanged; no
-- framing or trailing newline is added here.
stash
    :: MonadIO m
    => BSL.ByteString
    -> ReaderT LogstashConnection m ()
stash payload = do
    connection <- ask
    liftIO (writeData connection payload)
-- | Serialise a value with 'encode', append a single newline and send the
-- result with 'stash'.
stashJsonLine
    :: (MonadIO m, ToJSON a)
    => a
    -> ReaderT LogstashConnection m ()
stashJsonLine value = stash (encode value <> "\n")
-- | Exceptions raised by this module itself (as opposed to exceptions that
-- originate from the underlying connection).
data LogstashException
    = LogstashTimeout
    -- ^ Raised by 'runLogstashConn' and 'runLogstashPool' when the supplied
    -- action does not finish within the allotted time.
    deriving (Eq, Show)

instance Exception LogstashException where
    -- The argument is deliberately ignored: there is only one constructor
    -- and therefore only one message.
    displayException _ = "Writing to Logstash timed out."
-- | Things that can provide a 'LogstashConnection' for running a
-- @ReaderT LogstashConnection@ computation.
class LogstashContext ctx where
    -- | Run a Logstash computation using a connection obtained from the
    -- given context.
    runLogstash
        :: (MonadMask m, MonadUnliftIO m)
        => ctx
        -- ^ The source of connections.
        -> RetryPolicyM m
        -- ^ The retry policy applied to failed attempts.
        -> Integer
        -- ^ The time limit for a single attempt.
        -> (RetryStatus -> ReaderT LogstashConnection m a)
        -- ^ The computation to run; it receives the current 'RetryStatus'.
        -> m a
-- | An 'Acquire' of a connection is a context: 'runLogstash' is
-- 'runLogstashConn'.
instance LogstashContext (Acquire LogstashConnection) where
    runLogstash = runLogstashConn
-- | A connection pool is a context: 'runLogstash' is 'runLogstashPool'.
instance LogstashContext LogstashPool where
    runLogstash = runLogstashPool
-- | Run a Logstash computation on a freshly acquired connection.
--
-- Every attempt acquires the connection with 'withAcquire', runs the action
-- against it and releases it again. An attempt that does not complete within
-- the time limit is abandoned and 'LogstashTimeout' is thrown. Exceptions
-- raised by an attempt (including the timeout) go through 'recoverAll', so
-- they are retried as the given policy allows.
--
-- The time limit is passed straight to 'timeout' from
-- "Control.Concurrent.Timeout" (microseconds, per that library's
-- documentation).
runLogstashConn
    :: (MonadMask m, MonadUnliftIO m)
    => Acquire LogstashConnection
    -- ^ How to obtain (and release) a connection.
    -> RetryPolicyM m
    -- ^ The retry policy for failed attempts.
    -> Integer
    -- ^ The time limit for a single attempt.
    -> (RetryStatus -> ReaderT LogstashConnection m a)
    -- ^ The computation to run.
    -> m a
runLogstashConn acq policy time action =
    recoverAll policy $ \status ->
    withRunInIO $ \runInIO -> do
        outcome <- timeout time $ runInIO $
            withAcquire acq $ runReaderT (action status)
        -- 'throwIO' rather than 'throw': we are in 'IO', and 'throwIO'
        -- raises the exception at a well-defined point in the sequence.
        maybe (throwIO LogstashTimeout) pure outcome
-- | Run a Logstash computation on a connection taken from a pool.
--
-- Every attempt takes a connection from the pool and runs the action against
-- it with the given time limit. What happens to the connection afterwards
-- depends on the outcome:
--
-- * the action finished in time: the connection is returned to the pool;
-- * the action raised an exception: the connection is destroyed and the
--   exception propagates;
-- * the action timed out: the connection is destroyed and 'LogstashTimeout'
--   is thrown. The action was interrupted part-way through, so the
--   connection may be left in an unknown state and must not be handed to the
--   next user of the pool (or to the next retry).
--
-- Exceptions raised by an attempt go through 'recoverAll', so they are
-- retried as the given policy allows.
--
-- The time limit is passed straight to 'timeout' from
-- "Control.Concurrent.Timeout" (microseconds, per that library's
-- documentation).
runLogstashPool
    :: (MonadMask m, MonadUnliftIO m)
    => LogstashPool
    -- ^ The pool to take connections from.
    -> RetryPolicyM m
    -- ^ The retry policy for failed attempts.
    -> Integer
    -- ^ The time limit for a single attempt.
    -> (RetryStatus -> ReaderT LogstashConnection m a)
    -- ^ The computation to run.
    -> m a
runLogstashPool pool policy time action =
    recoverAll policy $ \status ->
    withRunInIO $ \runInIO ->
    -- Asynchronous exceptions are masked while the resource is being taken
    -- and given back, so that it cannot be lost in between; only the user
    -- action itself runs with the original masking state restored.
    mask $ \restore -> do
        (conn, localPool) <- takeResource pool
        outcome <-
            restore (timeout time $ runInIO $ runReaderT (action status) conn)
                `onException` destroyResource pool localPool conn
        case outcome of
            Just result -> do
                putResource localPool conn
                pure result
            Nothing -> do
                -- Timed out: do not recycle a connection whose last write
                -- was cut short.
                destroyResource pool localPool conn
                throwIO LogstashTimeout
-- | Configuration for the worker queue created by 'withLogstashQueue'.
data LogstashQueueCfg ctx = MkLogstashQueueCfg
    { logstashQueueContext :: ctx
    -- ^ The connection context the workers pass to 'runLogstash'.
    , logstashQueueSize :: Int
    -- ^ The capacity given to 'newTBMQueueIO' when the queue is created.
    , logstashQueueWorkers :: Int
    -- ^ The number of worker threads that are started.
    , logstashQueueRetryPolicy :: RetryPolicyM IO
    -- ^ The retry policy the workers pass to 'runLogstash'.
    , logstashQueueTimeout :: Integer
    -- ^ The per-attempt time limit the workers pass to 'runLogstash'.
    }
-- | A 'LogstashQueueCfg' for the given context with default settings:
-- a queue size of 1000, two workers, an exponential backoff with a base of
-- 25000 limited to five retries, and a timeout of 1000000.
defaultLogstashQueueCfg :: LogstashContext ctx => ctx -> LogstashQueueCfg ctx
defaultLogstashQueueCfg context =
    MkLogstashQueueCfg
        { logstashQueueContext = context
        , logstashQueueSize = 1000
        , logstashQueueWorkers = 2
        , logstashQueueRetryPolicy = backoffPolicy
        , logstashQueueTimeout = 1000000
        }
  where
    -- Both constraints must hold for a retry to happen, so the backoff
    -- stops after five retries.
    backoffPolicy :: RetryPolicyM IO
    backoffPolicy = exponentialBackoff 25000 <> limitRetries 5
-- | Run a computation with access to a bounded, closable queue whose items
-- are shipped to Logstash by background worker threads.
--
-- A queue of 'logstashQueueSize' is created and 'logstashQueueWorkers'
-- workers are started with 'async'. Each worker repeatedly reads an item
-- from the queue and hands it to the dispatch function via 'runLogstash',
-- using the context, retry policy and timeout from the configuration. It
-- stops as soon as 'readTBMQueue' reports 'Nothing'.
--
-- Exceptions escaping 'runLogstash' for an item are offered to the given
-- handlers, each of which is first applied to the item in question.
--
-- When the supplied computation finishes, normally or with an exception,
-- the queue is closed and every worker is waited for with 'waitCatch'.
--
-- NOTE(review): an exception that none of the handlers catches ends that
-- worker, and 'waitCatch' discards it at shutdown. Confirm this is
-- intended.
withLogstashQueue
    :: (LogstashContext ctx, MonadUnliftIO m)
    => LogstashQueueCfg ctx
    -- ^ The queue configuration.
    -> (RetryStatus -> item -> ReaderT LogstashConnection IO ())
    -- ^ How to send a single item.
    -> [item -> Handler ()]
    -- ^ Exception handlers, parameterised over the failing item.
    -> (TBMQueue item -> m a)
    -- ^ The computation that gets to write to the queue.
    -> m a
withLogstashQueue cfg dispatch handlers action =
    withRunInIO $ \runInIO -> do
        queue <- newTBMQueueIO (logstashQueueSize cfg)

        let -- Ship one item, falling back to the handlers on failure.
            deliver item =
                runLogstash
                    (logstashQueueContext cfg)
                    (logstashQueueRetryPolicy cfg)
                    (logstashQueueTimeout cfg)
                    (\retryStatus -> dispatch retryStatus item)
                    `catches` (map ($ item) handlers)

            -- Keep shipping items until the queue yields 'Nothing'.
            workerLoop = do
                next <- atomically (readTBMQueue queue)
                maybe (pure ()) (\item -> deliver item >> workerLoop) next

        workers <- replicateM (logstashQueueWorkers cfg) (async workerLoop)

        let -- Stop accepting items, then wait for the workers to finish.
            shutdown = do
                atomically (closeTBMQueue queue)
                mapM_ waitCatch workers

        runInIO (action queue) `finally` shutdown