module Network.Monitoring.Riemann.TCP
( tcpConnection
, sendEvents
, sendMsg
, TCPConnection
, Port
) where
import Control.Concurrent
import Control.Exception (bracketOnError, mask, onException)
import Data.Binary.Put as Put
import qualified Data.ByteString.Lazy as BS
import qualified Data.ByteString.Lazy.Char8 as BC
import qualified Data.Maybe as Maybe
import qualified Data.Sequence as Seq
import qualified Network.Monitoring.Riemann.Event as Event
import qualified Network.Monitoring.Riemann.Proto.Event as PE
import qualified Network.Monitoring.Riemann.Proto.Msg as Msg
import Network.Socket
import Network.Socket.ByteString.Lazy as NSB
import qualified Text.ProtocolBuffers.Header as P'
import Text.ProtocolBuffers.WireMessage as WM
-- | Everything needed to talk to one server: the host name and port to
-- (re)connect to, plus the current connection status.
type ClientInfo = (HostName, Port, TCPStatus)
-- | Handle to a client connection: the 'ClientInfo' held in an 'MVar'.
-- 'sendMsg' takes the value out of the 'MVar' while it talks to the server
-- and puts an updated value back afterwards.
type TCPConnection = MVar ClientInfo
-- | Connection state stored inside a 'ClientInfo'.
data TCPStatus
  = CnxClosed
    -- ^ No usable socket; 'getConnection' dials a new one when it sees this.
  | CnxOpen (Socket, AddrInfo)
    -- ^ A socket together with the address it was connected to.
  deriving Show
-- | Port number of the server; rendered with 'show' and handed to
-- 'getAddrInfo' as the service name when connecting.
type Port = Int
-- | Connect to the given host and port and wrap the resulting open
-- connection in a fresh 'TCPConnection'.
--
-- The connection attempt happens immediately; any exception raised by
-- 'doConnect' propagates to the caller.
tcpConnection :: HostName -> Port -> IO TCPConnection
tcpConnection h p =
  doConnect h p >>= \opened -> newMVar (h, p, CnxOpen opened)
-- | Produce a socket for the given client state: reuse the stored one when
-- the status is 'CnxOpen', otherwise dial a new connection with 'doConnect'.
--
-- The caller is responsible for recording a freshly opened connection; this
-- function does not modify any shared state.
getConnection :: ClientInfo -> IO (Socket, AddrInfo)
getConnection (host, port, status) =
  case status of
    CnxOpen existing@(_, _) -> pure existing
    CnxClosed -> doConnect host port
-- | Resolve the host and port and open a stream socket to the first address
-- returned by 'getAddrInfo'.
--
-- The socket is created with the family, socket type and protocol of the
-- resolved address (rather than a fixed 'AF_INET'), so a host that resolves
-- to an IPv6 address can be reached as well.
--
-- Returns the connected socket together with the address it was connected
-- to. Fails in 'IO' when resolution yields no addresses; exceptions raised
-- while creating or connecting the socket propagate to the caller, and the
-- socket is closed before they do.
doConnect :: HostName -> Port -> IO (Socket, AddrInfo)
doConnect hn po = do
  addrs <- getAddrInfo (Just hints) (Just hn) (Just (show po))
  case addrs of
    [] -> fail ("No accessible addresses for " ++ hn ++ ":" ++ show po)
    (addy : _) ->
      -- bracketOnError releases the socket only when connecting fails; on
      -- success ownership of the socket passes to the caller.
      bracketOnError
        (socket (addrFamily addy) (addrSocketType addy) (addrProtocol addy))
        close
        (\s -> do
           connect s (addrAddress addy)
           return (s, addy))
  where
    -- The port is always numeric, and only stream (TCP) results are wanted.
    hints =
      defaultHints
        { addrFlags = [AI_NUMERICSERV]
        , addrSocketType = Stream
        }
-- | Serialise a message for the wire: a big-endian 32-bit length
-- (taken from 'WM.messageSize') followed by the encoded message itself.
msgToByteString :: Msg.Msg -> BC.ByteString
msgToByteString msg = Put.runPut (lengthPrefix >> messagePutM msg)
  where
    lengthPrefix = Put.putWord32be (fromIntegral (WM.messageSize msg))
-- | Decode a reply received from the server.
--
-- The first four bytes are skipped (the same size as the length prefix that
-- 'msgToByteString' writes) and the remainder is parsed with 'messageGet'.
-- Parse failures are returned as 'Left' with the parser's message; a message
-- whose @ok@ field is absent is rejected with @Left "error"@.
--
-- NOTE(review): a reply with @ok = Just False@ is still returned as 'Right';
-- confirm that callers inspect the flag themselves.
decodeMsg :: BC.ByteString -> Either String Msg.Msg
decodeMsg bs = do
  (m, _) <- messageGet (BS.drop 4 bs)
  maybe (Left "error") (const (Right m)) (Msg.ok m)
-- | Send a message over the shared connection and wait for the reply.
--
-- The connection state is taken out of the 'MVar' for the duration of the
-- exchange, so concurrent callers are serialised. A closed connection is
-- re-dialled via 'getConnection', and the socket that was actually used is
-- stored back on success so that later calls reuse it.
--
-- Returns @Right reply@ when the reply decodes successfully with
-- 'decodeMsg', and @Left msg@ (the original, unsent-or-unacknowledged
-- message) otherwise. In the failure case, and whenever an exception escapes
-- the exchange, the state is reset to 'CnxClosed' and the socket is closed;
-- exceptions are re-thrown to the caller after that cleanup, so the 'MVar'
-- is never left empty.
sendMsg :: TCPConnection -> Msg.Msg -> IO (Either Msg.Msg Msg.Msg)
sendMsg client msg = mask $ \restore -> do
  clientInfo@(h, p, _) <- takeMVar client
  let closed = (h, p, CnxClosed)
  conn@(s, _) <-
    restore (getConnection clientInfo)
      `onException` putMVar client closed
  -- The MVar is refilled before the socket is closed so that a failing
  -- 'close' cannot leave other callers blocked forever.
  result <-
    restore (exchange s)
      `onException` (putMVar client closed >> close s)
  case result of
    Right m -> do
      putMVar client (h, p, CnxOpen conn)
      return (Right m)
    Left _ -> do
      putMVar client closed
      close s
      return (Left msg)
  where
    -- Write the framed request, then read one framed reply: a 4-byte
    -- big-endian length followed by that many bytes of payload. The decoded
    -- result is forced here so decoding problems surface inside the
    -- exception-protected region.
    exchange s = do
      sendAll s (msgToByteString msg)
      header <- recvExact s 4
      body <-
        if BS.length header < 4
          then return BS.empty -- peer closed early; decodeMsg will reject it
          else recvExact s (BS.foldl' (\acc w -> acc * 256 + fromIntegral w) 0 header)
      return $! decodeMsg (BS.append header body)

    -- Keep receiving until the requested number of bytes has arrived or the
    -- peer closes the connection. Each read is capped at 4096 bytes so a
    -- large announced length never turns into one huge receive buffer.
    recvExact s = go []
      where
        go acc remaining
          | remaining <= 0 = return (BS.concat (reverse acc))
          | otherwise = do
              chunk <- NSB.recv s (min remaining 4096)
              if BS.null chunk
                then return (BS.concat (reverse acc))
                else go (chunk : acc) (remaining - BS.length chunk)
-- | Send a batch of events to the server in a single message.
--
-- The events are first passed through 'Event.withDefaults' (presumably to
-- fill in unset fields — see that module), wrapped in an otherwise default
-- 'Msg.Msg', and sent with 'sendMsg'. A failed send is reported on standard
-- output and otherwise ignored; nothing is returned to the caller.
sendEvents :: TCPConnection -> Seq.Seq PE.Event -> IO ()
sendEvents connection events = do
  eventsWithDefaults <- Event.withDefaults events
  result <- sendMsg connection $
    P'.defaultValue { Msg.events = eventsWithDefaults }
  case result of
    -- putStrLn rather than print: print would wrap the whole line in quotes
    -- and escape the quotes inside the shown message.
    Left msg -> putStrLn ("failed to send: " ++ show msg)
    Right _ -> return ()