module Network.Nats.Protocol ( Connection (..)
, Subject(..)
, SubscriptionId(..)
, QueueGroup
, defaultConnectionOptions
, defaultTimeout
, receiveMessage
, receiveServerBanner
, sendConnect
, sendPong
, sendPub
, sendSub
, sendUnsub
, module Network.Nats.Protocol.Message
, module Network.Nats.Protocol.Types
) where
import Control.Monad.Catch
import Control.Monad.IO.Class (MonadIO)
import Data.Aeson (encode)
import Data.ByteString.Builder
import Data.Default (def)
import Data.Monoid
import Network.Nats.Protocol.Message
import Network.Nats.Protocol.Types
import System.IO (Handle)
import qualified Data.ByteString.Char8 as BS
newtype QueueGroup = QueueGroup BS.ByteString deriving Show
class (Monad m, MonadIO m) => Connection m where
receiveRawMessage :: Handle -> Int -> m BS.ByteString
sendRawMessage :: Handle -> Builder -> m ()
instance Connection IO where
receiveRawMessage = BS.hGetSome
sendRawMessage = hPutBuilder
defaultConnectionOptions :: NatsConnectionOptions
defaultConnectionOptions = def NatsConnectionOptions
defaultTimeout :: Int
defaultTimeout = 1000000
data Command where
Connect :: NatsConnectionOptions -> Command
Publish :: Subject -> BS.ByteString -> Command
Subscribe :: Subject -> SubscriptionId -> Maybe QueueGroup -> Command
Unsubscribe :: SubscriptionId -> Maybe Int -> Command
Pong :: Command
deriving (Show)
render :: Command -> Builder
render (Connect opts) =
stringUtf8 "CONNECT "
<> lazyByteString (encode opts)
<> spaceBuilder
<> byteString lineTerminator
render (Publish subj payload) =
stringUtf8 "PUB "
<> renderSubject subj
<> spaceBuilder
<> renderPayload payload
<> byteString lineTerminator
render (Subscribe subj subId _qgroup) =
stringUtf8 "SUB "
<> renderSubject subj
<> spaceBuilder
<> renderSubscriptionId subId
<> spaceBuilder
<> byteString lineTerminator
render (Unsubscribe subId _maxMsgs) =
stringUtf8 "UNSUB "
<> renderSubscriptionId subId <> spaceBuilder
<> byteString lineTerminator
render Pong =
stringUtf8 "PONG "
<> byteString lineTerminator
spaceBuilder :: Builder
spaceBuilder = charUtf8 ' '
lineTerminator :: BS.ByteString
lineTerminator = "\r\n"
renderSubject :: Subject -> Builder
renderSubject (Subject s) = byteString s
renderSubscriptionId :: SubscriptionId -> Builder
renderSubscriptionId (SubscriptionId i) = byteString i
renderPayload :: BS.ByteString -> Builder
renderPayload p = intDec (BS.length p) <> byteString lineTerminator <> byteString p
sendCommand :: Connection m => Handle -> Command -> m ()
sendCommand h cmd = sendRawMessage h $ render cmd
receiveServerBanner :: Connection m => Handle -> m (Either String NatsServerInfo)
receiveServerBanner h = do
receiveRawMessage h maxBytes >>= return . parseServerBanner
where maxBytes = 10240
receiveMessage :: (MonadThrow m, Connection m) => Handle -> Int -> m Message
receiveMessage h maxBytes = do
m <- receiveRawMessage h maxBytes
parseMessage m
sendConnect :: Connection m => Handle -> NatsConnectionOptions -> m ()
sendConnect h opts = sendCommand h $ Connect opts
sendPub :: Connection m => Handle -> Subject -> Int -> BS.ByteString -> m ()
sendPub h subj _payloadLen payload = sendCommand h $ Publish subj payload
sendSub :: Connection m => Handle -> Subject -> SubscriptionId -> Maybe QueueGroup -> m ()
sendSub h subj subId qgroup = sendCommand h $ Subscribe subj subId qgroup
sendUnsub :: Connection m => Handle -> SubscriptionId -> Maybe Int -> m ()
sendUnsub h subId max_msgs = sendCommand h $ Unsubscribe subId max_msgs
sendPong :: Connection m => Handle -> m ()
sendPong h = sendCommand h $ Pong