Safe Haskell | None |
---|
A client library for AMQP servers implementing the 0-8 spec; currently only supports RabbitMQ (see http://www.rabbitmq.com)
A good introduction to AMQP can be found here (though it uses Python): http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
Example:
Connect to a server, declare a queue and an exchange and set up a callback for messages coming in on the queue. Then publish a single message to our new exchange.
    {-# LANGUAGE OverloadedStrings #-}
    import Network.AMQP
    import qualified Data.ByteString.Lazy.Char8 as BL

    main = do
        conn <- openConnection "127.0.0.1" "/" "guest" "guest"
        chan <- openChannel conn

        -- declare a queue, exchange and binding
        declareQueue chan newQueue {queueName = "myQueue"}
        declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"}
        bindQueue chan "myQueue" "myExchange" "myKey"

        -- subscribe to the queue
        consumeMsgs chan "myQueue" Ack myCallback

        -- publish a message to our new exchange
        publishMsg chan "myExchange" "myKey"
            newMsg {msgBody = (BL.pack "hello world"),
                    msgDeliveryMode = Just Persistent}

        getLine -- wait for keypress
        closeConnection conn
        putStrLn "connection closed"

    myCallback :: (Message,Envelope) -> IO ()
    myCallback (msg, env) = do
        putStrLn $ "received message: "++(BL.unpack $ msgBody msg)
        -- acknowledge receiving the message
        ackEnv env
Exception handling:
Some function calls can make the AMQP server throw an AMQP exception, which has the side-effect of closing the connection or channel. The AMQP exceptions are raised as Haskell exceptions (see AMQPException). So upon receiving an AMQPException you may have to reopen the channel or connection.
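The following is a minimal sketch of that recovery pattern, not a definitive recipe. It assumes a broker on 127.0.0.1 with the default guest account, and it assumes that binding to a missing exchange (the hypothetical name "noSuchExchange") makes the server close the channel; the exact trigger depends on the broker.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (try)
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- "noSuchExchange" is a hypothetical name; binding to it is assumed
    -- to make the server raise an AMQP exception on this channel.
    result <- try (bindQueue chan "myQueue" "noSuchExchange" "myKey")
                  :: IO (Either AMQPException ())
    case result of
        Right () -> do
            putStrLn "binding created"
            closeConnection conn
        Left (ChannelClosedException reason) -> do
            putStrLn ("channel closed: " ++ reason)
            -- the old channel is unusable now, so open a fresh one
            chan' <- openChannel conn
            _ <- declareQueue chan' newQueue {queueName = "myQueue"}
            closeConnection conn
        Left (ConnectionClosedException reason) ->
            -- the whole connection is gone; a real program would reconnect here
            putStrLn ("connection closed: " ++ reason)
```

Using try with an explicit AMQPException type keeps unrelated exceptions (for example IO errors) propagating as usual.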
- data Connection
- openConnection :: String -> Text -> Text -> Text -> IO Connection
- openConnection' :: String -> PortNumber -> Text -> Text -> Text -> IO Connection
- closeConnection :: Connection -> IO ()
- addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
- data Channel
- openChannel :: Connection -> IO Channel
- qos :: Channel -> Word32 -> Word16 -> IO ()
- data ExchangeOpts = ExchangeOpts {}
- newExchange :: ExchangeOpts
- declareExchange :: Channel -> ExchangeOpts -> IO ()
- deleteExchange :: Channel -> Text -> IO ()
- data QueueOpts = QueueOpts {}
- newQueue :: QueueOpts
- declareQueue :: Channel -> QueueOpts -> IO (Text, Int, Int)
- bindQueue :: Channel -> Text -> Text -> Text -> IO ()
- bindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
- purgeQueue :: Channel -> Text -> IO Word32
- deleteQueue :: Channel -> Text -> IO Word32
- data Message = Message {}
- data DeliveryMode
- newMsg :: Message
- data Envelope = Envelope {}
- type ConsumerTag = Text
- data Ack
- consumeMsgs :: Channel -> Text -> Ack -> ((Message, Envelope) -> IO ()) -> IO ConsumerTag
- cancelConsumer :: Channel -> ConsumerTag -> IO ()
- publishMsg :: Channel -> Text -> Text -> Message -> IO ()
- getMsg :: Channel -> Ack -> Text -> IO (Maybe (Message, Envelope))
- rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()
- recoverMsgs :: Channel -> Bool -> IO ()
- ackMsg :: Channel -> LongLongInt -> Bool -> IO ()
- ackEnv :: Envelope -> IO ()
- txSelect :: Channel -> IO ()
- txCommit :: Channel -> IO ()
- txRollback :: Channel -> IO ()
- flow :: Channel -> Bool -> IO ()
- data AMQPException
Connection
data Connection
openConnection :: String -> Text -> Text -> Text -> IO Connection
openConnection hostname virtualHost loginName loginPassword opens a connection to an AMQP server running on hostname.
virtualHost is used as a namespace for AMQP resources (default is "/"), so different applications could use multiple virtual hosts on the same AMQP server.
You must call closeConnection before your program exits to ensure that all published messages are received by the server.
NOTE: If the login name, password or virtual host are invalid, this method will throw a ConnectionClosedException. The exception will not contain a reason why the connection was closed, so you'll have to find out yourself.
openConnection' :: String -> PortNumber -> Text -> Text -> Text -> IO Connection
same as openConnection but allows you to specify a non-default port-number as the 2nd parameter
closeConnection :: Connection -> IO ()
closes a connection
Make sure to call this function before your program exits to ensure that all published messages are received by the server.
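One way to make sure closeConnection always runs is to pair it with openConnection using bracket from Control.Exception. This is only a sketch; it assumes a broker on 127.0.0.1 with the default guest account and an existing exchange named "myExchange".

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (bracket)
import qualified Data.ByteString.Lazy.Char8 as BL
import Network.AMQP

main :: IO ()
main =
    -- bracket runs closeConnection even if the body throws an exception
    bracket (openConnection "127.0.0.1" "/" "guest" "guest") closeConnection $ \conn -> do
        chan <- openChannel conn
        -- "myExchange" and "myKey" are assumed to exist already
        publishMsg chan "myExchange" "myKey" newMsg {msgBody = BL.pack "hello world"}
```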
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler conn ifClosed handler adds a handler that will be called after the connection is closed (either by calling closeConnection or by an exception). If the ifClosed parameter is True and the connection is already closed, the handler will be called immediately. If ifClosed == False and the connection is already closed, the handler will never be called.
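As a rough illustration, the handler below signals an MVar so the main thread can wait until the close has been observed. The broker address and credentials are assumptions, and the MVar is just one possible way to hand the notification back.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    closed <- newEmptyMVar

    -- ifClosed = True: run the handler immediately if conn is already closed
    addConnectionClosedHandler conn True (putMVar closed ())

    closeConnection conn
    takeMVar closed -- blocks until the handler has run
    putStrLn "connection closed handler has run"
```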
Channel
A connection to an AMQP server is made up of separate channels. It is recommended to use a separate channel for each thread in your application that talks to the AMQP server (but you don't have to as channels are thread-safe)
openChannel :: Connection -> IO Channel
opens a new channel on the connection
There's currently no closeChannel method, but you can always just close the connection (the maximum number of channels is 65535).
qos :: Channel -> Word32 -> Word16 -> IO ()
qos chan prefetchSize prefetchCount limits the amount of data the server delivers before requiring acknowledgements. prefetchSize specifies the number of bytes and prefetchCount the number of messages. In both cases the value 0 means unlimited.
NOTE: RabbitMQ does not implement prefetchSize and will throw an exception if it doesn't equal 0.
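A typical use is to let a worker hold only one unacknowledged message at a time. The sketch below assumes a local broker and an existing queue named "myQueue"; prefetchSize stays 0 because of the RabbitMQ restriction noted above.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Lazy.Char8 as BL
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- prefetchSize 0 = no byte limit, prefetchCount 1 = one unacked message
    qos chan 0 1

    _ <- consumeMsgs chan "myQueue" Ack $ \(msg, env) -> do
        putStrLn ("working on: " ++ BL.unpack (msgBody msg))
        -- the next message is only delivered after this acknowledgement
        ackEnv env

    _ <- getLine -- wait for keypress
    closeConnection conn
```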
Exchanges
data ExchangeOpts
A record that contains the fields needed when creating a new exchange using declareExchange. The default values apply when you use newExchange.
newExchange :: ExchangeOpts
an ExchangeOpts with defaults set; you must override at least the exchangeName and exchangeType fields.
declareExchange :: Channel -> ExchangeOpts -> IO ()
declares a new exchange on the AMQP server. Can be used like this: declareExchange channel newExchange {exchangeName = "myExchange", exchangeType = "fanout"}
deleteExchange :: Channel -> Text -> IO ()
deletes the exchange with the provided name
Queues
data QueueOpts
A record that contains the fields needed when creating a new queue using declareQueue. The default values apply when you use newQueue.
declareQueue :: Channel -> QueueOpts -> IO (Text, Int, Int)
creates a new queue on the AMQP server; can be used like this: declareQueue channel newQueue {queueName = "myQueue"}.
Returns a tuple (queueName, messageCount, consumerCount). queueName is the name of the new queue (if you don't specify a queueName the server will autogenerate one). messageCount is the number of messages in the queue, which will be zero for newly-created queues. consumerCount is the number of active consumers for the queue.
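The returned tuple can be pattern-matched directly. This sketch assumes a local broker with the default guest account and uses Data.Text only to print the queue name.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Text as T
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- declare (or re-declare) the queue and inspect what the server reports
    (name, messageCount, consumerCount) <-
        declareQueue chan newQueue {queueName = "myQueue"}

    putStrLn $ T.unpack name ++ ": "
        ++ show messageCount ++ " message(s), "
        ++ show consumerCount ++ " consumer(s)"

    closeConnection conn
```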
bindQueue :: Channel -> Text -> Text -> Text -> IO ()
bindQueue chan queueName exchangeName routingKey binds the queue to the exchange using the provided routing key
bindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
an extended version of bindQueue that allows you to include arbitrary arguments. This is useful when binding to an exchange of the headers exchange-type.
purgeQueue :: Channel -> Text -> IO Word32
removes all messages from the queue; returns the number of messages that were in the queue
deleteQueue :: Channel -> Text -> IO Word32
deletes the queue; returns the number of messages that were in the queue before deletion
Messaging
data Message
An AMQP message
data DeliveryMode
Persistent | the message will survive server restarts (if the queue is durable) |
NonPersistent | the message may be lost after server restarts |
data Envelope
contains meta-information of a delivered message (through getMsg or consumeMsgs)
type ConsumerTag = Text
data Ack
specifies whether you have to acknowledge messages that you receive from consumeMsgs or getMsg. If you use Ack, you have to call ackMsg or ackEnv after you have processed a message, otherwise it might be delivered again in the future.
consumeMsgs :: Channel -> Text -> Ack -> ((Message, Envelope) -> IO ()) -> IO ConsumerTag
consumeMsgs chan queueName ack callback subscribes to the given queue and returns a consumerTag. For any incoming message, the callback will be run. If ack == Ack you will have to acknowledge all incoming messages (see ackMsg and ackEnv).
NOTE: The callback will be run on the same thread as the channel thread (every channel spawns its own thread to listen for incoming data) so DO NOT perform any request on chan inside the callback (however, you CAN perform requests on other open channels inside the callback, though I wouldn't recommend it).
Functions that can safely be called on chan are ackMsg, ackEnv, rejectMsg, recoverMsgs. If you want to perform anything more complex, it's a good idea to wrap it inside forkIO.
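The forkIO advice can be sketched as follows. The callback acknowledges on the channel thread, which is listed as safe, and moves the publish to a new thread. The queue "myQueue", the exchange "myExchange" and the routing key "otherKey" are assumptions for illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (forkIO)
import Control.Monad (void)
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    tag <- consumeMsgs chan "myQueue" Ack (forwardCallback chan)

    _ <- getLine -- wait for keypress
    cancelConsumer chan tag
    closeConnection conn

-- Forwards every incoming message to another routing key.
forwardCallback :: Channel -> (Message, Envelope) -> IO ()
forwardCallback chan (msg, env) = do
    -- ackEnv is one of the calls that is safe on the channel thread
    ackEnv env
    -- anything else on chan goes to a separate thread, as recommended
    void . forkIO $
        publishMsg chan "myExchange" "otherKey" newMsg {msgBody = msgBody msg}
```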
cancelConsumer :: Channel -> ConsumerTag -> IO ()
stops a consumer that was started with consumeMsgs
publishMsg :: Channel -> Text -> Text -> Message -> IO ()
publishMsg chan exchangeName routingKey msg publishes msg to the exchange with the provided exchangeName. The effect of routingKey depends on the type of the exchange.
NOTE: This method may temporarily block if the AMQP server requested us to stop sending content data (using the flow control mechanism). So don't rely on this method returning immediately.
getMsg :: Channel -> Ack -> Text -> IO (Maybe (Message, Envelope))
getMsg chan ack queueName gets a message from the specified queue. If ack==Ack, you have to call ackMsg or ackEnv for any message that you get, otherwise it might be delivered again in the future (by calling recoverMsgs)
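Fetching a single message this way might look like the sketch below, which assumes a local broker and an existing queue named "myQueue". Nothing is returned when the queue is empty.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Lazy.Char8 as BL
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    result <- getMsg chan Ack "myQueue"
    case result of
        Nothing -> putStrLn "queue is empty"
        Just (msg, env) -> do
            putStrLn ("got message: " ++ BL.unpack (msgBody msg))
            -- we asked for Ack, so the message must be acknowledged
            ackEnv env

    closeConnection conn
```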
rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()
rejectMsg chan deliveryTag requeue allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. If requeue==False, the message will be discarded. If it is True, the server will attempt to requeue the message.
NOTE: RabbitMQ 1.7 doesn't implement this command
recoverMsgs :: Channel -> Bool -> IO ()
recoverMsgs chan requeue asks the broker to redeliver all messages that were received but not acknowledged on the specified channel.
If requeue==False, the message will be redelivered to the original recipient. If requeue==True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
ackMsg :: Channel -> LongLongInt -> Bool -> IO ()
ackMsg chan deliveryTag multiple acknowledges one or more messages.
If multiple==True, the deliveryTag is treated as "up to and including", so that the client can acknowledge multiple messages with a single method call. If multiple==False, deliveryTag refers to a single message.
If multiple==True and deliveryTag==0, this tells the server to acknowledge all outstanding messages.
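To illustrate the "up to and including" behaviour, the sketch below fetches a few messages and acknowledges them all with one call on the last delivery tag. It assumes a local broker, an existing queue "myQueue", and that the Envelope record exposes its delivery tag through a field named envDeliveryTag (the record fields are not shown on this page, so treat that name as an assumption).

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad (replicateM)
import Data.Maybe (catMaybes)
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- try to fetch up to three messages without acknowledging them yet
    fetched <- replicateM 3 (getMsg chan Ack "myQueue")

    case catMaybes fetched of
        [] -> putStrLn "nothing to acknowledge"
        msgs -> do
            let (_, lastEnv) = last msgs
            -- multiple = True: acknowledges every message up to and
            -- including the last delivery tag (envDeliveryTag is assumed)
            ackMsg chan (envDeliveryTag lastEnv) True
            putStrLn ("acknowledged " ++ show (length msgs) ++ " message(s)")

    closeConnection conn
```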
Transactions
txSelect :: Channel -> IO ()
This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
txCommit :: Channel -> IO ()
This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.
txRollback :: Channel -> IO ()
This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.
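Put together, the three transaction calls could be used as in this sketch. It assumes a local broker and an existing exchange "myExchange" with a binding for "myKey"; the message bodies are placeholders.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Lazy.Char8 as BL
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- switch the channel to transactional mode (needed once per channel)
    txSelect chan

    -- both messages become visible together when the commit succeeds
    publishMsg chan "myExchange" "myKey" newMsg {msgBody = BL.pack "first"}
    publishMsg chan "myExchange" "myKey" newMsg {msgBody = BL.pack "second"}
    txCommit chan

    -- a new transaction has started; this message is abandoned by the rollback
    publishMsg chan "myExchange" "myKey" newMsg {msgBody = BL.pack "discarded"}
    txRollback chan

    closeConnection conn
```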
Flow Control
flow :: Channel -> Bool -> IO ()
flow chan active tells the AMQP server to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process.
If active==True the server will start sending content data, if active==False the server will stop sending content data.
A new channel is always active by default.
NOTE: RabbitMQ 1.7 doesn't implement this command.
Exceptions
data AMQPException
ChannelClosedException String | the String contains the reason why the channel was closed |
ConnectionClosedException String | String may contain a reason |