amqp-0.2.9: Client library for AMQP servers (currently only RabbitMQ)

Network.AMQP

Contents

Description

A client library for AMQP servers implementing the 0-8 spec; currently only supports RabbitMQ (see http://www.rabbitmq.com)

A good introduction to AMQP can be found here (though it uses Python): http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/

Example:

Connect to a server, declare a queue and an exchange and setup a callback for messages coming in on the queue. Then publish a single message to our new exchange

import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL

main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn
    
    -- declare a queue, exchange and binding
    declareQueue chan newQueue {queueName = "myQueue"}
    declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"}
    bindQueue chan "myQueue" "myExchange" "myKey"

    -- subscribe to the queue
    consumeMsgs chan "myQueue" Ack myCallback

    -- publish a message to our new exchange
    publishMsg chan "myExchange" "myKey" 
        newMsg {msgBody = (BL.pack "hello world"), 
                msgDeliveryMode = Just Persistent}

    getLine -- wait for keypress
    closeConnection conn
    putStrLn "connection closed"

    
myCallback :: (Message,Envelope) -> IO ()
myCallback (msg, env) = do
    putStrLn $ "received message: "++(BL.unpack $ msgBody msg)
    -- acknowledge receiving the message
    ackEnv env

Exception handling:

Some function calls can make the AMQP server throw an AMQP exception, which has the side-effect of closing the connection or channel. The AMQP exceptions are raised as Haskell exceptions (see AMQPException). So upon receiving an AMQPException you may have to reopen the channel or connection.

Synopsis

Connection

openConnection :: String -> String -> String -> String -> IO ConnectionSource

openConnection hostname virtualHost loginName loginPassword opens a connection to an AMQP server running on hostname. virtualHost is used as a namespace for AMQP resources (default is "/"), so different applications could use multiple virtual hosts on the same AMQP server

NOTE: If the login name, password or virtual host are invalid, this method will throw a ConnectionClosedException. The exception will not contain a reason why the connection was closed, so you'll have to find out yourself.

openConnection' :: String -> PortNumber -> String -> String -> String -> IO ConnectionSource

same as openConnection but allows you to specify a non-default port-number as the 2nd parameter

closeConnection :: Connection -> IO ()Source

closes a connection

addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()Source

addConnectionClosedHandler conn ifClosed handler adds a handler that will be called after the connection is closed (either by calling closeConnection or by an exception). If the ifClosed parameter is True and the connection is already closed, the handler will be called immediately. If ifClosed == False and the connection is already closed, the handler will never be called

Channel

data Channel Source

A connection to an AMQP server is made up of separate channels. It is recommended to use a separate channel for each thread in your application that talks to the AMQP server (but you don't have to as channels are thread-safe)

openChannel :: Connection -> IO ChannelSource

opens a new channel on the connection

There's currently no closeChannel method, but you can always just close the connection (the maximum number of channels is 65535).

Exchanges

data ExchangeOpts Source

A record that contains the fields needed when creating a new exhange using declareExchange. The default values apply when you use newExchange.

Constructors

ExchangeOpts 

Fields

exchangeName :: String

(must be set); the name of the exchange

exchangeType :: String

(must be set); the type of the exchange ("fanout", "direct", "topic")

exchangePassive :: Bool

(default False); If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.

exchangeDurable :: Bool

(default True); If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

exchangeAutoDelete :: Bool

(default False); If set, the exchange is deleted when all queues have finished using it.

exchangeInternal :: Bool

(default False); If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.

newExchange :: ExchangeOptsSource

an ExchangeOpts with defaults set; you must override at least the exchangeName and exchangeType fields.

declareExchange :: Channel -> ExchangeOpts -> IO ()Source

declares a new exchange on the AMQP server. Can be used like this: declareExchange channel newExchange {exchangeName = "myExchange", exchangeType = "fanout"}

deleteExchange :: Channel -> String -> IO ()Source

deletes the exchange with the provided name

Queues

data QueueOpts Source

A record that contains the fields needed when creating a new queue using declareQueue. The default values apply when you use newQueue.

Constructors

QueueOpts 

Fields

queueName :: String

(default ""); the name of the queue; if left empty, the server will generate a new name and return it from the declareQueue method

queuePassive :: Bool

(default False); If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state.

queueDurable :: Bool

(default True); If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.

queueExclusive :: Bool

(default False); Exclusive queues may only be consumed from by the current connection. Setting the exclusive flag always implies 'auto-delete'.

queueAutoDelete :: Bool

(default False); If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted.

newQueue :: QueueOptsSource

a QueueOpts with defaults set; you should override at least queueName.

declareQueue :: Channel -> QueueOpts -> IO (String, Int, Int)Source

creates a new queue on the AMQP server; can be used like this: declareQueue channel newQueue {queueName = "myQueue"}.

Returns a tuple (queueName, messageCount, consumerCount). queueName is the name of the new queue (if you don't specify a queueName the server will autogenerate one). messageCount is the number of messages in the queue, which will be zero for newly-created queues. consumerCount is the number of active consumers for the queue.

bindQueue :: Channel -> String -> String -> String -> IO ()Source

bindQueue chan queueName exchangeName routingKey binds the queue to the exchange using the provided routing key

purgeQueue :: Channel -> String -> IO Word32Source

remove all messages from the queue; returns the number of messages that were in the queue

deleteQueue :: Channel -> String -> IO Word32Source

deletes the queue; returns the number of messages that were in the queue before deletion

Messaging

data Message Source

An AMQP message

Constructors

Message 

Fields

msgBody :: ByteString

the content of your message

msgDeliveryMode :: Maybe DeliveryMode

see DeliveryMode

msgTimestamp :: Maybe Timestamp

use in any way you like; this doesn't affect the way the message is handled

msgID :: Maybe String

use in any way you like; this doesn't affect the way the message is handled

msgContentType :: Maybe String
 
msgReplyTo :: Maybe String
 
msgCorrelationID :: Maybe String
 

Instances

data DeliveryMode Source

Constructors

Persistent

the message will survive server restarts (if the queue is durable)

NonPersistent

the message may be lost after server restarts

Instances

newMsg :: MessageSource

a Msg with defaults set; you should override at least msgBody

data Envelope Source

contains meta-information of a delivered message (through getMsg or consumeMsgs)

data Ack Source

specifies whether you have to acknowledge messages that you receive from consumeMsgs or getMsg. If you use Ack, you have to call ackMsg or ackEnv after you have processed a message, otherwise it might be delivered again in the future

Constructors

Ack 
NoAck 

consumeMsgs :: Channel -> String -> Ack -> ((Message, Envelope) -> IO ()) -> IO ConsumerTagSource

consumeMsgs chan queueName ack callback subscribes to the given queue and returns a consumerTag. For any incoming message, the callback will be run. If ack == Ack you will have to acknowledge all incoming messages (see ackMsg and ackEnv)

NOTE: The callback will be run on the same thread as the channel thread (every channel spawns its own thread to listen for incoming data) so DO NOT perform any request on chan inside the callback (however, you CAN perform requests on other open channels inside the callback, though I wouldn't recommend it). Functions that can safely be called on chan are ackMsg, ackEnv, rejectMsg, recoverMsgs. If you want to perform anything more complex, it's a good idea to wrap it inside forkIO.

cancelConsumer :: Channel -> ConsumerTag -> IO ()Source

stops a consumer that was started with consumeMsgs

publishMsg :: Channel -> String -> String -> Message -> IO ()Source

publishMsg chan exchangeName routingKey msg publishes msg to the exchange with the provided exchangeName. The effect of routingKey depends on the type of the exchange

NOTE: This method may temporarily block if the AMQP server requested us to stop sending content data (using the flow control mechanism). So don't rely on this method returning immediately

getMsg :: Channel -> Ack -> String -> IO (Maybe (Message, Envelope))Source

getMsg chan ack queueName gets a message from the specified queue. If ack==Ack, you have to call ackMsg or ackEnv for any message that you get, otherwise it might be delivered again in the future (by calling recoverMsgs)

rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()Source

rejectMsg chan deliveryTag requeue allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. If requeue==False, the message will be discarded. If it is True, the server will attempt to requeue the message.

NOTE: RabbitMQ 1.7 doesn't implement this command

recoverMsgs :: Channel -> Bool -> IO ()Source

recoverMsgs chan requeue asks the broker to redeliver all messages that were received but not acknowledged on the specified channel. If requeue==False, the message will be redelivered to the original recipient. If requeue==True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.

ackMsg :: Channel -> LongLongInt -> Bool -> IO ()Source

ackMsg chan deliveryTag multiple acknowledges one or more messages.

if multiple==True, the deliverTag is treated as "up to and including", so that the client can acknowledge multiple messages with a single method call. If multiple==False, deliveryTag refers to a single message.

If multiple==True, and deliveryTag==0, tells the server to acknowledge all outstanding mesages.

ackEnv :: Envelope -> IO ()Source

Acknowledges a single message. This is a wrapper for ackMsg in case you have the Envelope at hand.

Transactions

txSelect :: Channel -> IO ()Source

This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.

txCommit :: Channel -> IO ()Source

This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.

txRollback :: Channel -> IO ()Source

This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.

Flow Control

flow :: Channel -> Bool -> IO ()Source

flow chan active tells the AMQP server to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process.

If active==True the server will start sending content data, if active==False the server will stop sending content data.

A new channel is always active by default.

NOTE: RabbitMQ 1.7 doesn't implement this command.

Exceptions

data AMQPException Source

Constructors

ChannelClosedException String

the String contains the reason why the channel was closed

ConnectionClosedException String

String may contain a reason