Copyright | (c) 2016 Patrik Sandahl |
---|---|
License | MIT |
Maintainer | Patrik Sandahl <patrik.sandahl@gmail.com> |
Stability | experimental |
Portability | portable |
Safe Haskell | None |
Language | Haskell2010 |
A Haskell client for the NATS messaging system. See https://nats.io for general information and documentation.
- data Nats
- data Msg
- type Sid = Int64
- type Payload = ByteString
- type Topic = ByteString
- type QueueGroup = ByteString
- data MsgQueue
- data ManagerSettings = ManagerSettings {
  - reconnectionAttempts :: !Int
  - maxWaitTimeMS :: !Int
  - serverSelect :: ([URI], Int) -> IO (URI, Int)
  - connectedTo :: SockAddr -> IO ()
  - disconnectedFrom :: SockAddr -> IO ()
  }
- data NatsException
- data SockAddr :: *
- withNats :: ManagerSettings -> [String] -> (Nats -> IO a) -> IO a
- publish :: Nats -> Topic -> Maybe Topic -> Payload -> IO ()
- publishJson :: ToJSON a => Nats -> Topic -> Maybe Topic -> a -> IO ()
- subscribe :: Nats -> Topic -> Maybe QueueGroup -> IO (Sid, MsgQueue)
- subscribeAsync :: Nats -> Topic -> Maybe QueueGroup -> (Msg -> IO ()) -> IO Sid
- request :: Nats -> Topic -> Payload -> IO Msg
- requestJson :: ToJSON a => Nats -> Topic -> a -> IO Msg
- unsubscribe :: Nats -> Sid -> Maybe Int -> IO ()
- nextMsg :: MsgQueue -> IO Msg
- topic :: Msg -> Topic
- replyTo :: Msg -> Maybe Topic
- sid :: Msg -> Sid
- payload :: Msg -> Payload
- jsonPayload :: FromJSON a => Msg -> Maybe a
- jsonPayload' :: FromJSON a => Msg -> Maybe a
- defaultSettings :: ManagerSettings
Limitations in implementation
1) The current version of this library does not yet support TLS.
2) The current version of this library does not yet support authorization tokens (but it does support user names and passwords in the URI strings).
Simple messaging example.
This section gives a simple messaging example using this library. The example requires a NATS server running on localhost on the default port 4222. If the server uses another host or port, adapt the example accordingly.

    {-# LANGUAGE OverloadedStrings #-}
    module Main ( main ) where

    import Network.Nats
    import Text.Printf

    main :: IO ()
    main =
        withNats defaultSettings ["nats://localhost"] $ \nats -> do

            -- Subscribe to the topic "foo".
            (s, q) <- subscribe nats "foo" Nothing

            -- Publish to topic "foo", do not request a reply.
            publish nats "foo" Nothing "Some payload"

            -- Wait for a message, print the message's payload
            msg <- nextMsg q
            printf "Received %s\n" (show $ payload msg)

            -- Unsubscribe from topic "foo".
            unsubscribe nats s Nothing

Asynchronous message handling.
Besides the subscription mode where messages are fetched synchronously from a queue, there is also an asynchronous mode where each message is handled immediately in its own thread.

    {-# LANGUAGE OverloadedStrings #-}
    module Main ( main ) where

    import Control.Monad
    import Data.Maybe
    import Network.Nats
    import Text.Printf

    main :: IO ()
    main =
        withNats defaultSettings ["nats://localhost"] $ \nats -> do

            -- A simple - asynchronous - help service that will answer
            -- requesters that give a reply topic with "I can help".
            s1 <- subscribeAsync nats "help" Nothing $ \msg -> do
                printf "Help service received: %s\n" (show $ payload msg)
                when (isJust $ replyTo msg) $
                    publish nats (fromJust $ replyTo msg) Nothing "I can help"

            -- Subscribe to help replies.
            (s2, q) <- subscribe nats "help.reply" Nothing

            -- Request help.
            publish nats "help" (Just "help.reply") "Please ..."

            -- Wait for reply.
            msg <- nextMsg q
            printf "Received: %s\n" (show $ payload msg)

            -- Unsubscribe.
            unsubscribe nats s1 Nothing
            unsubscribe nats s2 Nothing

Convenience API for the request pattern.
The example above contains a common request pattern: sending a message to a topic while requesting a reply, subscribing to the reply topic, receiving the reply message and then unsubscribing from the reply topic.
This pattern can be handled more simply using the request function.

    {-# LANGUAGE OverloadedStrings #-}
    module Main ( main ) where

    import Control.Monad
    import Data.Maybe
    import Network.Nats
    import Text.Printf

    main :: IO ()
    main =
        withNats defaultSettings ["nats://localhost"] $ \nats -> do

            -- A simple - asynchronous - help service that will answer
            -- requesters that give a reply topic with "I can help".
            s <- subscribeAsync nats "help" Nothing $ \msg -> do
                printf "Help service received: %s\n" (show $ payload msg)
                when (isJust $ replyTo msg) $
                    publish nats (fromJust $ replyTo msg) Nothing "I can help"

            -- Request help.
            msg <- request nats "help" "Please ..."
            printf "Received: %s\n" (show $ payload msg)

            -- Unsubscribing the help service only.
            unsubscribe nats s Nothing

Topic structure.
The topic structure is tree-like, similar to a file system or the Haskell module hierarchy, and the components in the tree are separated by dots. A subscriber to a topic can use wildcards to specify patterns.

    {-# LANGUAGE OverloadedStrings #-}
    module Main ( main ) where

    import Control.Monad
    import Data.Maybe
    import Network.Nats
    import Text.Printf

    main :: IO ()
    main =
        withNats defaultSettings ["nats://localhost"] $ \nats -> do

            -- "*" matches any token, at any level of the subject.
            (_, queue1) <- subscribe nats "foo.*.baz" Nothing
            (_, queue2) <- subscribe nats "foo.bar.*" Nothing

            -- ">" matches any length of the tail of the subject, and can
            -- only be the last token.
            (_, queue3) <- subscribe nats "foo.>" Nothing

            -- This publishing matches all the above.
            publish nats "foo.bar.baz" Nothing "Hello world"

            -- Show that the message showed up on all queues.
            forM_ [queue1, queue2, queue3] $ \queue -> do
                msg <- nextMsg queue
                printf "Received: %s\n" (show $ payload msg)

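To make the wildcard rules concrete, here is a minimal sketch of how a subscription pattern could be matched against a published subject. It is a pure model for illustration only: the names tokens and matches are hypothetical and not part of Network.Nats, and the real matching is done by the NATS server. The sketch assumes that "*" matches exactly one token and that ">" matches one or more trailing tokens.

```haskell
-- Hypothetical helpers for illustration; not part of Network.Nats.

-- | Split a subject or pattern into its dot-separated tokens.
tokens :: String -> [String]
tokens str = case break (== '.') str of
    (tok, [])       -> [tok]
    (tok, _ : rest) -> tok : tokens rest

-- | Check whether a subscription pattern matches a published subject.
matches :: String -> String -> Bool
matches pat subject = go (tokens pat) (tokens subject)
  where
    -- ">" as the last token matches one or more remaining tokens.
    go [">"] (_ : _)       = True
    -- "*" matches exactly one token, whatever its value.
    go ("*" : ps) (_ : ss) = go ps ss
    -- A literal token must be equal to the subject's token.
    go (p : ps) (s : ss)   = p == s && go ps ss
    -- Pattern and subject are exhausted together: a complete match.
    go [] []               = True
    -- Anything else, e.g. differing lengths, is not a match.
    go _ _                 = False
```

With these definitions, matches "foo.*.baz" "foo.bar.baz" and matches "foo.>" "foo.bar.baz" both evaluate to True, while matches "foo.*" "foo.bar.baz" evaluates to False because "*" covers a single token only.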
data Nats
The type of the handle used by the API. To the user this type is opaque. The Nats handle is only valid within the scope of the withNats function.
data Msg
A NATS message as received by the user. The message itself is opaque to the user, but the fields can be read by the API functions topic, replyTo, sid, payload, jsonPayload and jsonPayload'.
type Sid = Int64
The numeric id for a subscription. An id shall be unique within a NATS client. The value of the id will be generated automatically by the API. Type alias for Int64.
type Payload = ByteString
The type of a message payload. Type alias for ByteString.
type Topic = ByteString
The type of a topic to publish to, or to subscribe to. Type alias for ByteString.
type QueueGroup = ByteString
A Topic subscriber can be part of a queue group, an entity for load balancing in NATS. Type alias for ByteString.
data ManagerSettings
A set of parameters to guide the behavior of the connection manager. A default set of parameters can be obtained by calling defaultSettings.
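ManagerSettings
- reconnectionAttempts :: !Int
- maxWaitTimeMS :: !Int
- serverSelect :: ([URI], Int) -> IO (URI, Int)
- connectedTo :: SockAddr -> IO ()
- disconnectedFrom :: SockAddr -> IO ()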
data NatsException
Exceptions generated from within this library.
ConnectionGiveUpException | An exception thrown when all the configured connection attempts are consumed and the connection manager has given up. |
AuthorizationException | The NATS server currently connected to has reported authorization violations. Don't try to survive, just tell the user that there are such errors. |
URIError !String | An exception caused by invalid URI strings. |
data SockAddr :: *
The existence of a constructor does not necessarily imply that that socket address type is supported on your system: see isSupportedSockAddr.
withNats
:: ManagerSettings | Settings for the connection manager. |
-> [String] | A list of URI strings to specify the NATS servers available. If any URI string is malformed, an exception is thrown. |
-> (Nats -> IO a) | The user provided action. Once the action has terminated, the connection will close. |
-> IO a | |
Run an IO action while a connection to NATS is maintained. If a NATS connection is lost, the connection manager will try to reconnect to the same or one of the other NATS servers (as specified by the provided URI strings). Strategies for reconnection are specified in the ManagerSettings. All subscriptions will be automatically replayed once a new connection is made.
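As an illustration of how the connection manager can be tuned, the following sketch derives custom settings from defaultSettings and reacts to the exceptions listed for NatsException. It rests on several assumptions that the text above does not confirm: that NatsException is an instance of Exception, that the exceptions propagate to the caller of withNats, and that the three constructors shown are the complete set. The limit of three reconnection attempts is an arbitrary value chosen for the example.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main ( main ) where

import Control.Exception (catch)
import Network.Nats
import Text.Printf

-- Settings derived from the defaults. The callbacks log every
-- connect and disconnect; the attempt limit is an arbitrary example value.
settings :: ManagerSettings
settings = defaultSettings
    { reconnectionAttempts = 3
    , connectedTo          = \addr -> printf "Connected to %s\n" (show addr)
    , disconnectedFrom     = \addr -> printf "Disconnected from %s\n" (show addr)
    }

main :: IO ()
main = run `catch` handler
  where
    -- Publish a single message while the connection is maintained.
    run :: IO ()
    run = withNats settings ["nats://localhost"] $ \nats ->
        publish nats "foo" Nothing "Some payload"

    -- Assumption: these exceptions reach the caller of withNats.
    handler :: NatsException -> IO ()
    handler ConnectionGiveUpException = putStrLn "Gave up connecting to NATS"
    handler AuthorizationException    = putStrLn "Authorization violation"
    handler (URIError reason)         = printf "Invalid URI: %s\n" reason
```

Record update syntax keeps all fields that are not mentioned, such as maxWaitTimeMS and serverSelect, at their default values.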
publish :: Nats -> Topic -> Maybe Topic -> Payload -> IO ()
Publish some Payload message to a Topic. The NATS server will distribute the message to subscribers of the Topic.
publish nats "GREETINGS" Nothing "Hello, there!"
Will publish the string Hello, there! to subscribers of GREETINGS. No reply-to Topic is provided. To request a reply, provide a Topic where the subscriber can publish a reply.
publish nats "GREETINGS" (Just "THANKS") "Hello, there!"
publishJson :: ToJSON a => Nats -> Topic -> Maybe Topic -> a -> IO ()
As publish, but with JSON payload.
subscribe :: Nats -> Topic -> Maybe QueueGroup -> IO (Sid, MsgQueue)
Subscribe to a Topic. Optionally a subscription can be part of a QueueGroup. The function will immediately return with a tuple of a Sid for the subscription, and a MsgQueue from where messages can be fetched using nextMsg.
(sid, queue) <- subscribe nats "do.stuff" Nothing
Or
(sid, queue) <- subscribe nats "do.stuff" (Just "stuffworkers")
subscribeAsync :: Nats -> Topic -> Maybe QueueGroup -> (Msg -> IO ()) -> IO Sid
Subscribe to a Topic. Optionally a subscription can be part of a QueueGroup.
Subscriptions using this function will be asynchronous, and each message will be handled in its own thread. A message handler is an IO action taking a Msg as its argument. The function returns the Sid for the subscription.

    sid <- subscribeAsync nats "do.stuff" Nothing $ \msg -> do
        -- Do stuff with the msg

Or

    sid <- subscribeAsync nats "do.stuff" Nothing messageHandler

    messageHandler :: Msg -> IO ()
    messageHandler msg = do
        -- Do stuff with the msg

unsubscribe :: Nats -> Sid -> Maybe Int -> IO ()
Unsubscribe from a subscription using its Sid. Optionally, a limit for automatic unsubscription can be given. Unsubscription will then happen once the number of received messages has reached the limit.
unsubscribe nats sid Nothing
Or
unsubscribe nats sid (Just 100)
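The effect of a limit can be sketched with a small program that requests automatic unsubscription after two messages and then publishes three. The topic name "ticks" and the limit are arbitrary choices for this example. The sketch assumes that the server stops delivering once the limit is reached and that already queued messages remain readable with nextMsg, neither of which is spelled out in the text above.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main ( main ) where

import Control.Monad (replicateM_)
import Network.Nats
import Text.Printf

main :: IO ()
main = withNats defaultSettings ["nats://localhost"] $ \nats -> do
    (s, q) <- subscribe nats "ticks" Nothing

    -- Ask for automatic unsubscription once two messages have arrived.
    unsubscribe nats s (Just 2)

    -- Publish three messages; only the first two are expected to be
    -- delivered to the subscription.
    replicateM_ 3 $ publish nats "ticks" Nothing "tick"

    -- Fetch exactly two messages. A third nextMsg is deliberately not
    -- attempted, since it would be expected to block.
    replicateM_ 2 $ do
        msg <- nextMsg q
        printf "Received: %s\n" (show $ payload msg)
```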
sid :: Msg -> Sid
Read the subscription id for the subscription on which this message was received.
jsonPayload :: FromJSON a => Msg -> Maybe a
Decode a message's payload as JSON. Uses decode for the decoding.
jsonPayload' :: FromJSON a => Msg -> Maybe a
Decode a message's payload as JSON. Uses decode' for the decoding.
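A JSON round trip using publishJson and jsonPayload might look like the sketch below. The Greeting type, its field names and the topic "greetings" are hypothetical and exist only for this example. The sketch assumes that the ToJSON and FromJSON classes are those of the aeson package, so that instances can be derived through GHC generics.

```haskell
{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}
module Main ( main ) where

import Data.Aeson (FromJSON, ToJSON)
import GHC.Generics (Generic)
import Network.Nats

-- Hypothetical message type, used for illustration only.
data Greeting = Greeting
    { sender :: String
    , body   :: String
    } deriving (Generic, Show)

-- Instances derived through the Generic defaults (aeson assumed).
instance ToJSON Greeting
instance FromJSON Greeting

main :: IO ()
main = withNats defaultSettings ["nats://localhost"] $ \nats -> do
    (s, q) <- subscribe nats "greetings" Nothing

    -- Publish a value; it is encoded to JSON by publishJson.
    publishJson nats "greetings" Nothing (Greeting "Alice" "Hello, there!")

    -- Decoding yields Nothing if the payload is not a valid Greeting.
    msg <- nextMsg q
    case (jsonPayload msg :: Maybe Greeting) of
        Just greeting -> print greeting
        Nothing       -> putStrLn "Payload is not a valid Greeting"

    unsubscribe nats s Nothing
```

The type annotation on jsonPayload selects the FromJSON instance to use; without it the result type would be ambiguous in this context.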
defaultSettings :: ManagerSettings
Create a default ManagerSettings.