{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeSynonymInstances #-}

module Network.Monitoring.Riemann.TCP
  ( tcpConnection
  , sendEvents
  , sendMsg
  , TCPConnection
  , Port
  ) where

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (TVar, newTVarIO, readTVarIO, writeTVar)
import Control.Exception (IOException, try)
import Data.Bifunctor (first)
import qualified Data.Binary.Put as Put
import qualified Data.ByteString.Lazy as BS
import qualified Data.ByteString.Lazy.Char8 as BC
import qualified Data.Maybe as Maybe
import Data.Monoid ((<>))
import Data.Sequence (Seq)
import qualified Network.Monitoring.Riemann.Event as Event
import qualified Network.Monitoring.Riemann.Proto.Event as PE
import qualified Network.Monitoring.Riemann.Proto.Msg as Msg
import Network.Socket
  ( AddrInfo
  , AddrInfoFlag(AI_NUMERICSERV)
  , Family(AF_INET)
  , HostName
  , Socket
  , SocketType(Stream)
  , addrAddress
  , addrFamily
  , addrFlags
  , connect
  , defaultHints
  , defaultProtocol
  , getAddrInfo
  , socket
  )
import qualified Network.Socket.ByteString.Lazy as NSB
import System.IO (hPutStrLn, stderr)
import qualified Text.ProtocolBuffers.Header as P'
import qualified Text.ProtocolBuffers.WireMessage as WM

data ClientInfo = ClientInfo
  { ClientInfo -> HostName
_hostname :: HostName
  , ClientInfo -> Port
_port :: Port
  , ClientInfo -> TCPStatus
_status :: TCPStatus
  }

type TCPConnection = TVar ClientInfo

data TCPStatus
  = CnxClosed
  | CnxOpen (Socket, AddrInfo)
  deriving (Port -> TCPStatus -> ShowS
[TCPStatus] -> ShowS
TCPStatus -> HostName
(Port -> TCPStatus -> ShowS)
-> (TCPStatus -> HostName)
-> ([TCPStatus] -> ShowS)
-> Show TCPStatus
forall a.
(Port -> a -> ShowS) -> (a -> HostName) -> ([a] -> ShowS) -> Show a
showList :: [TCPStatus] -> ShowS
$cshowList :: [TCPStatus] -> ShowS
show :: TCPStatus -> HostName
$cshow :: TCPStatus -> HostName
showsPrec :: Port -> TCPStatus -> ShowS
$cshowsPrec :: Port -> TCPStatus -> ShowS
Show)

type Port = Int

tcpConnection :: HostName -> Port -> IO TCPConnection
tcpConnection :: HostName -> Port -> IO TCPConnection
tcpConnection HostName
_hostname Port
_port = do
  ClientInfo
clientInfo <- ClientInfo -> IO ClientInfo
doConnect (ClientInfo -> IO ClientInfo) -> ClientInfo -> IO ClientInfo
forall a b. (a -> b) -> a -> b
$ ClientInfo :: HostName -> Port -> TCPStatus -> ClientInfo
ClientInfo {_status :: TCPStatus
_status = TCPStatus
CnxClosed, Port
HostName
_port :: Port
_hostname :: HostName
_port :: Port
_hostname :: HostName
..}
  ClientInfo -> IO TCPConnection
forall a. a -> IO (TVar a)
newTVarIO ClientInfo
clientInfo

doConnect :: ClientInfo -> IO ClientInfo
doConnect :: ClientInfo -> IO ClientInfo
doConnect clientInfo :: ClientInfo
clientInfo@(ClientInfo -> TCPStatus
_status -> CnxOpen (Socket, AddrInfo)
_) = ClientInfo -> IO ClientInfo
forall (f :: * -> *) a. Applicative f => a -> f a
pure ClientInfo
clientInfo
doConnect ClientInfo
clientInfo = do
  Handle -> HostName -> IO ()
hPutStrLn Handle
stderr (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$
    HostName
"(Re)connecting to Riemann: " HostName -> ShowS
forall a. Semigroup a => a -> a -> a
<> ClientInfo -> HostName
_hostname ClientInfo
clientInfo HostName -> ShowS
forall a. Semigroup a => a -> a -> a
<> HostName
":" HostName -> ShowS
forall a. Semigroup a => a -> a -> a
<>
    Port -> HostName
forall a. Show a => a -> HostName
show (ClientInfo -> Port
_port ClientInfo
clientInfo)
  Either IOError [AddrInfo]
addrs <-
    IO [AddrInfo] -> IO (Either IOError [AddrInfo])
forall e a. Exception e => IO a -> IO (Either e a)
try (IO [AddrInfo] -> IO (Either IOError [AddrInfo]))
-> IO [AddrInfo] -> IO (Either IOError [AddrInfo])
forall a b. (a -> b) -> a -> b
$
    Maybe AddrInfo -> Maybe HostName -> Maybe HostName -> IO [AddrInfo]
getAddrInfo
      (AddrInfo -> Maybe AddrInfo
forall a. a -> Maybe a
Just (AddrInfo -> Maybe AddrInfo) -> AddrInfo -> Maybe AddrInfo
forall a b. (a -> b) -> a -> b
$ AddrInfo
defaultHints {addrFlags :: [AddrInfoFlag]
addrFlags = [AddrInfoFlag
AI_NUMERICSERV], addrFamily :: Family
addrFamily = Family
AF_INET})
      (HostName -> Maybe HostName
forall a. a -> Maybe a
Just (ClientInfo -> HostName
_hostname ClientInfo
clientInfo))
      (HostName -> Maybe HostName
forall a. a -> Maybe a
Just (HostName -> Maybe HostName)
-> (ClientInfo -> HostName) -> ClientInfo -> Maybe HostName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Port -> HostName
forall a. Show a => a -> HostName
show (Port -> HostName)
-> (ClientInfo -> Port) -> ClientInfo -> HostName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ClientInfo -> Port
_port (ClientInfo -> Maybe HostName) -> ClientInfo -> Maybe HostName
forall a b. (a -> b) -> a -> b
$ ClientInfo
clientInfo)
  let family :: Family
family = Family
AF_INET
  case Either IOError [AddrInfo]
addrs of
    Right [] -> HostName -> IO ClientInfo
forall (m :: * -> *) a. MonadFail m => HostName -> m a
fail (HostName
"No accessible addresses in " HostName -> ShowS
forall a. [a] -> [a] -> [a]
++ Either IOError [AddrInfo] -> HostName
forall a. Show a => a -> HostName
show Either IOError [AddrInfo]
addrs)
    Right (AddrInfo
addy:[AddrInfo]
_) -> do
      Socket
s <- Family -> SocketType -> ProtocolNumber -> IO Socket
socket Family
family SocketType
Stream ProtocolNumber
defaultProtocol
      Either IOError ()
result <- IO () -> IO (Either IOError ())
forall e a. Exception e => IO a -> IO (Either e a)
try (IO () -> IO (Either IOError ()))
-> IO () -> IO (Either IOError ())
forall a b. (a -> b) -> a -> b
$ Socket -> SockAddr -> IO ()
connect Socket
s (AddrInfo -> SockAddr
addrAddress AddrInfo
addy)
      case Either IOError ()
result of
        Left IOError
err -> IOError -> IO ClientInfo
handleError IOError
err
        Right () -> ClientInfo -> IO ClientInfo
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ClientInfo -> IO ClientInfo) -> ClientInfo -> IO ClientInfo
forall a b. (a -> b) -> a -> b
$ ClientInfo
clientInfo {_status :: TCPStatus
_status = (Socket, AddrInfo) -> TCPStatus
CnxOpen (Socket
s, AddrInfo
addy)}
    Left IOError
err -> IOError -> IO ClientInfo
handleError IOError
err
  where
    handleError :: IOError -> IO ClientInfo
    handleError :: IOError -> IO ClientInfo
handleError IOError
err = do
      Handle -> HostName -> IO ()
hPutStrLn Handle
stderr (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
"Connection to Riemann failed: " HostName -> ShowS
forall a. Semigroup a => a -> a -> a
<> IOError -> HostName
forall a. Show a => a -> HostName
show IOError
err
      ClientInfo -> IO ClientInfo
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ClientInfo -> IO ClientInfo) -> ClientInfo -> IO ClientInfo
forall a b. (a -> b) -> a -> b
$ ClientInfo
clientInfo {_status :: TCPStatus
_status = TCPStatus
CnxClosed}

msgToByteString :: Msg.Msg -> BC.ByteString
msgToByteString :: Msg -> ByteString
msgToByteString Msg
msg =
  Put -> ByteString
Put.runPut (Put -> ByteString) -> Put -> ByteString
forall a b. (a -> b) -> a -> b
$ do
    Word32 -> Put
Put.putWord32be (Word32 -> Put) -> Word32 -> Put
forall a b. (a -> b) -> a -> b
$ WireSize -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral (WireSize -> Word32) -> WireSize -> Word32
forall a b. (a -> b) -> a -> b
$ Msg -> WireSize
forall msg. (ReflectDescriptor msg, Wire msg) => msg -> WireSize
WM.messageSize Msg
msg
    Msg -> Put
forall msg. (ReflectDescriptor msg, Wire msg) => msg -> Put
WM.messagePutM Msg
msg

decodeMsg :: BC.ByteString -> Either String Msg.Msg
decodeMsg :: ByteString -> Either HostName Msg
decodeMsg ByteString
bs =
  let result :: Either HostName (Msg, ByteString)
result = ByteString -> Either HostName (Msg, ByteString)
forall msg.
(ReflectDescriptor msg, Wire msg) =>
ByteString -> Either HostName (msg, ByteString)
WM.messageGet (WireSize -> ByteString -> ByteString
BS.drop WireSize
4 ByteString
bs)
   in case Either HostName (Msg, ByteString)
result of
        Left HostName
e -> HostName -> Either HostName Msg
forall a b. a -> Either a b
Left HostName
e
        Right (Msg
m, ByteString
_) ->
          if Maybe Bool -> Bool
forall a. Maybe a -> Bool
Maybe.isNothing (Msg -> Maybe Bool
Msg.ok Msg
m)
            then HostName -> Either HostName Msg
forall a b. a -> Either a b
Left HostName
"error"
            else Msg -> Either HostName Msg
forall a b. b -> Either a b
Right Msg
m

{-| Attempts to send a message and return the response.

If the connection is down, this function will trigger one reconnection attempt.
If that succeeds the message will be sent.
If it fails, the message is dropped and will need to be resent by you.
-}
sendMsg :: TCPConnection -> Msg.Msg -> IO (Either Msg.Msg Msg.Msg)
sendMsg :: TCPConnection -> Msg -> IO (Either Msg Msg)
sendMsg TCPConnection
client Msg
msg = Bool -> IO (Either Msg Msg)
go Bool
True
  where
    go :: Bool -> IO (Either Msg Msg)
go Bool
reconnect = do
      ClientInfo
clientInfo <- TCPConnection -> IO ClientInfo
forall a. TVar a -> IO a
readTVarIO TCPConnection
client
      case (ClientInfo -> TCPStatus
_status ClientInfo
clientInfo, Bool
reconnect) of
        (TCPStatus
CnxClosed, Bool
True) -> do
          ClientInfo
newInfo <- ClientInfo -> IO ClientInfo
doConnect ClientInfo
clientInfo
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TCPConnection -> ClientInfo -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TCPConnection
client ClientInfo
newInfo
          Bool -> IO (Either Msg Msg)
go Bool
False
        (TCPStatus
CnxClosed, Bool
False) -> Either Msg Msg -> IO (Either Msg Msg)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either Msg Msg -> IO (Either Msg Msg))
-> Either Msg Msg -> IO (Either Msg Msg)
forall a b. (a -> b) -> a -> b
$ Msg -> Either Msg Msg
forall a b. a -> Either a b
Left Msg
msg
        (CnxOpen (Socket
s, AddrInfo
_), Bool
_) -> do
          Either HostName ByteString
response <-
            (IOError -> HostName)
-> Either IOError ByteString -> Either HostName ByteString
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (IOError -> HostName
forall a. Show a => a -> HostName
show :: IOException -> String) (Either IOError ByteString -> Either HostName ByteString)
-> IO (Either IOError ByteString)
-> IO (Either HostName ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
            IO ByteString -> IO (Either IOError ByteString)
forall e a. Exception e => IO a -> IO (Either e a)
try
              (do Socket -> ByteString -> IO ()
NSB.sendAll Socket
s (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ Msg -> ByteString
msgToByteString Msg
msg
                  Socket -> WireSize -> IO ByteString
NSB.recv Socket
s WireSize
4096)
          case ByteString -> Either HostName Msg
decodeMsg (ByteString -> Either HostName Msg)
-> Either HostName ByteString -> Either HostName Msg
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Either HostName ByteString
response of
            Left HostName
_ -> do
              STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TCPConnection -> ClientInfo -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TCPConnection
client (ClientInfo
clientInfo {_status :: TCPStatus
_status = TCPStatus
CnxClosed})
              Either Msg Msg -> IO (Either Msg Msg)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either Msg Msg -> IO (Either Msg Msg))
-> Either Msg Msg -> IO (Either Msg Msg)
forall a b. (a -> b) -> a -> b
$ Msg -> Either Msg Msg
forall a b. a -> Either a b
Left Msg
msg
            Right Msg
m -> Either Msg Msg -> IO (Either Msg Msg)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either Msg Msg -> IO (Either Msg Msg))
-> Either Msg Msg -> IO (Either Msg Msg)
forall a b. (a -> b) -> a -> b
$ Msg -> Either Msg Msg
forall a b. b -> Either a b
Right Msg
m

{-|
    Send a list of Riemann events

    Host and Time will be added if they do not exist on the Event
-}
sendEvents :: TCPConnection -> Seq PE.Event -> IO ()
sendEvents :: TCPConnection -> Seq Event -> IO ()
sendEvents TCPConnection
connection Seq Event
events = do
  Seq Event
eventsWithDefaults <- Seq Event -> IO (Seq Event)
Event.withDefaults Seq Event
events
  Either Msg Msg
result <-
    TCPConnection -> Msg -> IO (Either Msg Msg)
sendMsg TCPConnection
connection (Msg -> IO (Either Msg Msg)) -> Msg -> IO (Either Msg Msg)
forall a b. (a -> b) -> a -> b
$ Msg
forall a. Default a => a
P'.defaultValue {events :: Seq Event
Msg.events = Seq Event
eventsWithDefaults}
  case Either Msg Msg
result of
    Left Msg
msg -> Handle -> HostName -> IO ()
hPutStrLn Handle
stderr (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
"failed to send" HostName -> ShowS
forall a. [a] -> [a] -> [a]
++ Msg -> HostName
forall a. Show a => a -> HostName
show Msg
msg
    Right Msg
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()