Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
A client library for AMQP servers implementing the 0-9-1 spec; currently only supports RabbitMQ (see http://www.rabbitmq.com)
A good introduction to RabbitMQ and AMQP 0-9-1 (in various languages): http://www.rabbitmq.com/getstarted.html, http://www.rabbitmq.com/tutorials/amqp-concepts.html
Example
Connect to a server, declare a queue and an exchange and setup a callback for messages coming in on the queue. Then publish a single message to our new exchange
{-# LANGUAGE OverloadedStrings #-} import Network.AMQP import qualified Data.ByteString.Lazy.Char8 as BL main = do conn <- openConnection "127.0.0.1" "/" "guest" "guest" chan <- openChannel conn -- declare a queue, exchange and binding declareQueue chan newQueue {queueName = "myQueue"} declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"} bindQueue chan "myQueue" "myExchange" "myKey" -- subscribe to the queue consumeMsgs chan "myQueue" Ack myCallback -- publish a message to our new exchange publishMsg chan "myExchange" "myKey" newMsg {msgBody = (BL.pack "hello world"), msgDeliveryMode = Just Persistent} getLine -- wait for keypress closeConnection conn putStrLn "connection closed" myCallback :: (Message,Envelope) -> IO () myCallback (msg, env) = do putStrLn $ "received message: " ++ (BL.unpack $ msgBody msg) -- acknowledge receiving the message ackEnv env
Exception handling notes
Some function calls can make the AMQP server throw an AMQP exception, which has the side-effect of closing the connection or channel. The AMQP exceptions are raised as Haskell exceptions (see AMQPException
). So upon receiving an AMQPException
you may have to reopen the channel or connection.
Debugging tips
If you need to debug a problem with e.g. channels being closed unexpectedly, here are some tips:
- The RabbitMQ log file often has helpful error-messages. The location of the log-file differs by OS. Look for RABBITMQ_LOGS in this page: https://www.rabbitmq.com/relocate.html
- The function
addChannelExceptionHandler
can be used to figure out when and why a channel was closed.
- RabbitMQ has a browser-based management console, which allows you to see connections, channels, queues and more. Setup instructions are here: https://www.rabbitmq.com/management.html
Synopsis
- data Connection
- data ConnectionOpts = ConnectionOpts {
- coServers :: ![(String, PortNumber)]
- coVHost :: !Text
- coAuth :: ![SASLMechanism]
- coMaxFrameSize :: !(Maybe Word32)
- coHeartbeatDelay :: !(Maybe Word16)
- coMaxChannel :: !(Maybe Word16)
- coTLSSettings :: Maybe TLSSettings
- coName :: !(Maybe Text)
- data TLSSettings
- defaultConnectionOpts :: ConnectionOpts
- openConnection :: String -> Text -> Text -> Text -> IO Connection
- openConnection' :: String -> PortNumber -> Text -> Text -> Text -> IO Connection
- openConnection'' :: ConnectionOpts -> IO Connection
- closeChannel :: Channel -> IO ()
- closeConnection :: Connection -> IO ()
- addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
- addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO ()
- getServerProperties :: Connection -> IO FieldTable
- data Channel
- openChannel :: Connection -> IO Channel
- addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO ()
- addChannelExceptionHandler :: Channel -> (SomeException -> IO ()) -> IO ()
- isNormalChannelClose :: SomeException -> Bool
- qos :: Channel -> Word32 -> Word16 -> Bool -> IO ()
- data ExchangeOpts = ExchangeOpts {}
- newExchange :: ExchangeOpts
- declareExchange :: Channel -> ExchangeOpts -> IO ()
- bindExchange :: Channel -> Text -> Text -> Text -> IO ()
- bindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
- unbindExchange :: Channel -> Text -> Text -> Text -> IO ()
- unbindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
- deleteExchange :: Channel -> Text -> IO ()
- data QueueOpts = QueueOpts {}
- newQueue :: QueueOpts
- declareQueue :: Channel -> QueueOpts -> IO (Text, Int, Int)
- bindQueue :: Channel -> Text -> Text -> Text -> IO ()
- bindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
- unbindQueue :: Channel -> Text -> Text -> Text -> IO ()
- unbindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
- purgeQueue :: Channel -> Text -> IO Word32
- deleteQueue :: Channel -> Text -> IO Word32
- data Message = Message {
- msgBody :: ByteString
- msgDeliveryMode :: Maybe DeliveryMode
- msgTimestamp :: Maybe Timestamp
- msgID :: Maybe Text
- msgType :: Maybe Text
- msgUserID :: Maybe Text
- msgApplicationID :: Maybe Text
- msgClusterID :: Maybe Text
- msgContentType :: Maybe Text
- msgContentEncoding :: Maybe Text
- msgReplyTo :: Maybe Text
- msgPriority :: Maybe Octet
- msgCorrelationID :: Maybe Text
- msgExpiration :: Maybe Text
- msgHeaders :: Maybe FieldTable
- data DeliveryMode
- data PublishError = PublishError {}
- data ReturnReplyCode
- newMsg :: Message
- data Envelope = Envelope {}
- type ConsumerTag = Text
- data Ack
- consumeMsgs :: Channel -> Text -> Ack -> ((Message, Envelope) -> IO ()) -> IO ConsumerTag
- consumeMsgs' :: Channel -> Text -> Ack -> ((Message, Envelope) -> IO ()) -> (ConsumerTag -> IO ()) -> FieldTable -> IO ConsumerTag
- cancelConsumer :: Channel -> ConsumerTag -> IO ()
- publishMsg :: Channel -> Text -> Text -> Message -> IO (Maybe Int)
- publishMsg' :: Channel -> Text -> Text -> Bool -> Message -> IO (Maybe Int)
- getMsg :: Channel -> Ack -> Text -> IO (Maybe (Message, Envelope))
- rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()
- rejectEnv :: Envelope -> Bool -> IO ()
- recoverMsgs :: Channel -> Bool -> IO ()
- ackMsg :: Channel -> LongLongInt -> Bool -> IO ()
- ackEnv :: Envelope -> IO ()
- nackMsg :: Channel -> LongLongInt -> Bool -> Bool -> IO ()
- nackEnv :: Envelope -> IO ()
- txSelect :: Channel -> IO ()
- txCommit :: Channel -> IO ()
- txRollback :: Channel -> IO ()
- confirmSelect :: Channel -> Bool -> IO ()
- waitForConfirms :: Channel -> IO ConfirmationResult
- waitForConfirmsUntil :: Channel -> Int -> IO ConfirmationResult
- addConfirmationListener :: Channel -> ((Word64, Bool, AckType) -> IO ()) -> IO ()
- data ConfirmationResult
- data AckType
- flow :: Channel -> Bool -> IO ()
- data SASLMechanism = SASLMechanism {
- saslName :: !Text
- saslInitialResponse :: !ByteString
- saslChallengeFunc :: !(Maybe (ByteString -> IO ByteString))
- plain :: Text -> Text -> SASLMechanism
- amqplain :: Text -> Text -> SASLMechanism
- rabbitCRdemo :: Text -> Text -> SASLMechanism
- data AMQPException
- data ChanThreadKilledException
- data CloseType
- fromURI :: String -> ConnectionOpts
Connection
data Connection Source #
data ConnectionOpts Source #
Represents the parameters to connect to a broker or a cluster of brokers.
See defaultConnectionOpts
.
ConnectionOpts | |
|
data TLSSettings Source #
Represents the kind of TLS connection to establish.
TLSTrusted | Require trusted certificates (Recommended). |
TLSUntrusted | Allow untrusted certificates (Discouraged. Vulnerable to man-in-the-middle attacks) |
TLSCustom TLSSettings | Provide your own custom TLS settings |
defaultConnectionOpts :: ConnectionOpts Source #
Constructs default connection options with the following settings:
- Broker:
amqp://guest:guest@localhost:5672/%2F
using the PLAIN
SASL mechanism - max frame size:
131072
- use the heartbeat delay suggested by the broker
- no limit on the number of used channels
openConnection :: String -> Text -> Text -> Text -> IO Connection Source #
openConnection hostname virtualHost loginName loginPassword
opens a connection to an AMQP server running on hostname
.
virtualHost
is used as a namespace for AMQP resources (default is "/"), so different applications could use multiple virtual hosts on the same AMQP server.
You must call closeConnection
before your program exits to ensure that all published messages are received by the server.
The loginName
and loginPassword
will be used to authenticate via the PLAIN
SASL mechanism.
NOTE: If the login name, password or virtual host are invalid, this method will throw a ConnectionClosedException
. The exception will not contain a reason why the connection was closed, so you'll have to find out yourself.
openConnection' :: String -> PortNumber -> Text -> Text -> Text -> IO Connection Source #
same as openConnection
but allows you to specify a non-default port-number as the 2nd parameter
openConnection'' :: ConnectionOpts -> IO Connection Source #
Opens a connection to a broker specified by the given ConnectionOpts
parameter.
closeChannel :: Channel -> IO () Source #
closes a channel. It is typically not necessary to manually call this as closing a connection will implicitly close all channels.
closeConnection :: Connection -> IO () Source #
closes a connection
Make sure to call this function before your program exits to ensure that all published messages are received by the server.
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO () Source #
addConnectionClosedHandler conn ifClosed handler
adds a handler
that will be called after the connection is closed (either by calling closeConnection
or by an exception). If the ifClosed
parameter is True and the connection is already closed, the handler will be called immediately. If ifClosed == False
and the connection is already closed, the handler will never be called
addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO () Source #
addConnectionBlockedHandler conn blockedHandler unblockedHandler
adds handlers that will be called
when a connection gets blocked/unblocked due to server resource constraints.
More information: https://www.rabbitmq.com/connection-blocked.html
getServerProperties :: Connection -> IO FieldTable Source #
get the server properties sent in connection.start
Channel
A connection to an AMQP server is made up of separate channels. It is recommended to use a separate channel for each thread in your application that talks to the AMQP server (but you don't have to as channels are thread-safe)
openChannel :: Connection -> IO Channel Source #
opens a new channel on the connection
By default, if a channel is closed by an AMQP exception, this exception will be printed to stderr. You can prevent this behaviour by setting a custom exception handler (using addChannelExceptionHandler
).
Example of adding an exception-handler:
chan <- openChannel conn addChannelExceptionHandler chan $ \e -> do unless (isNormalChannelClose e) $ do putStrLn $ "channel exception: "++show e
addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO () Source #
Registers a callback function that is called whenever a message is returned from the broker ('basic.return').
addChannelExceptionHandler :: Channel -> (SomeException -> IO ()) -> IO () Source #
Registers a callback function that is called whenever a channel is closed by an exception.
This method will always be called when a channel is closed, whether through normal means
(closeChannel
, closeConnection
) or by some AMQP exception.
You can use isNormalChannelClose
to figure out if the exception was normal or due to an
AMQP exception.
isNormalChannelClose :: SomeException -> Bool Source #
This can be used with the exception passed to addChannelExceptionHandler
.
Returns True if the argument is a ConnectionClosedException
or ChannelClosedException
that happened
normally (i.e. by the user calling closeChannel
or closeConnection
) and not due to some
AMQP exception.
qos :: Channel -> Word32 -> Word16 -> Bool -> IO () Source #
qos chan prefetchSize prefetchCount global
limits the amount of data the server
delivers before requiring acknowledgements. prefetchSize
specifies the
number of bytes and prefetchCount
the number of messages. In both cases
the value 0 means unlimited.
The meaning of the global
flag is explained here: http://www.rabbitmq.com/consumer-prefetch.html
NOTE: RabbitMQ does not implement prefetchSize and will throw an exception if it doesn't equal 0.
Exchanges
data ExchangeOpts Source #
A record that contains the fields needed when creating a new exchange using declareExchange
. The default values apply when you use newExchange
.
ExchangeOpts | |
|
Instances
Read ExchangeOpts Source # | |
Defined in Network.AMQP readsPrec :: Int -> ReadS ExchangeOpts # readList :: ReadS [ExchangeOpts] # | |
Show ExchangeOpts Source # | |
Defined in Network.AMQP showsPrec :: Int -> ExchangeOpts -> ShowS # show :: ExchangeOpts -> String # showList :: [ExchangeOpts] -> ShowS # | |
Eq ExchangeOpts Source # | |
Defined in Network.AMQP (==) :: ExchangeOpts -> ExchangeOpts -> Bool # (/=) :: ExchangeOpts -> ExchangeOpts -> Bool # | |
Ord ExchangeOpts Source # | |
Defined in Network.AMQP compare :: ExchangeOpts -> ExchangeOpts -> Ordering # (<) :: ExchangeOpts -> ExchangeOpts -> Bool # (<=) :: ExchangeOpts -> ExchangeOpts -> Bool # (>) :: ExchangeOpts -> ExchangeOpts -> Bool # (>=) :: ExchangeOpts -> ExchangeOpts -> Bool # max :: ExchangeOpts -> ExchangeOpts -> ExchangeOpts # min :: ExchangeOpts -> ExchangeOpts -> ExchangeOpts # |
newExchange :: ExchangeOpts Source #
an ExchangeOpts
with defaults set; you must override at least the exchangeName
and exchangeType
fields.
declareExchange :: Channel -> ExchangeOpts -> IO () Source #
declares a new exchange on the AMQP server. Can be used like this: declareExchange channel newExchange {exchangeName = "myExchange", exchangeType = "fanout"}
bindExchange :: Channel -> Text -> Text -> Text -> IO () Source #
bindExchange chan destinationName sourceName routingKey
binds the destination exchange to the source exchange using the provided routing key
bindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO () Source #
an extended version of bindExchange
that allows you to include arbitrary arguments. This is useful to use the headers
exchange-type.
unbindExchange :: Channel -> Text -> Text -> Text -> IO () Source #
unbindExchange chan destinationName sourceName routingKey
unbinds an exchange from an exchange. The routingKey
must be identical to the one specified when binding the exchange.
unbindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO () Source #
an extended version of unbindExchange
that allows you to include arguments. The arguments
must be identical to the ones specified when binding the exchange.
Queues
A record that contains the fields needed when creating a new queue using declareQueue
. The default values apply when you use newQueue
.
QueueOpts | |
|
declareQueue :: Channel -> QueueOpts -> IO (Text, Int, Int) Source #
creates a new queue on the AMQP server; can be used like this: declareQueue channel newQueue {queueName = "myQueue"}
.
Returns a tuple (queue, messageCount, consumerCount)
.
queue
is the name of the new queue (if you don't specify a queue the server will autogenerate one).
messageCount
is the number of messages in the queue, which will be zero for newly-created queues. consumerCount
is the number of active consumers for the queue.
bindQueue :: Channel -> Text -> Text -> Text -> IO () Source #
bindQueue chan queue exchange routingKey
binds the queue to the exchange using the provided routing key. If exchange
is the empty string, the default exchange will be used.
bindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO () Source #
an extended version of bindQueue
that allows you to include arbitrary arguments. This is useful to use the headers
exchange-type.
unbindQueue :: Channel -> Text -> Text -> Text -> IO () Source #
unbindQueue chan queue exchange routingKey
unbinds a queue from an exchange. The routingKey
must be identical to the one specified when binding the queue.
unbindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO () Source #
an extended version of unbindQueue
that allows you to include arguments. The arguments
must be identical to the ones specified when binding the queue.
purgeQueue :: Channel -> Text -> IO Word32 Source #
remove all messages from the queue; returns the number of messages that were in the queue
deleteQueue :: Channel -> Text -> IO Word32 Source #
deletes the queue; returns the number of messages that were in the queue before deletion
Messaging
An AMQP message
Message | |
|
data DeliveryMode Source #
Persistent | the message will survive server restarts (if the queue is durable) |
NonPersistent | the message may be lost after server restarts |
Instances
Read DeliveryMode Source # | |
Defined in Network.AMQP.Internal readsPrec :: Int -> ReadS DeliveryMode # readList :: ReadS [DeliveryMode] # | |
Show DeliveryMode Source # | |
Defined in Network.AMQP.Internal showsPrec :: Int -> DeliveryMode -> ShowS # show :: DeliveryMode -> String # showList :: [DeliveryMode] -> ShowS # | |
Eq DeliveryMode Source # | |
Defined in Network.AMQP.Internal (==) :: DeliveryMode -> DeliveryMode -> Bool # (/=) :: DeliveryMode -> DeliveryMode -> Bool # | |
Ord DeliveryMode Source # | |
Defined in Network.AMQP.Internal compare :: DeliveryMode -> DeliveryMode -> Ordering # (<) :: DeliveryMode -> DeliveryMode -> Bool # (<=) :: DeliveryMode -> DeliveryMode -> Bool # (>) :: DeliveryMode -> DeliveryMode -> Bool # (>=) :: DeliveryMode -> DeliveryMode -> Bool # max :: DeliveryMode -> DeliveryMode -> DeliveryMode # min :: DeliveryMode -> DeliveryMode -> DeliveryMode # |
data PublishError Source #
Instances
Read PublishError Source # | |
Defined in Network.AMQP.Internal readsPrec :: Int -> ReadS PublishError # readList :: ReadS [PublishError] # | |
Show PublishError Source # | |
Defined in Network.AMQP.Internal showsPrec :: Int -> PublishError -> ShowS # show :: PublishError -> String # showList :: [PublishError] -> ShowS # | |
Eq PublishError Source # | |
Defined in Network.AMQP.Internal (==) :: PublishError -> PublishError -> Bool # (/=) :: PublishError -> PublishError -> Bool # |
data ReturnReplyCode Source #
Instances
Read ReturnReplyCode Source # | |
Defined in Network.AMQP.Internal | |
Show ReturnReplyCode Source # | |
Defined in Network.AMQP.Internal showsPrec :: Int -> ReturnReplyCode -> ShowS # show :: ReturnReplyCode -> String # showList :: [ReturnReplyCode] -> ShowS # | |
Eq ReturnReplyCode Source # | |
Defined in Network.AMQP.Internal (==) :: ReturnReplyCode -> ReturnReplyCode -> Bool # (/=) :: ReturnReplyCode -> ReturnReplyCode -> Bool # |
contains meta-information of a delivered message (through getMsg
or consumeMsgs
)
type ConsumerTag = Text Source #
specifies whether you have to acknowledge messages that you receive from consumeMsgs
or getMsg
. If you use Ack
, you have to call ackMsg
or ackEnv
after you have processed a message, otherwise it might be delivered again in the future
consumeMsgs :: Channel -> Text -> Ack -> ((Message, Envelope) -> IO ()) -> IO ConsumerTag Source #
consumeMsgs chan queue ack callback
subscribes to the given queue and returns a consumerTag. For any incoming message, the callback will be run. If ack == Ack
you will have to acknowledge all incoming messages (see
ackMsg
and ackEnv
)
Exceptions in the callback
If an exception occurs in callback
, it will be caught and printed to stderr
. But you should not depend on this behaviour (it might change in future versions of this library);
instead, it is strongly recommended that you catch any exceptions that your callback may throw and handle them appropriately.
But make sure not to catch ChanThreadKilledException
(or re-throw it if you did catch it), since it is used internally by the library to close channels.
So unless you are confident that your callback won't throw exceptions, you may want to structure your code like this:
consumeMsgs chan name Ack $ \(msg, env) -> (do ...) `CE.catches` [ -- rethrow this exception, since the AMQP library uses it internally CE.Handler $ \(e::ChanThreadKilledException) -> CE.throwIO e, -- (optional) catch individual exceptions that your code may throw CE.Handler $ \(e::CE.IOException) -> ..., CE.Handler $ \(e::SomeOtherException) -> ..., -- catch all exceptions that weren't handled above CE.Handler $ \(e::CE.SomeException) -> ... ]
In practice, it might be useful to encapsulate this exception-handling logic in a custom wrapper-function so that you can reuse it for every callback you pass to consumeMsgs
.
Blocking requests in the callback
The callback
will be run on the channel's receiver thread (which is responsible for handling all incoming messages on this channel, including responses to requests from the client) so DO NOT perform any blocking request on chan
inside the callback, as this would lead to a dead-lock. However, you CAN perform requests on other open channels inside the callback, though that would keep chan
blocked until the requests are done, so it is not recommended.
Unless you're using AMQP flow control, the following functions can safely be called on chan
: ackMsg
, ackEnv
, rejectMsg
, publishMsg
. If you use flow-control or want to perform anything more complex, it's recommended that instead of using consumeMsgs
you use getMsg
to fetch messages in a loop (because then your message-handling code will not run in the channel's receiver thread, so there will be no problems when performing blocking requests).
consumeMsgs' :: Channel -> Text -> Ack -> ((Message, Envelope) -> IO ()) -> (ConsumerTag -> IO ()) -> FieldTable -> IO ConsumerTag Source #
an extended version of consumeMsgs
that allows you to define a consumer cancellation callback and include arbitrary arguments.
cancelConsumer :: Channel -> ConsumerTag -> IO () Source #
stops a consumer that was started with consumeMsgs
publishMsg :: Channel -> Text -> Text -> Message -> IO (Maybe Int) Source #
publishMsg chan exchange routingKey msg
publishes msg
to the exchange whose name is given by exchange
. The effect of routingKey
depends on the type of the exchange.
Returns the sequence-number of the message (only if the channel is in publisher confirm mode; see confirmSelect
).
NOTE: This method may temporarily block if the AMQP server requested us to stop sending content data (using the flow control mechanism). So don't rely on this method returning immediately.
publishMsg' :: Channel -> Text -> Text -> Bool -> Message -> IO (Maybe Int) Source #
Like publishMsg
, but additionally allows you to specify whether the mandatory
flag should be set.
getMsg :: Channel -> Ack -> Text -> IO (Maybe (Message, Envelope)) Source #
getMsg chan ack queue
gets a message from the specified queue. If ack==Ack
, you have to call
ackMsg
or ackEnv
for any message that you get, otherwise it might be delivered again in the future (by calling recoverMsgs
).
Will return Nothing
when no message is currently available.
rejectMsg :: Channel -> LongLongInt -> Bool -> IO () Source #
rejectMsg chan deliveryTag requeue
allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. If requeue==False
, the message will be discarded. If it is True
, the server will attempt to requeue the message.
NOTE: RabbitMQ 1.7 doesn't implement this command
recoverMsgs :: Channel -> Bool -> IO () Source #
recoverMsgs chan requeue
asks the broker to redeliver all messages that were received but not acknowledged on the specified channel.
If requeue==False
, the message will be redelivered to the original recipient. If requeue==True
, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
ackMsg :: Channel -> LongLongInt -> Bool -> IO () Source #
ackMsg chan deliveryTag multiple
acknowledges one or more messages. A message MUST not be acknowledged more than once.
If multiple==True
, the deliveryTag
is treated as "up to and including", so that the client can acknowledge multiple messages with a single method call. If multiple==False
, deliveryTag
refers to a single message.
If multiple==True
, and deliveryTag==0
, tells the server to acknowledge all outstanding messages.
nackMsg :: Channel -> LongLongInt -> Bool -> Bool -> IO () Source #
nackMsg chan deliveryTag multiple requeue
rejects one or more messages. A message MUST not be rejected more than once.
If multiple==True
, the deliveryTag
is treated as "up to and including", so that the client can reject multiple messages with a single method call. If multiple==False
, deliveryTag
refers to a single message.
If requeue==True
, the server will try to requeue the message. If requeue==False
, the message will be dropped by the server.
Transactions
txSelect :: Channel -> IO () Source #
This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
txCommit :: Channel -> IO () Source #
This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.
txRollback :: Channel -> IO () Source #
This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.
Confirms
confirmSelect :: Channel -> Bool -> IO () Source #
confirmSelect chan nowait
puts the channel in publisher confirm mode. This mode is a RabbitMQ
extension where a producer receives confirmations when messages are successfully processed by
the broker. Publisher confirms are a relatively lightweight alternative to full transactional
mode. For details about the delivery guarantees and performance implications of this mode, see
https://www.rabbitmq.com/confirms.html. Note that on a single channel, publisher confirms and
transactions are mutually exclusive (you cannot select both at the same time).
When nowait==True
the server will not send back a response to this method.
waitForConfirms :: Channel -> IO ConfirmationResult Source #
Calling this function will cause the invoking thread to block until all previously published
messages have been acknowledged by the broker (positively or negatively). Returns a value of type
ConfirmationResult
, holding a tuple of two IntSets (acked, nacked)
, containing the delivery tags
for the messages that have been confirmed by the broker.
waitForConfirmsUntil :: Channel -> Int -> IO ConfirmationResult Source #
Same as waitForConfirms
, but with a timeout in microseconds. Note that, since this operation
may time out before the server has acked or nacked all pending messages, the returned ConfirmationResult
should be pattern-matched for the constructors Complete (acked, nacked)
and Partial (acked, nacked, pending)
addConfirmationListener :: Channel -> ((Word64, Bool, AckType) -> IO ()) -> IO () Source #
Adds a handler which will be invoked each time the Channel
receives a confirmation from the broker.
The parameters passed to the handler are the deliveryTag
for the message being confirmed, a flag
indicating whether the confirmation refers to this message individually (False
) or all messages up to
this one (True
) and an AckType
whose value can be either BasicAck
or BasicNack
.
data ConfirmationResult Source #
Instances
Show ConfirmationResult Source # | |
Defined in Network.AMQP.Types showsPrec :: Int -> ConfirmationResult -> ShowS # show :: ConfirmationResult -> String # showList :: [ConfirmationResult] -> ShowS # |
Flow Control
flow :: Channel -> Bool -> IO () Source #
flow chan active
tells the AMQP server to pause or restart the flow of content
data. This is a simple flow-control mechanism that a peer can use
to avoid overflowing its queues or otherwise finding itself receiving
more messages than it can process.
If active==True
the server will start sending content data, if active==False
the server will stop sending content data.
A new channel is always active by default.
NOTE: RabbitMQ 1.7 doesn't implement this command.
SASL
data SASLMechanism Source #
A SASLMechanism
is described by its name (saslName
), its initial response (saslInitialResponse
), and an optional function (saslChallengeFunc
) that
transforms a security challenge provided by the server into a response, which is then sent back to the server for verification.
SASLMechanism | |
|
amqplain :: Text -> Text -> SASLMechanism Source #
The AMQPLAIN
SASL mechanism. See http://www.rabbitmq.com/authentication.html.
rabbitCRdemo :: Text -> Text -> SASLMechanism Source #
The RABBIT-CR-DEMO
SASL mechanism needs to be explicitly enabled on the RabbitMQ server and should only be used for demonstration purposes of the challenge-response cycle.
See http://www.rabbitmq.com/authentication.html.
Exceptions
data AMQPException Source #
ChannelClosedException CloseType String | the String contains the reason why the channel was closed |
ConnectionClosedException CloseType String | String may contain a reason |
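AllChannelsAllocatedException Int | the Int contains the channel-max property of the connection (i.e. the highest permitted channel number) |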
Instances
Exception AMQPException Source # | |
Defined in Network.AMQP.Types | |
Show AMQPException Source # | |
Defined in Network.AMQP.Types showsPrec :: Int -> AMQPException -> ShowS # show :: AMQPException -> String # showList :: [AMQPException] -> ShowS # | |
Eq AMQPException Source # | |
Defined in Network.AMQP.Types (==) :: AMQPException -> AMQPException -> Bool # (/=) :: AMQPException -> AMQPException -> Bool # | |
Ord AMQPException Source # | |
Defined in Network.AMQP.Types compare :: AMQPException -> AMQPException -> Ordering # (<) :: AMQPException -> AMQPException -> Bool # (<=) :: AMQPException -> AMQPException -> Bool # (>) :: AMQPException -> AMQPException -> Bool # (>=) :: AMQPException -> AMQPException -> Bool # max :: AMQPException -> AMQPException -> AMQPException # min :: AMQPException -> AMQPException -> AMQPException # |
data ChanThreadKilledException Source #
Thrown in the channel thread when the connection gets closed. When handling exceptions in a subscription callback, make sure to re-throw this so the channel thread can be stopped.
Instances
describes whether a channel was closed by user-request (Normal) or by an AMQP exception (Abnormal)
URI parsing
fromURI :: String -> ConnectionOpts Source #
Parses an AMQP standard URI of the form amqp://user:password@host:port/vhost and returns a
ConnectionOpts for use with
openConnection''
Any of these fields may be empty and will be replaced with defaults from
amqp://guest:guest@localhost:5672/