{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE LambdaCase #-}

module Network.Monitoring.Riemann.BatchClient where

import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)
import qualified Control.Concurrent.Chan.Unagi as Unagi
import Control.Concurrent.KazuraQueue
    ( Queue
    , lengthQueue
    , newQueue
    , readQueue
    , tryReadQueue
    , writeQueue
    )
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence.Extra as Seq
import Network.Monitoring.Riemann.Client (Client, close, sendEvent)
import qualified Network.Monitoring.Riemann.Proto.Event as PE
import qualified Network.Monitoring.Riemann.TCP as TCP
import Network.Socket (HostName)
import System.IO (hPutStrLn, stderr)

newtype BatchClient = BatchClient (Unagi.InChan LogCommand)

newtype BatchClientNoBuffer = BatchClientNoBuffer (Queue LogCommand)

data LogCommand
    = Event PE.Event
    | Stop (MVar ())

{-|
    A new BatchClient

    The BatchClient is a 'Client' that will do the following:

    * Batch events into a specified batch size

    * If events are produced more quickly than Riemann can cope with,
      they will be passed to the overflow function

    Batching events is important for throughput.

    It is also important to deal with back pressure: if the buffer of events
    to be sent to Riemann fills up, events are passed to the overflow function
    until the buffer has space again. This overflow function can be as simple
    as 'print'.

    Current time and host name will be set if not provided.

    __Note__: We never use an IPv6 address resolved for the given hostname.
-}
batchClient ::
       HostName -> TCP.Port -> Int -> Int -> (PE.Event -> IO ()) -> IO BatchClient
batchClient hostname port bufferSize batchSize overflow
    | batchSize <= 0 = error "Batch Size must be positive"
    | otherwise = do
        connection <- TCP.tcpConnection hostname port
        (inChan, outChan) <- Unagi.newChan
        queue <- newQueue
        _ <- forkIO $ overflowConsumer outChan queue bufferSize overflow
        _ <- forkIO $ riemannConsumer batchSize queue connection
        pure $ BatchClient inChan

bufferlessBatchClient :: HostName -> TCP.Port -> Int -> IO BatchClientNoBuffer
bufferlessBatchClient hostname port batchSize
    | batchSize <= 0 = error "Batch Size must be positive"
    | otherwise = do
        connection <- TCP.tcpConnection hostname port
        queue <- newQueue
        _ <- forkIO $ riemannConsumer batchSize queue connection
        pure $ BatchClientNoBuffer queue

-- | Forward commands from the unbounded Unagi channel to the bounded send
-- queue, handing events to the overflow handler while the queue is full.
overflowConsumer ::
       Unagi.OutChan LogCommand
    -> Queue LogCommand
    -> Int
    -> (PE.Event -> IO ())
    -> IO ()
overflowConsumer outChan queue bufferSize f = loop
  where
    loop = do
        cmd <- Unagi.readChan outChan
        case cmd of
            Event event -> do
                qSize <- lengthQueue queue
                if qSize >= bufferSize
                    then do
                        f event
                        loop
                    else do
                        writeQueue queue cmd
                        loop
            Stop _ -> do
                hPutStrLn stderr "stopping log consumer"
                writeQueue queue cmd

-- | Block for the first item, then greedily take up to @limit@ items in
-- total without blocking again.
drainAll :: Queue a -> Int -> IO (Seq a)
drainAll queue limit = do
    msg <- readQueue queue
    loop (pure msg) (limit - 1)
  where
    loop msgs 0 = pure msgs
    loop msgs n =
        tryReadQueue queue >>= \case
            Nothing -> pure msgs
            Just msg -> loop (msgs |> msg) (n - 1)

-- | Drain up to @batchSize@ commands, send the events to Riemann as one
-- batch, and stop once a 'Stop' command has been seen.
riemannConsumer :: Int -> Queue LogCommand -> TCP.TCPConnection -> IO ()
riemannConsumer batchSize queue connection = loop
  where
    loop = do
        cmds <- drainAll queue batchSize
        let (events, stops) =
                Seq.separate
                    (\case
                         Event e -> Left e
                         Stop s -> Right s)
                    cmds
        TCP.sendEvents connection events
        if Seq.null stops
            then loop
            else let s = Seq.index stops 0
                  in do hPutStrLn stderr "stopping riemann consumer"
                        putMVar s ()
instance MonadIO m => Client m BatchClient where
    sendEvent (BatchClient inChan) event =
        liftIO . Unagi.writeChan inChan $ Event event
    close (BatchClient inChan) =
        liftIO $ do
            s <- newEmptyMVar
            Unagi.writeChan inChan (Stop s)
            takeMVar s

instance MonadIO m => Client m BatchClientNoBuffer where
    sendEvent (BatchClientNoBuffer queue) event =
        liftIO . writeQueue queue $ Event event
    close (BatchClientNoBuffer queue) =
        liftIO $ do
            s <- newEmptyMVar
            writeQueue queue (Stop s)
            takeMVar s
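
-- Usage sketch (a hedged example, not part of the library): it assumes a
-- Riemann server reachable at @localhost:5555@ and some already-built
-- @events :: [PE.Event]@; the host, port, buffer size (10000), batch size
-- (100), and the name @reportAll@ are illustrative only. Overflowed events
-- are simply printed, as the documentation above suggests.
--
-- > reportAll :: [PE.Event] -> IO ()
-- > reportAll events = do
-- >     client <- batchClient "localhost" 5555 10000 100 print
-- >     mapM_ (sendEvent client) events
-- >     close client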