module Network.Monitoring.Riemann.BatchClient where
import Control.Concurrent
import Control.Concurrent.Chan.Unagi as Unagi
import Control.Concurrent.KazuraQueue
import Data.Sequence as Seq
import Network.Monitoring.Riemann.Client
import Network.Monitoring.Riemann.Event as Event
import qualified Network.Monitoring.Riemann.Proto.Event as PE
import Network.Monitoring.Riemann.TCP as TCP
import Network.Socket
-- | Handle to the buffered batching client returned by 'batchClient'.
--
-- It wraps the write end of the Unagi channel created in 'batchClient';
-- the matching read end is consumed by 'overflowConsumer'.
--
-- Declared as a @newtype@ because it has exactly one constructor with one
-- field, so the wrapper adds no runtime indirection.
newtype BatchClient = BatchClient (Unagi.InChan LogCommand)
-- | Handle to the unbuffered batching client returned by 'bc'.
--
-- It wraps the queue that 'riemannConsumer' drains directly; there is no
-- intermediate channel and no overflow handling in front of it.
--
-- Declared as a @newtype@ because it has exactly one constructor with one
-- field, so the wrapper adds no runtime indirection.
newtype BatchClientNoBuffer = BatchClientNoBuffer (Queue LogCommand)
-- | Messages passed from the client handles to the background consumers.
data LogCommand
  = Event PE.Event
    -- ^ An event that should be delivered to Riemann.
  | Stop (MVar ())
    -- ^ Shutdown request. The 'MVar' is filled by the consumer once it has
    -- processed the request, which unblocks the caller waiting on it.
-- | Create a buffered batching client.
--
-- Opens a TCP connection, then forks two threads: 'overflowConsumer', which
-- moves commands from the client's channel into an internal queue, and
-- 'riemannConsumer', which drains that queue in batches over the connection.
--
-- Calls 'error' with @"Batch Size must be positive"@ when the batch size is
-- zero or negative; nothing is connected or forked in that case.
batchClient :: HostName
            -- ^ Host passed to 'TCP.tcpConnection'.
            -> Port
            -- ^ Port passed to 'TCP.tcpConnection'.
            -> Int
            -- ^ Buffer size: once the internal queue holds at least this
            -- many commands, new events go to the overflow handler instead.
            -> Int
            -- ^ Batch size: upper bound on the number of commands drained
            -- per send. Must be positive.
            -> (PE.Event -> IO ())
            -- ^ Overflow handler, called with each event that is not queued.
            -> IO BatchClient
batchClient hostname port bufferSize batchSize overflow =
  if batchSize <= 0
    then error "Batch Size must be positive"
    else do
      conn <- TCP.tcpConnection hostname port
      (writeEnd, readEnd) <- Unagi.newChan
      queue <- newQueue
      -- The worker thread ids are not kept; the threads end on their own
      -- once a 'Stop' command has travelled through them.
      _ <- forkIO (overflowConsumer readEnd queue bufferSize overflow)
      _ <- forkIO (riemannConsumer batchSize queue conn)
      return (BatchClient writeEnd)
-- | Create a batching client without the overflow buffer stage.
--
-- Opens a TCP connection and forks a single 'riemannConsumer' thread that
-- drains the returned client's queue in batches of at most @batchSize@.
--
-- The final argument (an overflow handler) is accepted but never used by
-- this function.
--
-- Calls 'error' with @"Batch Size must be positive"@ when the batch size is
-- zero or negative; nothing is connected or forked in that case.
bc :: HostName -> Port -> Int -> (PE.Event -> IO ()) -> IO BatchClientNoBuffer
bc hostname port batchSize _overflow =
  if batchSize > 0
    then do
      conn <- TCP.tcpConnection hostname port
      queue <- newQueue
      -- The worker's thread id is intentionally discarded.
      _ <- forkIO (riemannConsumer batchSize queue conn)
      return (BatchClientNoBuffer queue)
    else error "Batch Size must be positive"
-- | Move commands from the client channel into the consumer queue, diverting
-- events to the overflow handler while the queue is full.
--
-- For every 'Event' the current queue length is compared with @bufferSize@:
-- below the limit the command is enqueued, otherwise the event is handed to
-- @f@ and not enqueued. A 'Stop' is logged, forwarded to the queue, and ends
-- this loop.
overflowConsumer :: Unagi.OutChan LogCommand
                 -> Queue LogCommand
                 -> Int
                 -> (PE.Event -> IO ())
                 -> IO ()
overflowConsumer outChan q bufferSize f = go
  where
    go = Unagi.readChan outChan >>= dispatch

    dispatch cmd@(Event event) = do
      backlog <- lengthQueue q
      if backlog < bufferSize
        then writeQueue q cmd
        else f event
      go
    -- Forward the stop request so the downstream consumer sees it too,
    -- then return without looping.
    dispatch cmd@(Stop _) = do
      putStrLn "stopping log consumer"
      writeQueue q cmd
-- | Take a batch of messages from the queue.
--
-- The first message is obtained with 'readQueue'; further messages are taken
-- with 'tryReadQueue' until either @n@ messages have been collected or
-- 'tryReadQueue' yields 'Nothing'. The result therefore always contains at
-- least one message, even when @n@ is less than one, and preserves the order
-- in which messages were read.
drainAll :: Queue a -> Int -> IO (Seq a)
drainAll q n = readQueue q >>= collect . singleton
  where
    collect acc
      | Seq.length acc >= n = return acc
      | otherwise =
          tryReadQueue q >>= maybe (return acc) (collect . (acc |>))
-- | Drain the queue in batches and send the events over the connection.
--
-- Each iteration takes up to @batchSize@ commands with 'drainAll', sends all
-- events found in that batch with 'TCP.sendEvents', and then either loops
-- (no 'Stop' in the batch) or shuts down.
--
-- On shutdown every 'Stop' found in the batch has its 'MVar' filled, so each
-- caller waiting on one of them is released. Events that share a batch with
-- a 'Stop' are still sent before the loop ends.
riemannConsumer :: Int -> Queue LogCommand -> TCPConnection -> IO ()
riemannConsumer batchSize q connection = loop
  where
    loop = do
      cmds <- drainAll q batchSize
      let (events, stops) = splitCommands cmds
      TCP.sendEvents connection events
      if Seq.null stops
        then loop
        else do
          putStrLn "stopping riemann consumer"
          -- Signal all stop requests, not just the first one: several
          -- callers may have requested shutdown within the same batch and
          -- each of them is blocked on its own MVar.
          mapM_ (\done -> putMVar done ()) stops

    -- Split a batch into its events and its stop signals in one total pass,
    -- keeping the original relative order of each kind.
    splitCommands :: Seq LogCommand -> (Seq PE.Event, Seq (MVar ()))
    splitCommands = foldr classify (Seq.empty, Seq.empty)

    classify (Event e) (es, ss) = (e <| es, ss)
    classify (Stop s) (es, ss) = (es, s <| ss)
-- | Buffered client: events and the stop request are written to the Unagi
-- channel held by the handle.
instance Client BatchClient where
  -- Wrap the event in a command and write it to the channel.
  sendEvent (BatchClient chan) = Unagi.writeChan chan . Event

  -- Write a stop request carrying a fresh empty MVar, then block until
  -- that MVar has been filled.
  close (BatchClient chan) =
    newEmptyMVar >>= \done -> do
      Unagi.writeChan chan (Stop done)
      takeMVar done
-- | Unbuffered client: events and the stop request are written straight to
-- the queue held by the handle.
instance Client BatchClientNoBuffer where
  -- Wrap the event in a command and enqueue it.
  sendEvent (BatchClientNoBuffer queue) = writeQueue queue . Event

  -- Enqueue a stop request carrying a fresh empty MVar, then block until
  -- that MVar has been filled.
  close (BatchClientNoBuffer queue) =
    newEmptyMVar >>= \done -> do
      writeQueue queue (Stop done)
      takeMVar done