{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeSynonymInstances #-}
module Network.Monitoring.Riemann.TCP
( tcpConnection
, sendEvents
, sendMsg
, TCPConnection
, Port
) where
import Control.Concurrent (MVar, newMVar, putMVar, takeMVar)
import Control.Exception (bracketOnError, mask, onException)
import qualified Data.Binary.Put as Put
import qualified Data.ByteString.Lazy as BS
import qualified Data.ByteString.Lazy.Char8 as BC
import qualified Data.Maybe as Maybe
import Data.Sequence (Seq)
import qualified Network.Monitoring.Riemann.Event as Event
import qualified Network.Monitoring.Riemann.Proto.Event as PE
import qualified Network.Monitoring.Riemann.Proto.Msg as Msg
import Network.Socket
  ( AddrInfo
  , AddrInfoFlag(AI_NUMERICSERV)
  , Family(AF_INET)
  , HostName
  , Socket
  , SocketType(Stream)
  , addrAddress
  , addrFlags
  , addrFamily
  , close
  , connect
  , defaultHints
  , defaultProtocol
  , getAddrInfo
  , socket
  )
import qualified Network.Socket.ByteString.Lazy as NSB
import System.IO (hPutStrLn, stderr)
import qualified Text.ProtocolBuffers.Header as P'
import qualified Text.ProtocolBuffers.WireMessage as WM
-- | Everything needed to talk to (or reconnect to) one server: the host
-- name, the port, and the current state of the TCP connection.
type ClientInfo = (HostName, Port, TCPStatus)
-- | Shared, mutable handle to a connection. The 'MVar' holds the current
-- 'ClientInfo'; 'sendMsg' takes it while it talks to the server and puts
-- an updated value back afterwards.
type TCPConnection = MVar ClientInfo
-- | State of the underlying TCP connection.
data TCPStatus
= CnxClosed -- ^ No usable socket; 'getConnection' will call 'doConnect'.
| CnxOpen (Socket, AddrInfo) -- ^ A connected socket and the address it was opened against.
deriving (Show)
-- | TCP port number. It is rendered with 'show' and handed to
-- 'getAddrInfo' as the service name when connecting.
type Port = Int
-- | Open a TCP connection to the given host and port and wrap it in a
-- fresh 'TCPConnection'.
--
-- The connection is established eagerly: any exception raised by
-- 'doConnect' propagates to the caller and no 'MVar' is created.
tcpConnection :: HostName -> Port -> IO TCPConnection
tcpConnection h p =
  doConnect h p >>= \opened -> newMVar (h, p, CnxOpen opened)
-- | Produce a socket for the given client state: reuse the stored one when
-- the status is 'CnxOpen', otherwise dial the stored host and port again
-- with 'doConnect'.
getConnection :: ClientInfo -> IO (Socket, AddrInfo)
getConnection (h, p, status) =
  case status of
    CnxOpen conn@(_, _) -> pure conn
    CnxClosed -> doConnect h p
-- | Resolve the host and port to an IPv4 address and open a connected
-- stream socket to the first address returned by 'getAddrInfo'.
--
-- Returns the connected socket together with the 'AddrInfo' it was
-- connected to.
--
-- Throws whatever 'getAddrInfo', 'socket' or 'connect' throw, and fails
-- with a message naming the host and port if resolution yields no
-- address. If 'connect' fails, the freshly created socket is closed
-- before the exception propagates, so no file descriptor is leaked.
doConnect :: HostName -> Port -> IO (Socket, AddrInfo)
doConnect hn po = do
  addrs <- getAddrInfo (Just hints) (Just hn) (Just $ show po)
  case addrs of
    [] -> fail ("No accessible addresses for " ++ hn ++ ":" ++ show po)
    (addy:_) ->
      -- bracketOnError releases the socket only when the body throws; on
      -- success ownership of the socket passes to the caller.
      bracketOnError
        (socket family Stream defaultProtocol)
        close
        (\s -> connect s (addrAddress addy) >> pure (s, addy))
  where
    -- The port is always numeric, and only IPv4 addresses are requested,
    -- which is why the socket family below can be fixed to AF_INET.
    hints = defaultHints {addrFlags = [AI_NUMERICSERV], addrFamily = AF_INET}
    family = AF_INET
-- | Serialise a message for the wire: a 4-byte big-endian length
-- (taken from 'WM.messageSize') followed by the encoded message.
msgToByteString :: Msg.Msg -> BC.ByteString
msgToByteString msg = Put.runPut (lengthPrefix >> WM.messagePutM msg)
  where
    lengthPrefix = Put.putWord32be (fromIntegral (WM.messageSize msg))
-- | Decode a server reply.
--
-- The first four bytes are skipped (presumably the length prefix,
-- mirroring what 'msgToByteString' writes) and the rest is parsed with
-- 'WM.messageGet'. A parse failure is returned as-is; a message that
-- parses but has no @ok@ field set is rejected with @Left "error"@.
-- Any bytes left over after the message are ignored.
decodeMsg :: BC.ByteString -> Either String Msg.Msg
decodeMsg bs = WM.messageGet payload >>= requireOk
  where
    payload = BS.drop 4 bs
    requireOk (m, _) = maybe (Left "error") (const (Right m)) (Msg.ok m)
-- | Send one message over the connection and wait for the server's reply.
--
-- Returns @Right reply@ when a reply was received and accepted by
-- 'decodeMsg'. Returns @Left msg@ (the original, unsent-or-unacknowledged
-- message) when the reply could not be decoded; in that case the socket
-- is closed and the connection is marked 'CnxClosed', so the next call
-- reconnects.
--
-- Exceptions raised while connecting or during socket I/O propagate to
-- the caller. The 'MVar' is always refilled before they do, so a failed
-- send can never leave other senders blocked forever, and a socket that
-- failed mid-exchange is closed rather than leaked.
sendMsg :: TCPConnection -> Msg.Msg -> IO (Either Msg.Msg Msg.Msg)
sendMsg client msg =
  -- Mask asynchronous exceptions around the bookkeeping so that the MVar
  -- cannot be left empty; the blocking network calls run under 'restore'.
  mask $ \restore -> do
    clientInfo@(h, p, _) <- takeMVar client
    let disconnected = (h, p, CnxClosed)
        -- Refill the MVar first so it is restored even if closing throws.
        abandon sock = putMVar client disconnected >> close sock
    conn@(s, _) <-
      restore (getConnection clientInfo) `onException`
      putMVar client clientInfo
    reply <- restore (roundTrip s) `onException` abandon s
    case decodeMsg reply of
      Right m -> do
        -- Store the socket actually used, so that a connection re-opened
        -- by 'getConnection' is kept instead of being dropped and leaked.
        putMVar client (h, p, CnxOpen conn)
        pure $ Right m
      Left _ -> do
        abandon s
        pure $ Left msg
  where
    -- Write the framed request, then read one framed reply.
    roundTrip sock = do
      NSB.sendAll sock $ msgToByteString msg
      recvFrame sock
    -- Read the 4-byte big-endian length header plus that many payload
    -- bytes. The header is kept in the result because 'decodeMsg' skips it.
    -- On early end-of-stream whatever was read is returned and left for
    -- 'decodeMsg' to reject.
    recvFrame sock = do
      header <- recvExactly sock 4
      if BS.length header < 4
        then pure header
        else do
          let payloadLen =
                BS.foldl' (\acc w -> acc * 256 + fromIntegral w) 0 header
          payload <- recvExactly sock payloadLen
          pure (BS.append header payload)
    -- Keep receiving until the requested number of bytes has arrived or
    -- the peer closes the stream. Each read is capped at 4096 bytes so a
    -- bogus length header cannot trigger one huge buffer allocation.
    recvExactly sock wanted = go wanted []
      where
        go remaining chunks
          | remaining <= 0 = pure (BS.concat (reverse chunks))
          | otherwise = do
            chunk <- NSB.recv sock (min remaining 4096)
            if BS.null chunk
              then pure (BS.concat (reverse chunks))
              else go (remaining - BS.length chunk) (chunk : chunks)
-- | Send a batch of events to the server in a single message.
--
-- The events are first passed through 'Event.withDefaults' (presumably
-- filling in unset fields -- confirm against that module) and then sent
-- with 'sendMsg'. If 'sendMsg' reports failure, the unsent message is
-- logged to stderr and the function returns normally; exceptions thrown
-- by 'sendMsg' are not caught here.
sendEvents :: TCPConnection -> Seq PE.Event -> IO ()
sendEvents connection events = do
  eventsWithDefaults <- Event.withDefaults events
  result <-
    sendMsg connection $ P'.defaultValue {Msg.events = eventsWithDefaults}
  case result of
    -- Separator added: previously this printed "failed to sendMsg {...}".
    Left msg -> hPutStrLn stderr $ "failed to send: " ++ show msg
    Right _ -> pure ()