{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE LambdaCase #-}
module Network.Monitoring.Riemann.BatchClient where
import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)
import qualified Control.Concurrent.Chan.Unagi as Unagi
import Control.Concurrent.KazuraQueue
( Queue
, lengthQueue
, newQueue
, readQueue
, tryReadQueue
, writeQueue
)
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence.Extra as Seq
import Network.Monitoring.Riemann.Client (Client, close, sendEvent)
import qualified Network.Monitoring.Riemann.Proto.Event as PE
import qualified Network.Monitoring.Riemann.TCP as TCP
import Network.Socket (HostName)
import System.IO (hPutStrLn, stderr)
newtype BatchClient =
BatchClient (Unagi.InChan LogCommand)
newtype BatchClientNoBuffer =
BatchClientNoBuffer (Queue LogCommand)
data LogCommand
= Event PE.Event
| Stop (MVar ())
batchClient ::
HostName -> TCP.Port -> Int -> Int -> (PE.Event -> IO ()) -> IO BatchClient
batchClient hostname port bufferSize batchSize overflow
| batchSize <= 0 = error "Batch Size must be positive"
| otherwise = do
connection <- TCP.tcpConnection hostname port
(inChan, outChan) <- Unagi.newChan
queue <- newQueue
_ <- forkIO $ overflowConsumer outChan queue bufferSize overflow
_ <- forkIO $ riemannConsumer batchSize queue connection
pure $ BatchClient inChan
bufferlessBatchClient :: HostName -> TCP.Port -> Int -> IO BatchClientNoBuffer
bufferlessBatchClient hostname port batchSize
| batchSize <= 0 = error "Batch Size must be positive"
| otherwise = do
connection <- TCP.tcpConnection hostname port
queue <- newQueue
_ <- forkIO $ riemannConsumer batchSize queue connection
pure $ BatchClientNoBuffer queue
overflowConsumer ::
Unagi.OutChan LogCommand
-> Queue LogCommand
-> Int
-> (PE.Event -> IO ())
-> IO ()
overflowConsumer outChan queue bufferSize f = loop
where
loop = do
cmd <- Unagi.readChan outChan
case cmd of
Event event -> do
qSize <- lengthQueue queue
if qSize >= bufferSize
then do
f event
loop
else do
writeQueue queue cmd
loop
Stop _ -> do
hPutStrLn stderr "stopping log consumer"
writeQueue queue cmd
drainAll :: Queue a -> Int -> IO (Seq a)
drainAll queue limit = do
msg <- readQueue queue
loop (pure msg) (limit - 1)
where
loop msgs 0 = pure msgs
loop msgs n =
tryReadQueue queue >>= \case
Nothing -> pure msgs
Just msg -> loop (msgs |> msg) (n - 1)
riemannConsumer :: Int -> Queue LogCommand -> TCP.TCPConnection -> IO ()
riemannConsumer batchSize queue connection = loop
where
loop = do
cmds <- drainAll queue batchSize
let (events, stops) =
Seq.separate
(\case
Event e -> Left e
Stop s -> Right s)
cmds
TCP.sendEvents connection events
if Seq.null stops
then loop
else let s = Seq.index stops 0
in do hPutStrLn stderr "stopping riemann consumer"
putMVar s ()
instance MonadIO m => Client m BatchClient where
sendEvent (BatchClient inChan) event =
liftIO . Unagi.writeChan inChan $ Event event
close (BatchClient inChan) =
liftIO $ do
s <- newEmptyMVar
Unagi.writeChan inChan (Stop s)
takeMVar s
instance MonadIO m => Client m BatchClientNoBuffer where
sendEvent (BatchClientNoBuffer queue) event =
liftIO . writeQueue queue $ Event event
close (BatchClientNoBuffer queue) =
liftIO $ do
s <- newEmptyMVar
writeQueue queue (Stop s)
takeMVar s