stomp-queue-0.1.2: Stompl Client Library

Safe Haskell: None




The Stomp Protocol specifies message-oriented interoperability. Applications connect to a message broker to send (publish) or receive (subscribe) messages through queues. Interoperating applications do not know the location or internal structure of each other. They only see interfaces, i.e. the messages published and subscribed through the broker.

The Stompl Client library implements a Stomp client using abstractions like Connection, Transaction and queues in terms of Reader and Writer.



The Stomp protocol is connection-oriented and usually implemented on top of TCP/IP. The client initialises the connection by sending a connect message which is answered by the broker by confirming or rejecting the connection. The connection is authenticated by user and passcode. The authentication mechanism, however, varies among brokers.

During the connection phase, the protocol version and a heartbeat that defines the frequency of alive messages exchanged between broker and client are negotiated.
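The heartbeat negotiation can be illustrated with a small, self-contained sketch. It follows the rule given in the Stomp 1.1 specification (per direction: no beats if either side says 0, otherwise the larger of the two intervals). The names direction and negotiate are hypothetical and are not part of the Stompl Client library, which performs this step internally.

```haskell
-- A (send, receive) pair of intervals in milliseconds; 0 means "none".
type Heart = (Int, Int)

-- | One direction of the negotiation: the sender can guarantee a beat
--   every 'canSend' ms, the receiver would like one every 'wants' ms.
direction :: Int -> Int -> Int
direction canSend wants
  | canSend == 0 || wants == 0 = 0
  | otherwise                  = max canSend wants

-- | Takes the client's bid and the broker's answer and returns
--   (client-to-broker interval, broker-to-client interval).
negotiate :: Heart -> Heart -> Heart
negotiate (cx, cy) (sx, sy) = (direction cx sy, direction sx cy)

main :: IO ()
main = do
  print (negotiate (500, 500) (1000, 0))   -- (0,1000)
  print (negotiate (0, 0) (1000, 1000))    -- (0,0)
```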

The connection remains active until either the client disconnects voluntarily or the broker disconnects in consequence of a protocol error.

The details of the connection, including protocol version and heartbeats, are handled internally by the Stompl Client library.

withConnection :: String -> Int -> [Copt] -> [Header] -> (Con -> IO a) -> IO a

Initialises a connection and executes an IO action. The connection life time is the scope of this action. The connection handle, Con, that is passed to the action should not be returned from withConnection. Connections, however, can be shared among threads. In this case, the programmer has to take care not to terminate the action before all other threads working on the connection have finished.

Since Connection is a heavy data type, you should try to reduce the number of connections to the same broker within the same process - there is ideally only one connection per broker in one process.


  • String: The broker's hostname or IP-address
  • Int: The broker's port
  • Copt: Control options passed to the connection (including user/password)
  • Header: List of additional, broker-specific headers
  • (Con -> IO a): The action to execute. The action receives the connection handle and returns a value of type a in the IO monad.

withConnection returns the result of the action passed into it.

withConnection will always disconnect from the broker when the action has terminated, even if an exception is raised.


 withConnection "localhost" 61613 [] [] $ \c -> do

This would connect to a broker listening to the loopback interface, port number 61613. The action is defined after the hanging do.
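The guarantee that the connection is closed even when the action raises an exception is the usual bracket pattern. The following is a minimal sketch of that pattern with a fake connection; FakeCon and withFakeConnection are hypothetical stand-ins, and the sketch makes no claim about how withConnection is actually implemented.

```haskell
import Control.Exception (ErrorCall (..), bracket, throwIO, try)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- A fake handle; the flag is True while the "connection" is open.
newtype FakeCon = FakeCon (IORef Bool)

-- | Opens the fake connection, runs the action and always closes the
--   connection again, whether the action returns or throws.
withFakeConnection :: IORef Bool -> (FakeCon -> IO a) -> IO a
withFakeConnection ref = bracket open close
  where open              = writeIORef ref True >> return (FakeCon ref)
        close (FakeCon r) = writeIORef r False

-- | Runs a failing action and reports the caught exception together
--   with the state of the connection flag afterwards.
demo :: IO (Either ErrorCall (), Bool)
demo = do
  ref       <- newIORef False
  r         <- try (withFakeConnection ref (\_ -> throwIO (ErrorCall "boom")))
  stillOpen <- readIORef ref
  return (r, stillOpen)

main :: IO ()
main = demo >>= print   -- the second component is False: the handle was closed
```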

Internally, connections use concurrent threads; errors are communicated by throwing exceptions to the owner of the connection, where the owner is the thread that created the connection by calling withConnection. It is therefore advisable to start different connections in different threads, so that each thread will receive only exceptions related to the connection it has opened.


 t <- forkIO $ withConnection "" 61613 [] [] $ \c -> do

data Con

Opaque Connection handle. Only valid within the action passed to withConnection.


Eq Con 
Show Con 

type Heart = (Int, Int)

data Copt

Options passed to a connection


OWaitBroker Int

Tells the connection to wait n milliseconds for the Receipt sent with Disconnect at the end of the session. The Stomp protocol advises to request a receipt and to wait for it before actually closing the socket. Many brokers, however, do not implement this feature (or implement it inappropriately, closing the connection immediately after having sent the receipt). withConnection, for this reason, ignores the receipt by default and simply closes the socket after having sent the Disconnect frame. If your broker shows a correct behaviour, it is advisable to use this option.

OMaxRecv Int

The maximum size of TCP/IP packets. Indirectly, this option also defines the maximum message size, which is 10 * maxReceive. By default, the maximum packet size is 1024 bytes.

OHeartBeat Heart

This option defines the client's bid for negotiating heart beats (see HeartBeat). By default, no heart beats are sent or accepted.

OAuth String String

Authentication: user and password

OClientId String

Identification: specifies the JMS Client ID for persistent connections


OStomp

With this option set, connect will use a STOMP frame instead of a CONNECT frame


Eq Copt 
Show Copt 


Stomp program interoperability is based on queues. Queues are communication channels of arbitrary size that may be written by any client currently connected to the broker. Messages in the queue are stored in FIFO order. The process of adding messages to the queue is called send. In order to read from a queue, a client has to subscribe to it. After having subscribed to a queue, a client will receive all messages sent to it. Brokers may implement additional selection criteria by means of selectors that are expressed in some query language, such as SQL or XPath.

There are two different flavours of queues distinguished by their communication pattern: Queues are either one-to-one channels, that is, a message published in this queue is sent to exactly one subscriber and then removed from it; or queues may be one-to-many, i.e., a message published in this queue is sent to all current subscribers of the queue. This type of queue is sometimes called a topic. Which pattern is supported and how patterns are controlled depends on the broker.

From the perspective of the Stomp protocol, the content of messages in a queue has no format. The Protocol describes only those aspects of messages that are related to their handling; this can be seen as a syntactic level of interoperability. Introducing meaning to message contents is entirely left to applications. Message- or service-oriented frameworks, usually, define formats and encodings to describe messages and higher-level communication patterns built on top of them, to add more syntactic formalism or to raise interoperability to a semantic or even pragmatic level.

The Stompl library stresses the importance of adding meaning to the message content by adding types to queues. From the perspective of the client Haskell program, a queue is a communication channel that allows sending and receiving messages of a given type. This adds type-safety to Stompl queues, which, otherwise, would just return plain bytes. It is, on the other hand, always possible to ignore this feature by declaring queues as '()' or ByteString. In the first case, the raw bytestring may be read from the Message; in the second case, the contents of the Message will be a ByteString.

In the Stompl library, queues are unidirectional communication channels either for reading or writing. This is captured by implementing queues with two different data types, a Reader and a Writer. On creating a queue a set of parameters can be defined to control the behaviour of the queue.

data Reader a

A Queue for receiving messages


Eq (Reader a) 

data Writer a

A Queue for sending messages.


Eq (Writer a) 

newReader :: Con -> String -> String -> [Qopt] -> [Header] -> InBound a -> IO (Reader a)

Creates a Reader with the life time of the connection Con. Creating a receiving queue involves interaction with the broker; this may result in preempting the calling thread, depending on the options [Qopt].


  • The connection handle Con
  • A queue name that should be unique in your application. The queue name is useful for debugging, since it appears in error messages.
  • The Stomp destination, i.e. the name of the queue as it is known to the broker and other applications.
  • A list of options (Qopt).
  • A list of headers (Header), which will be passed to the broker. The Header parameter is actually a breach in the abstraction from the Stomp protocol. A header may be, for instance, a selector that restricts the subscription to this queue, such that only messages with certain attributes (i.e. specific headers) are sent to the subscribing client. Selectors are broker-specific and typically expressed as SQL or XPath.
  • An in-bound converter.

A usage example to create a Reader with Connection c and the in-bound converter iconv would be:

 q <- newReader c "TestQ" "/queue/test" [] [] iconv

A call to newReader may result in preemption when one of the options OWaitReceipt or OWithReceipt is given; an example of such a call, with tmo an Int value representing a timeout in microseconds and the result mbQ of type Maybe, is:

 mbQ <- timeout tmo $ newReader c "TestQ" "/queue/test" [OWaitReceipt] [] iconv
 case mbQ of
   Nothing -> -- handle error
   Just q  -> do -- ...

newWriter :: Con -> String -> String -> [Qopt] -> [Header] -> OutBound a -> IO (Writer a)

Creates a Writer with the life time of the connection Con. Creating a sending queue does not involve interaction with the broker and will not preempt the calling thread.

A sending queue may be created like in the following code fragment, where oconv is an already defined out-bound converter:

 q <- newWriter c "TestQ" "/queue/test" [] [] oconv

withReader :: Con -> String -> String -> [Qopt] -> [Header] -> InBound i -> (Reader i -> IO r) -> IO r

Creates a Reader with limited life time. The queue will live only in the scope of the action that is passed as last parameter. The function is useful for readers that are used only temporarily, e.g. during initialisation. When the action terminates, the client unsubscribes from the broker queue - even if an exception is raised.

withReader returns the result of the action. Since the life time of the queue is limited to the action, it should not be returned. Any operation on a reader created by withReader outside the action will raise QueueException.

A usage example is:

 x <- withReader c "TestQ" "/queue/test" [] [] iconv $ \q -> do

withWriter :: Con -> String -> String -> [Qopt] -> [Header] -> OutBound o -> (Writer o -> IO r) -> IO r

Creates a Writer with limited life time. The queue will live only in the scope of the action that is passed as last parameter. The function is useful for writers that are used only temporarily, e.g. during initialisation.

withWriter returns the result of the action. Since the life time of the queue is limited to the action, it should not be returned. Any operation on a writer created by withWriter outside the action will raise a QueueException.

withPair :: Con -> String -> ReaderDesc i -> WriterDesc o -> ((Reader i, Writer o) -> IO r) -> IO r

Creates a pair of (Reader i, Writer o) with limited lifetime. The pair will live only in the scope of the action that is passed as last parameter. The function is useful for readers/writers used in combination, e.g. to emulate a client/server kind of communication.

withPair returns the result of the action passed in.

The parameters are:

  • The connection handle Con
  • The name of the pair; the reader will be identified by a string with "_r" added to this name, the writer by a string with "_w" added to this name.
  • The description of the Reader, ReaderDesc
  • The description of the Writer, WriterDesc
  • The application-defined action

The reason for introducing the reader and writer description is to provide error detection at compile time: this way, it is much more difficult to accidentally confuse the writer's and the reader's parameters (e.g. passing the writer's Qopts to the reader).

type ReaderDesc i = (String, [Qopt], [Header], InBound i)

The Reader parameters of withPair:

  • The reader's queue name
  • The reader's Qopts
  • The reader's Headers
  • The reader's (inbound) converter

type WriterDesc o = (String, [Qopt], [Header], OutBound o)

The Writer parameters of withPair:

  • The writer's queue name
  • The writer's Qopts
  • The writer's Headers
  • The writer's (outbound) converter

data Qopt

Options that may be passed to newReader and newWriter and their variants.



OWithReceipt

A queue created with OWithReceipt will request a receipt on all interactions with the broker. The handling of receipts is usually transparent to applications, but, in the case of sending messages, may be made visible by using writeQWith instead of writeQ. writeQWith returns the receipt identifier and the application can later invoke waitReceipt to wait for the broker to confirm this receipt. Note that a Reader created with OWithReceipt will issue a request for receipt when subscribing to a Stomp queue.


OWaitReceipt

A queue created with OWaitReceipt will wait for the receipt before returning from a call that has issued a request for receipt. This implies that the current thread will yield the processor. writeQ will internally create a request for receipt and wait for the broker to confirm the receipt before returning. Note that, for newReader, there is no difference between OWaitReceipt and OWithReceipt. Either option will cause the thread to preempt until the receipt is confirmed.

On writing a message, this is not always the preferred method. You may want to fire and forget - and check for the confirmation of the receipt only later. In this case, you will create the Writer with OWithReceipt only and, later, after having sent a message with writeQWith, wait for the receipt using waitReceipt. Note that OWaitReceipt without OWithReceipt has no meaning with writeQ and writeQWith. If you want to request a receipt with a message and wait for the broker to confirm it, you have to use both options.

It is good practice to use timeout with all calls that may wait for receipts, i.e. newReader and withReader with options OWithReceipt or OWaitReceipt, or writeQ and writeQWith with options OWaitReceipt, or ackWith and nackWith.

OMode AckMode

The option defines the AckMode of the queue, which is relevant for Reader only. AckMode is one of: Auto, Client, ClientIndi.

If OMode is not given, Auto is assumed as default.

For more details, see AckMode.


OAck

Expression often used by René Artois. Furthermore, if OMode is either Client or ClientIndi, then this option forces readQ to send an acknowledgement automatically when a message has been read from the queue.


OForceTx

A queue created with OForceTx will throw QueueException when used outside a Transaction.


Eq Qopt 
Read Qopt 
Show Qopt 

data AckMode

The acknowledgement mode of a Reader, set with the OMode option. The constructors are:

Auto

Client

ClientIndi




Eq AckMode 
Read AckMode 
Show AckMode 

type InBound a = Type -> Int -> [Header] -> ByteString -> IO a

Converters are user-defined actions passed to newReader (InBound) and newWriter (OutBound) that convert a ByteString to a value of type a (InBound) or a value of type a to ByteString (OutBound). Converters are, hence, similar to put and get in the Binary monad.

The reason for using explicit, user-defined converters instead of Binary encode and decode is that the conversion with queues may be much more complex, involving reading configurations or other IO actions. Furthermore, we have to distinguish between data types and their binary encoding when sent over the network. This distinction is made by MIME types. Two applications may send the same data type, but one encodes this type as "text/plain", the other as "text/xml". InBound conversions have to consider the MIME type and, hence, need more input parameters than provided by decode. encode and decode, however, can be used internally by user-defined converters.

The parameters expected by an InBound converter are:

  • the MIME type of the content
  • the content size
  • the list of Header coming with the message
  • the contents encoded as ByteString.

The simplest possible in-bound converter for plain strings may be created like this:

 let iconv _ _ _ = return . toString

type OutBound a = a -> IO ByteString

Out-bound converters are much simpler. Since the application developer knows which encoding to use, the MIME type is not needed. The converter receives only the value of type a and converts it into a ByteString. A simple example to create an out-bound converter for plain strings could be:

 let oconv = return . fromString
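To show how an in-bound converter can take the MIME type into account, here is a self-contained sketch that runs without a broker. The type synonyms are local stand-ins that mirror the signatures above: MimeType replaces the Type from the MIME library with a plain String, and Header is assumed to be a pair of Strings. The converters pingIn and pingOut are hypothetical examples.

```haskell
import qualified Data.ByteString.Char8 as B

type MimeType   = String              -- stand-in for the MIME Type
type Header     = (String, String)    -- assumed shape of a Stomp header
type InBound  a = MimeType -> Int -> [Header] -> B.ByteString -> IO a
type OutBound a = a -> IO B.ByteString

data Ping = Ping | Pong
  deriving (Show, Eq)

-- | Out-bound: the application knows the encoding, so only the value is needed.
pingOut :: OutBound Ping
pingOut = return . B.pack . show

-- | In-bound: rejects unexpected MIME types before decoding the bytes.
pingIn :: InBound Ping
pingIn mime _ _ bytes
  | mime /= "text/plain" = ioError (userError ("unexpected MIME type: " ++ mime))
  | otherwise            = case B.unpack bytes of
      "Ping" -> return Ping
      "Pong" -> return Pong
      other  -> ioError (userError ("not a Ping: '" ++ other ++ "'"))

main :: IO ()
main = do
  bytes <- pingOut Pong
  p     <- pingIn "text/plain" (B.length bytes) [] bytes
  print p
```

Since converters run in IO, a failed conversion can be signalled with an exception, as the Complete Example does with convertError.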

readQ :: Reader a -> IO (Message a)

Removes the oldest message from the queue and returns it as Message. The message cannot be read from the queue by another call to readQ within the same connection. Whether other connections will receive the message as well depends on the broker and the queue patterns it implements. If the queue is currently empty, the thread will preempt until a message arrives.

If the queue was created with OMode other than Auto and with OAck, then an ack will be automatically sent to the broker; if OAck was not set, the message will be registered as pending ack.

Note that, when readQ sends an ack internally, it will not request a receipt from the broker. The rationale for this design is simplicity. If the function expected a receipt, it would have to either wait for the receipt or return it. In the first case, it would be difficult for the programmer to distinguish, on a timeout, between no message available and no receipt arrived. In the second case, the receipt would need to be returned. This would unnecessarily blow up the interface. If you need the reliability of receipts, you should create the queue without OAck and use ackWith to acknowledge the message explicitly.

writeQ :: Writer a -> Type -> [Header] -> a -> IO ()

Adds the value a as message at the end of the queue. The MIME type as well as the headers are added to the message.

If the queue was created with the option OWithReceipt, writeQ will request a receipt from the broker. If the queue was additionally created with OWaitReceipt, writeQ will preempt until the receipt is confirmed.

The Stomp headers are useful for brokers that provide selectors on subscribe, see newReader for details.

A usage example for a Writer q of type String may be (nullType is defined as text/plain in Codec.MIME):

 writeQ q nullType [] "hello world!"

For a Writer that was created with OWithReceipt and OWaitReceipt, the function should be called with timeout:

 mbR <- timeout tmo $ writeQ q nullType [] "hello world!"
 case mbR of
   Nothing -> -- error handling
   Just r  -> do -- ...

writeQWith :: Writer a -> Type -> [Header] -> a -> IO Receipt

This is a variant of writeQ that is particularly useful for queues created with OWithReceipt, but without OWaitReceipt. It returns the Receipt, so that it can be waited for later, using waitReceipt.

Note that the behaviour of writeQWith, besides returning the receipt, is the same as writeQ, i.e., on a queue with OWithReceipt and OWaitReceipt writeQWith will wait for the receipt being confirmed. In this case, the returned receipt is, in fact, of no further use for the application.

The function is used like:

 r <- writeQWith q nullType [] "hello world!"

writeAdHoc :: Writer a -> String -> Type -> [Header] -> a -> IO ()

This is a variant of writeQ that overwrites the destination queue defined in the writer queue. It can be used for ad hoc communication and for emulations of client/server-like protocols: the client would pass the name of the queue where it expects the server response in a header; the server would send the reply to the queue indicated in the header using writeAdHoc. The additional String parameter contains the destination.

writeAdHocWith :: Writer a -> String -> Type -> [Header] -> a -> IO Receipt

This is a variant of writeAdHoc that is particularly useful for queues created with OWithReceipt, but without OWaitReceipt. It returns the Receipt, so that it can be waited for later, using waitReceipt. Please refer to writeQWith for more details.


data Message a

Any content received from a queue is wrapped in a message. It is, in particular, the return value of readQ.

msgContent :: Message a -> a

Returns the content of the message in the format produced by an in-bound converter

msgRaw :: Message a -> ByteString

The encoded content

msgType :: Message a -> Type

The MIME type of the content

msgLen :: Message a -> Int

The length of the encoded content

msgHdrs :: Message a -> [Header]

The Stomp headers that came with the message


Receipts are identifiers unique during the life time of an application; receipts can be added to all kinds of messages sent to the broker. The broker, in its turn, uses receipts to acknowledge received messages. Receipts, hence, are useful to make a session more reliable. When the broker has confirmed the receipt of a frame sent to it, the client application can be sure that it has arrived. What kind of additional guarantees are made, e.g. that the frame is saved to disk or has already been sent to the subscriber(s), depends on the broker.

Receipts are handled internally by the library. The application, however, decides where receipts should be requested, i.e. on subscribing to a queue, on sending a message, on sending acks and on starting and ending transactions. On sending messages, receipt handling can be made explicit. The function writeQWith requests a receipt for the message and returns it to the caller. The application can then, later, explicitly wait for the receipt, using waitReceipt. Otherwise, receipt handling remains invisible in the application code.

data Rec

This is a receipt.


Rec Int

A valid receipt


NoRec

No receipt was sent with this interaction. Receiving a NoRec is not an error, but the result of an inconsistent - but harmless - use of writeQWith on a queue that does not send receipts. An application should, of course, not try to wait for a NoRec. It will never be confirmed.


Eq Rec 
Show Rec 

type Receipt = Rec

Just a nicer word for Rec

waitReceipt :: Con -> Receipt -> IO ()

Waits for the Receipt to be confirmed by the broker. Since the thread will preempt, the call should be protected with timeout, e.g.:

 mb_ <- timeout tmo $ waitReceipt c r
 case mb_ of
  Nothing -> -- error handling
  Just _  -> do -- ...
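The timeout pattern can be tried without a broker. In the sketch below, fakeWait is a hypothetical stand-in for a call such as waitReceipt: it simply blocks for a given number of microseconds, as if the broker confirmed the receipt after that delay. timeout is the function from System.Timeout and also takes microseconds.

```haskell
import Control.Concurrent (threadDelay)
import System.Timeout (timeout)

-- | Stand-in for waiting on a receipt: blocks for 'delay' microseconds.
fakeWait :: Int -> IO ()
fakeWait = threadDelay

-- | Waits at most 'tmo' microseconds for a confirmation that arrives
--   after 'delay' microseconds; Nothing means the timeout expired first.
confirmWithin :: Int -> Int -> IO (Maybe ())
confirmWithin tmo delay = timeout tmo (fakeWait delay)

main :: IO ()
main = do
  fast <- confirmWithin 1000000 1000     -- confirmation well within the limit
  slow <- confirmWithin 50000 2000000    -- confirmation arrives too late
  case fast of
    Nothing -> putStrLn "fast: timed out"
    Just _  -> putStrLn "fast: confirmed"
  case slow of
    Nothing -> putStrLn "slow: timed out"
    Just _  -> putStrLn "slow: confirmed"
```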


Transactions are units of interactions with a Stomp broker, including sending messages to queues and acknowledging the receipt of messages. All messages sent during a transaction are buffered in the broker. Only when the application terminates the transaction with commit will the messages eventually be processed. If an error occurs during the transaction, it can be aborted by the client. Transactions, in consequence, can be used to ensure atomicity, i.e. either all single steps are performed or no step is performed.

In the Stompl Client library, transactions are sequences of Stompl actions, queue operations as well as nested transactions, that are committed at the end or aborted whenever an error condition becomes true. Error conditions are uncaught exceptions and conditions defined by options passed to the transaction, for example that all receipts requested during the transaction have been confirmed by the broker.

To enforce atomicity, threads are not allowed to share transactions.
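The all-or-nothing behaviour described above can be modelled in a few lines. This is only a sketch of the idea, assuming an in-memory list as the "broker queue": messages sent inside the action are buffered and reach the queue only if the action finishes without an exception. withFakeTx is a hypothetical name; in the real protocol the buffering happens in the broker, and this sketch returns the exception as a value instead of raising it.

```haskell
import Control.Exception (ErrorCall (..), SomeException, throwIO, try)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- | Runs the action with a 'send' function that writes to a private
--   buffer. On success the buffer is appended to the queue (commit);
--   on any exception the buffer is dropped (abort).
withFakeTx :: IORef [String] -> ((String -> IO ()) -> IO a)
           -> IO (Either SomeException a)
withFakeTx queue action = do
  buf <- newIORef []
  r   <- try (action (\m -> modifyIORef buf (m :)))
  case r of
    Right _ -> readIORef buf >>= \ms -> modifyIORef queue (++ reverse ms)
    Left  _ -> return ()
  return r

-- | One committed and one aborted transaction; returns the final queue.
demo :: IO [String]
demo = do
  q <- newIORef []
  _ <- withFakeTx q (\send -> send "a" >> send "b")
  _ <- withFakeTx q (\send -> send "c" >> throwIO (ErrorCall "abort") >> send "d")
  readIORef q

main :: IO ()
main = demo >>= print   -- ["a","b"]
```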

data Tx


Eq Tx 
Show Tx 

withTransaction :: Con -> [Topt] -> (Tx -> IO a) -> IO a

Starts a transaction and executes the action in the last parameter. After the action has finished, the transaction will be either committed or aborted, even if an exception has been raised. Note that, depending on the options, the way a transaction is terminated may vary; refer to Topt for details.

Transactions cannot be shared among threads. Transactions are internally protected against access from any thread but the one that has actually started the transaction.

It is not advisable to use withTransaction with timeout. It is preferred to use timeout on the actions executed within this transaction. Whether and how much time the transaction itself shall wait for the completion of on-going interactions with the broker, in particular pending receipts, shall be controlled by the OTimeout option.

withTransaction returns the result of the action.

The simplest usage example with a Connection c is:

 r <- withTransaction c [] $ \_ -> do

If the transaction shall use receipts and, before terminating, wait 100ms for all receipts to be confirmed by the broker, withTransaction is called like:

 eiR <- try $ withTransaction c [OTimeout 100, OWithReceipts] $ \_ -> do
 case eiR of
   Left e  -> -- error handling
   Right x -> do -- ..

Note that try is used to catch any StomplException.

data Topt

Options passed to a transaction.


OTimeout Int

The timeout in milliseconds (not microseconds!) to wait for pending receipts. If receipts are pending, when the transaction is ready to terminate, and no timeout or a timeout <= 0 is given, and the option OWithReceipts was passed to withTransaction, the transaction will be aborted with TxException; otherwise it will wait until all pending interactions with the broker have terminated or the timeout has expired - whatever comes first. If the timeout expires first, TxException is raised.


OWithReceipts

This option has two effects: 1) Internal interactions of the transaction with the broker will request receipts; 2) before ending the transaction, the library will check for receipts that have not yet been confirmed by the broker (including receipts requested by user calls such as writeQ or ackWith).

If receipts are pending, when the transaction is ready to terminate and OTimeout with a value > 0 is given, the transaction will wait for pending receipts; otherwise the transaction will be aborted with TxException. Note that it usually does not make sense to use this option without OTimeout, since, in all probability, a receipt will not yet have been confirmed when the transaction terminates.


OAbortMissingAcks

If a message has been received from a queue with OMode option other than Auto and this message has not yet been acknowledged when the transaction is ready to terminate, the ack is missing. With this option, the transaction will not commit with missing acks, but abort and raise TxException.


Eq Topt 
Show Topt 

abort :: String -> IO ()

Aborts the transaction immediately by raising AppException. The string passed in to abort will be added to the exception message.


Acknowledgements are used by the client to confirm the receipt of a message. The Stomp protocol foresees three different acknowledgement modes, defined when the client subscribes to a queue. A subscription may use auto mode, i.e. a message is considered acknowledged when it has been sent to the subscriber; client mode, i.e. a message is considered acknowledged only when an ack message has been sent back from the client; or client-individual mode, i.e. non-cumulative client mode. Note that client mode is cumulative, that means, the broker will consider all messages acknowledged that have been sent from the previous ack up to the acknowledged message.

A message may also be negatively acknowledged (nack). How the broker handles a nack, however, is not further specified by the Stomp protocol.
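The difference between cumulative and non-cumulative acknowledgement can be made concrete with a small pure model. It is a sketch only: messages are represented by Int identifiers listed in delivery order, and acked is a hypothetical helper that returns the messages a broker would consider acknowledged when the client acks one of them. The AckMode type is redeclared locally so that the example is self-contained.

```haskell
data AckMode = Auto | Client | ClientIndi
  deriving (Show, Eq)

-- | Given the mode, the pending (not yet acknowledged) message ids in
--   delivery order and the id being acked, returns the ids that are
--   settled by this ack.
acked :: AckMode -> [Int] -> Int -> [Int]
acked Auto       _       _ = []   -- nothing is pending in auto mode
acked Client     pending m
  | m `elem` pending = takeWhile (/= m) pending ++ [m]   -- cumulative
  | otherwise        = []
acked ClientIndi pending m = filter (== m) pending       -- only this message

main :: IO ()
main = do
  print (acked Client     [1, 2, 3, 4] 3)   -- [1,2,3]
  print (acked ClientIndi [1, 2, 3, 4] 3)   -- [3]
```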

ack :: Con -> Message a -> IO ()

Acknowledges the arrival of Message to the broker. It is used with a Connection c and a Message x like:

 ack c x

ackWith :: Con -> Message a -> IO ()

Acknowledges the arrival of Message to the broker, requests a receipt and waits until it is confirmed. Since it preempts the calling thread, it is usually used with timeout, for a Connection c, a Message x and a timeout in microseconds tmo like:

 mbR <- timeout tmo $ ackWith c x   
 case mbR of
   Nothing -> -- error handling
   Just _  -> do -- ...

nack :: Con -> Message a -> IO ()

Negatively acknowledges the arrival of Message to the broker. For more details see ack.

nackWith :: Con -> Message a -> IO ()

Negatively acknowledges the arrival of Message to the broker, requests a receipt and waits until it is confirmed. For more details see ackWith.


Complete Example

 import Network.Mom.Stompl.Client.Queue

 import System.Environment (getArgs)
 import Network.Socket (withSocketsDo)
 import Control.Monad (forever)
 import Control.Concurrent (threadDelay)
 import qualified Data.ByteString.UTF8  as U
 import Data.Char(toUpper)
 import Codec.MIME.Type (nullType)

 main :: IO ()
 main = do
   os <- getArgs
   case os of
     [q] -> withSocketsDo $ ping q
     _   -> putStrLn "I need a queue name!"
            -- error handling...

 data Ping = Ping | Pong
   deriving (Show)

 strToPing :: String -> IO Ping
 strToPing s = case map toUpper s of
                 "PING" -> return Ping
                 "PONG" -> return Pong
                 _      -> convertError $ "Not a Ping: '" ++ s ++ "'"

 ping :: String -> IO ()
 ping qn = 
   withConnection "localhost" 61613 [] [] $ \c -> do
     let iconv _ _ _ = strToPing . U.toString
     let oconv       = return    . U.fromString . show
     inQ  <- newReader c "Q-IN"  qn [] [] iconv
     outQ <- newWriter c "Q-OUT" qn [] [] oconv
     writeQ outQ nullType [] Pong
     listen inQ outQ

 listen  :: Reader Ping -> Writer Ping -> IO ()
 listen iQ oQ = forever $ do
   eiM <- try $ readQ iQ 
   case eiM of
     Left  e -> do
       putStrLn $ "Error: " ++ show e
       -- error handling ...
     Right m -> do
       let p = case msgContent m of
                 Ping -> Pong
                 Pong -> Ping
       putStrLn $ show p
       writeQ oQ nullType [] p
       threadDelay 10000