Portability | portable |
---|---|
Stability | experimental |
Safe Haskell | None |
The Stomp Protocol specifies message-oriented interoperability. Applications connect to a message broker to send (publish) or receive (subscribe) messages through queues. Interoperating applications do not know the location or internal structure of each other. They only see interfaces, i.e. the messages published and subscribed through the broker.
The Stompl Client library implements
a Stomp client using abstractions like
Connection
, Transaction
and
queues in terms of Reader
and Writer
.
- withConnection :: String -> Int -> [Copt] -> [Header] -> (Con -> IO a) -> IO a
- data Con
- type Heart = (Int, Int)
- data Copt
- = OWaitBroker Int
- | OMaxRecv Int
- | OHeartBeat Heart
- | OAuth String String
- | OClientId String
- | OStomp
- data Reader a
- data Writer a
- newReader :: Con -> String -> String -> [Qopt] -> [Header] -> InBound a -> IO (Reader a)
- newWriter :: Con -> String -> String -> [Qopt] -> [Header] -> OutBound a -> IO (Writer a)
- withReader :: Con -> String -> String -> [Qopt] -> [Header] -> InBound i -> (Reader i -> IO r) -> IO r
- withWriter :: Con -> String -> String -> [Qopt] -> [Header] -> OutBound o -> (Writer o -> IO r) -> IO r
- withPair :: Con -> String -> ReaderDesc i -> WriterDesc o -> ((Reader i, Writer o) -> IO r) -> IO r
- type ReaderDesc i = (String, [Qopt], [Header], InBound i)
- type WriterDesc o = (String, [Qopt], [Header], OutBound o)
- data Qopt
- = OWithReceipt
- | OWaitReceipt
- | OMode AckMode
- | OAck
- | OForceTx
- data AckMode
- = Auto
- | Client
- | ClientIndi
- type InBound a = Type -> Int -> [Header] -> ByteString -> IO a
- type OutBound a = a -> IO ByteString
- readQ :: Reader a -> IO (Message a)
- writeQ :: Writer a -> Type -> [Header] -> a -> IO ()
- writeQWith :: Writer a -> Type -> [Header] -> a -> IO Receipt
- writeAdHoc :: Writer a -> String -> Type -> [Header] -> a -> IO ()
- writeAdHocWith :: Writer a -> String -> Type -> [Header] -> a -> IO Receipt
- data Message a
- msgContent :: Message a -> a
- msgRaw :: Message a -> ByteString
- msgType :: Message a -> Type
- msgLen :: Message a -> Int
- msgHdrs :: Message a -> [Header]
- data Rec
- type Receipt = Rec
- waitReceipt :: Con -> Receipt -> IO ()
- data Tx
- withTransaction :: Con -> [Topt] -> (Tx -> IO a) -> IO a
- data Topt
- = OTimeout Int
- | OWithReceipts
- | OAbortMissingAcks
- abort :: String -> IO ()
- ack :: Con -> Message a -> IO ()
- ackWith :: Con -> Message a -> IO ()
- nack :: Con -> Message a -> IO ()
- nackWith :: Con -> Message a -> IO ()
- module Network.Mom.Stompl.Client.Exception
Connections
The Stomp protocol is connection-oriented and usually implemented on top of TCP/IP. The client initialises the connection by sending a connect message which is answered by the broker by confirming or rejecting the connection. The connection is authenticated by user and passcode. The authentication mechanism, however, varies among brokers.
During the connection phase, the protocol version and a heartbeat that defines the frequency of alive messages exchanged between broker and client are negotiated.
The connection remains active until either the client disconnects voluntarily or the broker disconnects in consequence of a protocol error.
The details of the connection, including protocol version and heartbeats are handled internally by the Stompl Client library.
withConnection :: String -> Int -> [Copt] -> [Header] -> (Con -> IO a) -> IO a
Initialises a connection and executes an IO
action.
The connection life time is the scope of this action.
The connection handle, Con
, that is passed to the action
should not be returned from withConnection
.
Connections, however, can be shared among threads.
In this case, the programmer has to take care
not to terminate the action before all other threads
working on the connection have finished.
Since Connection
is a heavy data type,
you should try to reduce the number of connections
to the same broker within the same process -
there is ideally only one connection per broker
in one process.
Paramter:
-
String
: The broker's hostname or IP-address -
Int
: The broker's port -
Copt
: Control options passed to the connection (including user/password) -
Header
: List of additional, broker-specific headers - (
Con
->IO
a): The action to execute. The action receives the connection handle and returns a value of type a in theIO
monad.
withConnection
returns the result of the action passed into it.
withConnection
will always disconnect from the broker
when the action has terminated, even if an exception is raised.
Example:
withConnection "localhost" 61613 [] [] $ \c -> do
This would connect to a broker listening to the loopback interface, port number 61613. The action is defined after the hanging do.
Internally, connections use concurrent threads;
errors are communicated by throwing exceptions
to the owner of the connection, where
the owner is the thread that created the connection
by calling withConnection
.
It is therefore advisable to start different connections
in different threads, so that each thread will receive
only exceptions related to the connection it has opened.
Example:
t <- forkIO $ withConnection "127.0.0.1" 61613 [] [] $ \c -> do
data Con
Opaque Connection handle. Only valid within the action passed to withConnection.
type Heart = (Int, Int)
Heart-beat configuration;
the first Int
of the pair represents the frequency
in which the sender wants to send heart-beats;
the second represents the highest frequency
in which the sender can accept heart-beats.
The frequency is expressed as
the period in milliseconds between two heart-beats.
For details on negotiating heart-beats,
please refer to the Stomp specification.
data Copt
Options passed to a connection
OWaitBroker Int | Tells the connection to wait n milliseconds for the |
OMaxRecv Int | The maximum size of TCP/IP packets. Indirectly, this options also defines the maximum message size which is 10 * maxReceive. By default, the maximum packet size is 1024 bytes. |
OHeartBeat Heart | This option defines the client's bid
for negotiating heart beats (see |
OAuth String String | Authentication: user and password |
OClientId String | Identification: specifies the JMS Client ID for persistant connections |
OStomp | With this option set, connect will use a STOMP frame instead of a CONNECT frame |
Queues
Stomp program interoperability is based on queues. Queues are communication channels of arbitrary size that may be written by any client currently connected to the broker. Messages in the queue are stored in FIFO order. The process of adding messages to the queue is called send. In order to read from a queue, a client has to subscribe to it. After having subscribed to a queue, a client will receive all message sent to it. Brokers may implement additional selection criteria by means of selectors that are expressed in some query language, such as SQL or XPath.
There are two different flavours of queues distinguished by their communication pattern: Queues are either one-to-one channels, this is, a message published in this queue is sent to exactly one subscriber and then removed from it; or queues may be one-to-many, i.e., a message published in this queue is sent to all current subscribers of the queue. This type of queues is sometimes called topic. Which pattern is supported and how patterns are controlled, depends on the broker.
From the perspective of the Stomp protocol, the content of messages in a queue has no format. The Protocol describes only those aspects of messages that are related to their handling; this can be seen as a syntactic level of interoperability. Introducing meaning to message contents is entirely left to applications. Message- or service-oriented frameworks, usually, define formats and encodings to describe messages and higher-level communication patterns built on top of them, to add more syntactic formalism or to raise interoperability to a semantic or even pragmatic level.
The Stompl library stresses the importance
of adding meaning to the message content
by adding types to queues.
From the perspective of the client Haskell program,
a queue is a communication channel
that allows sending and receiving messages of a given type.
This adds type-safety to Stompl queues, which,
otherwise, would just return plain bytes.
It is, on the other hand, always possible
to ignore this feature by declaring queues
as '()' or ByteString
.
In the first case, the raw bytestring
may be read from the Message
;
in the second case, the contents of the Message
will be a ByteString
.
In the Stompl library, queues
are unidirectional communication channels
either for reading or writing.
This is captured by implementing queues
with two different data types,
a Reader
and a Writer
.
On creating a queue a set of parameters can be defined
to control the behaviour of the queue.
newReader :: Con -> String -> String -> [Qopt] -> [Header] -> InBound a -> IO (Reader a)
Creates a Reader
with the life time of the connection Con
.
Creating a receiving queue involves interaction with the broker;
this may result in preempting the calling thread,
depending on the options [Qopt
].
Parameters:
- The connection handle
Con
- A queue name that should be unique in your application. The queue name is useful for debugging, since it appears in error messages.
- The Stomp destination, i.e. the name of the queue as it is known to the broker and other applications.
- A list of options (
Qopt
). - A list of headers (
Header
), which will be passed to the broker. theHeader
parameter is actually a breach in the abstraction from the Stomp protocol. A header may be, for instance, a selector that restricts the subscription to this queue, such that only messages with certain attributes (i.e. specific headers) are sent to the subscribing client. Selectors are broker-specific and typically expressed as SQL or XPath. - An in-bound converter.
A usage example to create a Reader
with Connection
c and the in-bound converter
iconv would be:
q <- newReader c "TestQ" "/queue/test" [] [] iconv
A call to newReader
may result in preemption when
one of the options OWaitReceipt
or OWithReceipt
are given;
an example for such a call
with tmo an Int
value representing a timeout
in microseconds and
the result mbQ of type Maybe
is:
mbQ <- timeout tmo $ newReader c "TestQ" "/queue/test" [OWaitReceipt] [] oconv case mbQ of Nothing -> -- handle error Just q -> do -- ...
newWriter :: Con -> String -> String -> [Qopt] -> [Header] -> OutBound a -> IO (Writer a)
Creates a Writer
with the life time of the connection Con
.
Creating a sending queue does not involve interaction with the broker
and will not preempt the calling thread.
A sending queue may be created like in the following code fragment, where oconv is an already defined out-bound converter:
q <- newWriter c "TestQ" "/queue/test" [] [] oconv
withReader :: Con -> String -> String -> [Qopt] -> [Header] -> InBound i -> (Reader i -> IO r) -> IO r
Creates a Reader
with limited life time.
The queue will live only in the scope of the action
that is passed as last parameter.
The function is useful for readers
that are used only temporarly, e.g. during initialisation.
When the action terminates, the client unsubscribes from
the broker queue - even if an exception is raised.
withReader
returns the result of the action.
Since the life time of the queue is limited to the action,
it should not be returned.
Any operation on a reader created by withReader
outside the action will raise QueueException
.
A usage example is:
x <- withReader c "TestQ" "/queue/test" [] [] iconv $ \q -> do
withWriter :: Con -> String -> String -> [Qopt] -> [Header] -> OutBound o -> (Writer o -> IO r) -> IO r
Creates a Writer
with limited life time.
The queue will live only in the scope of the action
that is passed as last parameter.
The function is useful for writers
that are used only temporarly, e.g. during initialisation.
withWriter
returns the result of the action.
Since the life time of the queue is limited to the action,
it should not be returned.
Any operation on a writer created by withWriter
outside the action will raise a QueueException
.
withPair :: Con -> String -> ReaderDesc i -> WriterDesc o -> ((Reader i, Writer o) -> IO r) -> IO r
Creates a pair of (Reader
i, Writer
o) with limited lifetime.
The pair will live only in the scope of the action
that is passed as last parameter.
The function is useful for readers/writers
used in combination, e.g. to emulate a client/server
kind of communication.
withPair
returns the result of the action passed in.
The parameters are:
- The connection handle
Con
- The name of the pair; the reader will be identified by a string with "_r" added to this name, the writer by a string with "_w" added to this name.
- The description of the
Reader
,ReaderDesc
- The description of the
Writer
,WriterDesc
- The application-defined action
The reason for introducing the reader and writer description
is to provide error detection at compile time:
It is this way much more difficult to accidently confuse
the writer's and the reader's parameters (e.g.
passing the writer's Qopt
s to the reader).
type ReaderDesc i = (String, [Qopt], [Header], InBound i)
type WriterDesc o = (String, [Qopt], [Header], OutBound o)
data Qopt
OWithReceipt | A queue created with |
OWaitReceipt | A queue created with On writing a message, this is not always the preferred
method. You may want to fire and forget - and check
for the confirmation of the receipt only later.
In this case, you will create the
It is good practice to use timeout with all calls
that may wait for receipts,
ie |
OMode AckMode | The option defines the If For more details, see |
OAck | Expression often used by René Artois.
Furthermore, if |
OForceTx | A queue created with |
data AckMode
Auto | A successfully sent message is automatically considered ack'd |
Client | The client is expected to explicitly confirm the receipt
of a message by sending an |
ClientIndi | Non-cumulative ack:
The client is expected to explicitly confirm the receipt
of a message by sending an |
type InBound a = Type -> Int -> [Header] -> ByteString -> IO a
Converters are user-defined actions passed to
newReader
(InBound
) and
newWriter
(OutBound
)
that convert a ByteString
to a value of type a (InBound
) or
a value of type a to ByteString
(OutBound
).
Converters are, hence, similar to put and get in the Binary
monad.
The reason for using explicit, user-defined converters
instead of Binary encode and decode
is that the conversion with queues
may be much more complex, involving reading configurations
or other IO
actions.
Furthermore, we have to distinguish between data types and
there binary encoding when sent over the network.
This distinction is made by MIME types.
Two applications may send the same data type,
but one encodes this type as "text/plain",
the other as "text/xml".
InBound
conversions have to consider the MIME type
and, hence, need more input parameters than provided by decode.
encode and decode, however,
can be used internally by user-defined converters.
The parameters expected by an InBound
converter are:
- the MIME type of the content
- the content size
- the list of
Header
coming with the message - the contents encoded as
ByteString
.
The simplest possible in-bound converter for plain strings may be created like this:
let iconv _ _ _ = return . toString
type OutBound a = a -> IO ByteString
Out-bound converters are much simpler.
Since the application developer knows,
which encoding to use, the MIME type is not needed.
The converter receives only the value of type a
and converts it into a ByteString
.
A simple example to create an out-bound converter
for plain strings could be:
let oconv = return . fromString
readQ :: Reader a -> IO (Message a)
Removes the oldest message from the queue
and returns it as Message
.
The message cannot be read from the queue
by another call to readQ
within the same connection.
Wether other connections will receive the message as well
depends on the broker and the queue patterns it implements.
If the queue is currently empty,
the thread will preempt until a message arrives.
If the queue was created with
OMode
other than Auto
and with OAck
, then an ack
will be automatically sent to the broker;
if OAck
was not set,
the message will be registered as pending ack.
Note that, when readQ
sends an ack internally,
it will not request a receipt from the broker.
The rationale for this design is simplicity.
If the function expected a receipt,
it would have to either wait for the receipt
or return it.
In the first case, it would be difficult
for the programmer to distinguish, on a timeout, between
no message available and
no receipt arrived.
In the second case, the receipt
would need to be returned.
This would unnecessarily blow up the interface.
If you need the reliability of receipts,
you should create the queue without OAck
and use ackWith
to acknowledge
the message explicitly.
writeQ :: Writer a -> Type -> [Header] -> a -> IO ()
Adds the value a as message at the end of the queue. The Mime type as well as the headers are added to the message.
If the queue was created with the option
OWithReceipt
,
writeQ
will request a receipt from the broker.
If the queue was additionally created with
OWaitReceipt
,
writeQ
will preempt until the receipt is confirmed.
The Stomp headers are useful for brokers
that provide selectors on subscribe,
see newReader
for details.
A usage example for a Writer
q of type String
may be (nullType is defined as text/plain in Codec.MIME):
writeQ q nullType [] "hello world!"
For a Writer
that was created
with OWithReceipt
and OWaitReceipt
,
the function should be called with timeout:
mbR <- timeout tmo $ writeQ q nullType [] "hello world!" case mbR of Nothing -> -- error handling Just r -> do -- ...
writeQWith :: Writer a -> Type -> [Header] -> a -> IO Receipt
This is a variant of writeQ
that is particularly useful for queues
created with OWithReceipt
, but without OWaitReceipt
.
It returns the Receipt
, so that it can be waited for
later, using waitReceipt
.
Note that the behaviour of writeQWith
,
besides of returning the receipt, is the same as writeQ
,
i.e., on a queue with OWithReceipt
and OWaitReceipt
writeQWith
will wait for the receipt being confirmed.
In this case, the returned receipt is, in fact,
of no further use for the application.
The function is used like:
r <- writeQWith q nullType [] "hello world!"
writeAdHoc :: Writer a -> String -> Type -> [Header] -> a -> IO ()
This is a variant of writeQ
that overwrites the destination queue defined in the writer queue.
It can be used for ad hoc communication and
for emulations of client/server-like protocols:
the client would pass the name of the queue
where it expects the server response in a header;
the server would send the resply to the queue
indicated in the header using writeAdHoc
.
The additional String
parameter contains the destination.
writeAdHocWith :: Writer a -> String -> Type -> [Header] -> a -> IO Receipt
This is a variant of writeAdHoc
that is particularly useful for queues
created with OWithReceipt
, but without OWaitReceipt
.
It returns the Receipt
, so that it can be waited for
later, using waitReceipt
.
Please refer to writeQWith
for more details.
Messages
data Message a
Any content received from a queue is wrapped in a message. It is, in particular, the return value of readQ.
msgContent :: Message a -> a
Returns the content of the message in the format produced by an in-bound converter
Receipts
Receipts are identifiers unique during the life time of an application; receipts can be added to all kinds of messages sent to the broker. The broker, in its turn, uses receipts to acknowledge received messages. Receipts, hence, are useful to make a session more reliable. When the broker has confirmed the receipt of a frame sent to it, the client application can be sure that it has arrived. What kind of additional guarantees are made, e.g. that the frame is saved to disk or has already been sent to the subscriber(s), depends on the broker.
Receipts are handled internally by the library.
The application, however, decides where receipts should
be requested, i.e. on subcribing to a queue,
on sending a message, on sending acks and on
starting and ending transactions.
On sending messages,
receipt handling can be made explict.
The function writeQWith
requests a receipt to the message
and returns it to the caller.
The application can then, later,
explicitly wait for the receipt, using waitReceipt
.
Otherwise, receipt handling remains
inivisible in the application code.
data Rec
This is a receipt.
Rec Int | A valid receipt |
NoRec | No receipt was sent with this interaction.
Receiving a |
waitReceipt :: Con -> Receipt -> IO ()
Waits for the Receipt
to be confirmed by the broker.
Since the thread will preempt, the call should be protected
with timeout, e.g.:
mb_ <- timeout tmo $ waitReceipt c r case mb_ of Nothing -> -- error handling Just _ -> do -- ...
Transactions
Transactions are units of interactions with a Stomp broker, including sending messages to queues and acknowledging the receipt of messages. All messages sent during a transaction are buffered in the broker. Only when the application terminates the transaction with commit the messages will be eventually processed. If an error occurs during the transaction, it can be aborted by the client. Transactions, in consequence, can be used to ensure atomicity, i.e. either all single steps are performed or no step is performed.
In the Stompl Client library, transactions are sequences of Stompl actions, queue operations as well as nested transactions, that are committed at the end or aborted, whenever an error condition becomes true. Error conditions are uncaught exceptions and conditions defined by options passed to the transaction, for example that all receipts requested during the transaction, have been confirmed by the broker.
To enforce atomicity, threads are not allowed to share transactions.
withTransaction :: Con -> [Topt] -> (Tx -> IO a) -> IO a
Starts a transaction and executes the action
in the last parameter.
After the action has finished,
the transaction will be either committed or aborted
even if an exception has been raised.
Note that, depending on the options,
the way a transaction is terminated may vary,
refer to Topt
for details.
Transactions cannot be shared among threads. Transactions are internally protected against access from any thread but the one that has actually started the transaction.
It is not advisable to use withTransaction
with timeout.
It is preferred to use timeout on the
the actions executed within this transaction.
Whether and how much time the transaction itself
shall wait for the completion of on-going interactions with the broker,
in particular pending receipts,
shall be controlled
by the OTimeout
option.
withTransaction
returns the result of the action.
The simplest usage example with a Connection
c is:
r <- withTransaction c [] $ \_ -> do
If the transaction shall use receipts and, before terminating, wait 100ms
for all receipts to be confirmed by the broker
withTransaction
is called like:
eiR <- try $ withTransaction c [OTimeout 100, OWithReceipts] \_ -> do case eiR of Left e -> -- error handling Right x -> do -- ..
Note that try
is used to catch any StomplException
.
data Topt
Options passed to a transaction.
OTimeout Int | The timeout in milliseconds (not microseconds!)
to wait for pending receipts.
If receipts are pending, when the transaction
is ready to terminate,
and no timeout or a timeout <= 0 is given,
and the option |
OWithReceipts | This option has two effects: 1) Internal interactions of the transaction with the broker will request receipts; 2) before ending the transaction, the library will check for receipts that have not yet been confirmed by the broker (including receipts requested by user calls such as writeQ or ackWith). If receipts are pending, when the transaction
is ready to terminate and |
OAbortMissingAcks | If a message has been received from a
queue with |
abort :: String -> IO ()
Aborts the transaction immediately by raising AppException
.
The string passed in to abort
will be added to the
exception message.
Acknowledgements
Acknowledgements are used by the client to confirm the receipt of a message. The Stomp protocol foresees three different acknowledgement modes, defined when the client subscribes to a queues. A subscription may use auto mode, i.e. a message is considered acknowledged when it has been sent to the subscriber; client mode, i.e. a message is considered acknowledged only when an ack message has been sent back from the client. Note that client mode is cumulative, that means, the broker will consider all messages acknowledged that have been sent from the previous ack up to the acknowledged message; or client-individual mode, i.e. non-cumulative client mode.
A message may also be negatively acknowledged (nack). How the broker handles a nack, however, is not further specified by the Stomp protocol.
ackWith :: Con -> Message a -> IO ()
Acknowledges the arrival of Message
to the broker,
requests a receipt and waits until it is confirmed.
Since it preempts the calling thread,
it is usually used with timeout,
for a Connection
c, a Message
x
and a timeout in microseconds tmo like:
mbR <- timeout tmo $ ackWith c x case mbR of Nothing -> -- error handling Just _ -> do -- ...
Exceptions
Complete Example
import Network.Mom.Stompl.Client.Queue import System.Environment (getArgs) import Network.Socket (withSocketsDo) import Control.Monad (forever) import Control.Concurrent (threadDelay) import qualified Data.ByteString.UTF8 as U import Data.Char(toUpper) import Codec.MIME.Type (nullType) main :: IO () main = do os <- getArgs case os of [q] -> withSocketsDo $ ping q _ -> putStrLn "I need a queue name!" -- error handling... data Ping = Ping | Pong deriving (Show) strToPing :: String -> IO Ping strToPing s = case map toUpper s of "PING" -> return Ping "PONG" -> return Pong _ -> convertError $ "Not a Ping: '" ++ s ++ "'" ping :: String -> IO () ping qn = withConnection "localhost" 61613 [] [] $ \c -> do let iconv _ _ _ = strToPing . U.toString let oconv = return . U.fromString . show inQ <- newReader c "Q-IN" qn [] [] iconv outQ <- newWriter c "Q-OUT" qn [] [] oconv writeQ outQ nullType [] Pong listen inQ outQ listen :: Reader Ping -> Writer Ping -> IO () listen iQ oQ = forever $ do eiM <- try $ readQ iQ case eiM of Left e -> do putStrLn $ "Error: " ++ show e -- error handling ... Right m -> do let p = case msgContent m of Ping -> Pong Pong -> Ping putStrLn $ show p writeQ oQ nullType [] p threadDelay 10000