module Network.Monitoring.Riemann.BatchClient where
import Control.Concurrent
import Control.Concurrent.Chan.Unagi as Unagi
import Control.Concurrent.KazuraQueue
import Data.Sequence as Seq
import Network.Monitoring.Riemann.Client
import qualified Network.Monitoring.Riemann.Proto.Event as PE
import Network.Monitoring.Riemann.TCP as TCP
import Network.Socket
data BatchClient = BatchClient (Unagi.InChan LogCommand)
data BatchClientNoBuffer = BatchClientNoBuffer (Queue LogCommand)
data LogCommand = Event PE.Event
| Stop (MVar ())
batchClient :: HostName
-> Port
-> Int
-> Int
-> (PE.Event -> IO ())
-> IO BatchClient
batchClient hostname port bufferSize batchSize overflow
| batchSize <= 0 = error "Batch Size must be positive"
| otherwise = do
connection <- TCP.tcpConnection hostname port
(inChan, outChan) <- Unagi.newChan
q <- newQueue
_ <- forkIO $ overflowConsumer outChan q bufferSize overflow
_ <- forkIO $ riemannConsumer batchSize q connection
return $ BatchClient inChan
bufferlessBatchClient :: HostName -> Port -> Int -> IO BatchClientNoBuffer
bufferlessBatchClient hostname port batchSize
| batchSize <= 0 = error "Batch Size must be positive"
| otherwise = do
connection <- TCP.tcpConnection hostname port
q <- newQueue
_ <- forkIO $ riemannConsumer batchSize q connection
return $ BatchClientNoBuffer q
overflowConsumer :: Unagi.OutChan LogCommand
-> Queue LogCommand
-> Int
-> (PE.Event -> IO ())
-> IO ()
overflowConsumer outChan q bufferSize f =
loop
where
loop = do
cmd <- Unagi.readChan outChan
case cmd of
Event event -> do
qSize <- lengthQueue q
if qSize >= bufferSize
then do
f event
loop
else do
writeQueue q cmd
loop
Stop _ -> do
putStrLn "stopping log consumer"
writeQueue q cmd
drainAll :: Queue a -> Int -> IO (Seq a)
drainAll q n = do
msg <- readQueue q
loop $ singleton msg
where
loop msgs = if Seq.length msgs >= n
then return msgs
else do
msg <- tryReadQueue q
case msg of
Just m -> loop $ msgs |> m
Nothing -> return msgs
riemannConsumer :: Int -> Queue LogCommand -> TCPConnection -> IO ()
riemannConsumer batchSize q connection =
loop
where
loop = do
cmds <- drainAll q batchSize
let (events', stops') = Seq.partition (\cmd -> case cmd of
Event _ -> True
Stop _ -> False)
cmds
events = fmap (\(Event e) -> e) events'
stops = fmap (\(Stop s) -> s) stops'
TCP.sendEvents connection events
if Seq.null stops
then loop
else let s = Seq.index stops 0
in do
putStrLn "stopping riemann consumer"
putMVar s ()
instance Client BatchClient where
sendEvent (BatchClient inChan) event =
Unagi.writeChan inChan $ Event event
close (BatchClient inChan) = do
s <- newEmptyMVar
Unagi.writeChan inChan (Stop s)
takeMVar s
instance Client BatchClientNoBuffer where
sendEvent (BatchClientNoBuffer q) event =
writeQueue q $ Event event
close (BatchClientNoBuffer q) = do
s <- newEmptyMVar
writeQueue q (Stop s)
takeMVar s