{-# OPTIONS -XBangPatterns -XScopedTypeVariables -XDeriveDataTypeable #-} {- | A client library for AMQP servers implementing the 0-8 spec; currently only supports RabbitMQ (see ) A good introduction to AMQP can be found here (though it uses Python): /Example/: Connect to a server, declare a queue and an exchange and setup a callback for messages coming in on the queue. Then publish a single message to our new exchange >import Network.AMQP >import qualified Data.ByteString.Lazy.Char8 as BL > >main = do > conn <- openConnection "127.0.0.1" "/" "guest" "guest" > chan <- openChannel conn > > -- declare a queue, exchange and binding > declareQueue chan newQueue {queueName = "myQueue"} > declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"} > bindQueue chan "myQueue" "myExchange" "myKey" > > -- subscribe to the queue > consumeMsgs chan "myQueue" Ack myCallback > > -- publish a message to our new exchange > publishMsg chan "myExchange" "myKey" > newMsg {msgBody = (BL.pack "hello world"), > msgDeliveryMode = Just Persistent} > > getLine -- wait for keypress > closeConnection conn > putStrLn "connection closed" > > >myCallback :: (Message,Envelope) -> IO () >myCallback (msg, env) = do > putStrLn $ "received message: "++(BL.unpack $ msgBody msg) > -- acknowledge receiving the message > ackEnv env /Exception handling/: Some function calls can make the AMQP server throw an AMQP exception, which has the side-effect of closing the connection or channel. The AMQP exceptions are raised as Haskell exceptions (see 'AMQPException'). So upon receiving an 'AMQPException' you may have to reopen the channel or connection. -} module Network.AMQP ( -- * Connection Connection, openConnection, openConnection', closeConnection, -- * Channel Channel, openChannel, -- * Exchanges ExchangeOpts(..), newExchange, declareExchange, deleteExchange, -- * Queues QueueOpts(..), newQueue, declareQueue, bindQueue, purgeQueue, deleteQueue, -- * Messaging Message(..), DeliveryMode(..), newMsg, Envelope(..), ConsumerTag, Ack(..), consumeMsgs, cancelConsumer, publishMsg, getMsg, rejectMsg, recoverMsgs, ackMsg, ackEnv, -- * Transactions txSelect, txCommit, txRollback, -- * Flow Control flow, -- * Exceptions AMQPException(..) ) where import Data.Binary import Data.Binary.Get import Data.Binary.Put as BPut import Data.Typeable import qualified Data.Map as M import qualified Data.IntMap as IM import qualified Data.ByteString.Char8 as BS import qualified Data.ByteString.Lazy.Char8 as BL import Data.IORef import Data.Maybe import Data.Int import Control.Concurrent import Control.Monad import qualified Control.Exception as CE import Network.BSD import Network.Socket import qualified Network.Socket.ByteString as NB import Network.AMQP.Protocol import Network.AMQP.Types import Network.AMQP.Helpers import Network.AMQP.Generated {- TODO: - basic.qos - handle basic.return - connection.secure - connection.redirect -} ----- EXCHANGE ----- -- | A record that contains the fields needed when creating a new exhange using 'declareExchange'. The default values apply when you use 'newExchange'. data ExchangeOpts = ExchangeOpts { exchangeName :: String, -- ^ (must be set); the name of the exchange exchangeType :: String, -- ^ (must be set); the type of the exchange (\"fanout\", \"direct\", \"topic\") -- optional exchangePassive :: Bool, -- ^ (default 'False'); If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state. exchangeDurable :: Bool, -- ^ (default 'True'); If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts. exchangeAutoDelete :: Bool, -- ^ (default 'False'); If set, the exchange is deleted when all queues have finished using it. exchangeInternal :: Bool -- ^ (default 'False'); If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. } -- | an 'ExchangeOpts' with defaults set; you must override at least the 'exchangeName' and 'exchangeType' fields. newExchange :: ExchangeOpts newExchange = ExchangeOpts "" "" False True False False -- | declares a new exchange on the AMQP server. Can be used like this: @declareExchange channel newExchange {exchangeName = \"myExchange\", exchangeType = \"fanout\"}@ declareExchange :: Channel -> ExchangeOpts -> IO () declareExchange chan exchg = do (SimpleMethod Exchange_declare_ok) <- request chan (SimpleMethod (Exchange_declare 1 -- ticket; ignored by rabbitMQ (ShortString $ exchangeName exchg) -- exchange (ShortString $ exchangeType exchg) -- typ (exchangePassive exchg) -- passive (exchangeDurable exchg) -- durable (exchangeAutoDelete exchg) -- auto_delete (exchangeInternal exchg) -- internal False -- nowait (FieldTable (M.fromList [])))) -- arguments return () -- | deletes the exchange with the provided name deleteExchange :: Channel -> String -> IO () deleteExchange chan exchangeName = do (SimpleMethod Exchange_delete_ok) <- request chan (SimpleMethod (Exchange_delete 1 -- ticket; ignored by rabbitMQ (ShortString exchangeName) -- exchange False -- if_unused; If set, the server will only delete the exchange if it has no queue bindings. False -- nowait )) return () ----- QUEUE ----- -- | A record that contains the fields needed when creating a new queue using 'declareQueue'. The default values apply when you use 'newQueue'. data QueueOpts = QueueOpts { --must be set queueName :: String, -- ^ (default \"\"); the name of the queue; if left empty, the server will generate a new name and return it from the 'declareQueue' method --optional queuePassive :: Bool, -- ^ (default 'False'); If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state. queueDurable :: Bool, -- ^ (default 'True'); If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue. queueExclusive :: Bool, -- ^ (default 'False'); Exclusive queues may only be consumed from by the current connection. Setting the 'exclusive' flag always implies 'auto-delete'. queueAutoDelete :: Bool -- ^ (default 'False'); If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted. } -- | a 'QueueOpts' with defaults set; you should override at least 'queueName'. newQueue :: QueueOpts newQueue = QueueOpts "" False True False False -- | creates a new queue on the AMQP server; can be used like this: @declareQueue channel newQueue {queueName = \"myQueue\"}@. -- -- Returns a tuple @(queueName, messageCount, consumerCount)@. -- @queueName@ is the name of the new queue (if you don't specify a queueName the server will autogenerate one). -- @messageCount@ is the number of messages in the queue, which will be zero for newly-created queues. @consumerCount@ is the number of active consumers for the queue. declareQueue :: Channel -> QueueOpts -> IO (String, Int, Int) declareQueue chan queue = do (SimpleMethod (Queue_declare_ok (ShortString qName) messageCount consumerCount)) <- request chan $ (SimpleMethod (Queue_declare 1 -- ticket (ShortString $ queueName queue) (queuePassive queue) (queueDurable queue) (queueExclusive queue) (queueAutoDelete queue) False -- no-wait; true means no answer from server (FieldTable (M.fromList [])))) return (qName, fromIntegral messageCount, fromIntegral consumerCount) -- | @bindQueue chan queueName exchangeName routingKey@ binds the queue to the exchange using the provided routing key bindQueue :: Channel -> String -> String -> String -> IO () bindQueue chan queueName exchangeName routingKey = do (SimpleMethod Queue_bind_ok) <- request chan (SimpleMethod (Queue_bind 1 -- ticket; ignored by rabbitMQ (ShortString queueName) (ShortString exchangeName) (ShortString routingKey) False -- nowait (FieldTable (M.fromList [])))) -- arguments return () -- | remove all messages from the queue; returns the number of messages that were in the queue purgeQueue :: Channel -> String -> IO Word32 purgeQueue chan queueName = do (SimpleMethod (Queue_purge_ok msgCount)) <- request chan $ (SimpleMethod (Queue_purge 1 -- ticket (ShortString queueName) -- queue False -- nowait )) return msgCount -- | deletes the queue; returns the number of messages that were in the queue before deletion deleteQueue :: Channel -> String -> IO Word32 deleteQueue chan queueName = do (SimpleMethod (Queue_delete_ok msgCount)) <- request chan $ (SimpleMethod (Queue_delete 1 -- ticket (ShortString queueName) -- queue False -- if_unused False -- if_empty False -- nowait )) return msgCount ----- MSG (the BASIC class in AMQP) ----- type ConsumerTag = String -- | specifies whether you have to acknowledge messages that you receive from 'consumeMsgs' or 'getMsg'. If you use 'Ack', you have to call 'ackMsg' or 'ackEnv' after you have processed a message, otherwise it might be delivered again in the future data Ack = Ack | NoAck ackToBool :: Ack -> Bool ackToBool Ack = False ackToBool NoAck = True -- | @consumeMsgs chan queueName ack callback@ subscribes to the given queue and returns a consumerTag. For any incoming message, the callback will be run. If @ack == 'Ack'@ you will have to acknowledge all incoming messages (see 'ackMsg' and 'ackEnv') -- -- NOTE: The callback will be run on the same thread as the channel thread (every channel spawns its own thread to listen for incoming data) so DO NOT perform any request on @chan@ inside the callback (however, you CAN perform requests on other open channels inside the callback, though I wouldn't recommend it). -- Functions that can safely be called on @chan@ are 'ackMsg', 'ackEnv', 'rejectMsg', 'recoverMsgs'. If you want to perform anything more complex, it's a good idea to wrap it inside 'forkIO'. consumeMsgs :: Channel -> String -> Ack -> ((Message,Envelope) -> IO ()) -> IO ConsumerTag consumeMsgs chan queueName ack callback = do --generate a new consumer tag newConsumerTag <- (liftM show) $ modifyMVar (lastConsumerTag chan) $ \c -> return (c+1,c+1) --register the consumer modifyMVar_ (consumers chan) $ \c -> return $ M.insert newConsumerTag callback c writeAssembly chan (SimpleMethod $ Basic_consume 1 -- ticket (ShortString queueName) -- queue (ShortString newConsumerTag) -- consumer_tag False -- no_local; If the no-local field is set the server will not send messages to the client that published them. (ackToBool ack) -- no_ack False -- exclusive; Request exclusive consumer access, meaning only this consumer can access the queue. True -- nowait ) return newConsumerTag -- | stops a consumer that was started with 'consumeMsgs' cancelConsumer :: Channel -> ConsumerTag -> IO () cancelConsumer chan consumerTag = do --unregister the consumer modifyMVar_ (consumers chan) $ \c -> return $ M.delete consumerTag c (SimpleMethod (Basic_cancel_ok consumerTag')) <- request chan $ (SimpleMethod (Basic_cancel (ShortString consumerTag) -- consumer_tag False -- nowait )) return () -- | @publishMsg chan exchangeName routingKey msg@ publishes @msg@ to the exchange with the provided @exchangeName@. The effect of @routingKey@ depends on the type of the exchange -- -- NOTE: This method may temporarily block if the AMQP server requested us to stop sending content data (using the flow control mechanism). So don't rely on this method returning immediately publishMsg :: Channel -> String -> String -> Message -> IO () publishMsg chan exchangeName routingKey msg = do writeAssembly chan (ContentMethod (Basic_publish 1 -- ticket; ignored by rabbitMQ (ShortString exchangeName) (ShortString routingKey) False -- mandatory; if true, the server might return the msg, which is currently not handled False) --immediate; if true, the server might return the msg, which is currently not handled --TODO: add more of these to 'Message' (CHBasic (fmap ShortString $ msgContentType msg) Nothing Nothing (fmap deliveryModeToInt $ msgDeliveryMode msg) -- delivery_mode Nothing Nothing Nothing Nothing (fmap ShortString $ msgID msg) (msgTimestamp msg) Nothing Nothing Nothing Nothing ) (msgBody msg)) return () -- | @getMsg chan ack queueName@ gets a message from the specified queue. If @ack=='Ack'@, you have to call 'ackMsg' or 'ackEnv' for any message that you get, otherwise it might be delivered again in the future (by calling 'recoverMsgs') getMsg :: Channel -> Ack -> String -> IO (Maybe (Message, Envelope)) getMsg chan ack queueName = do ret <- request chan (SimpleMethod (Basic_get 1 -- ticket (ShortString queueName) -- queue (ackToBool ack) -- no_ack )) case ret of ContentMethod (Basic_get_ok deliveryTag redelivered (ShortString exchangeName) (ShortString routingKey) msgCount) properties msgBody -> return $ Just $ (msgFromContentHeaderProperties properties msgBody, Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered, envExchangeName = exchangeName, envRoutingKey = routingKey, envChannel = chan}) _ -> return Nothing {- | @ackMsg chan deliveryTag multiple@ acknowledges one or more messages. if @multiple==True@, the @deliverTag@ is treated as \"up to and including\", so that the client can acknowledge multiple messages with a single method call. If @multiple==False@, @deliveryTag@ refers to a single message. If @multiple==True@, and @deliveryTag==0@, tells the server to acknowledge all outstanding mesages. -} ackMsg :: Channel -> LongLongInt -> Bool -> IO () ackMsg chan deliveryTag multiple = writeAssembly chan $ (SimpleMethod (Basic_ack deliveryTag -- delivery_tag multiple -- multiple )) -- | Acknowledges a single message. This is a wrapper for 'ackMsg' in case you have the 'Envelope' at hand. ackEnv :: Envelope -> IO () ackEnv env = ackMsg (envChannel env) (envDeliveryTag env) False -- | @rejectMsg chan deliveryTag requeue@ allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. If @requeue==False@, the message will be discarded. If it is 'True', the server will attempt to requeue the message. -- -- NOTE: RabbitMQ 1.7 doesn't implement this command rejectMsg :: Channel -> LongLongInt -> Bool -> IO () rejectMsg chan deliveryTag requeue = writeAssembly chan $ (SimpleMethod (Basic_reject deliveryTag -- delivery_tag requeue -- requeue )) -- | @recoverMsgs chan requeue@ asks the broker to redeliver all messages that were received but not acknowledged on the specified channel. --If @requeue==False@, the message will be redelivered to the original recipient. If @requeue==True@, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber. recoverMsgs :: Channel -> Bool -> IO () recoverMsgs chan requeue = writeAssembly chan $ (SimpleMethod (Basic_recover requeue -- requeue )) ------------------- TRANSACTIONS (TX) -------------------------- -- | This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods. txSelect :: Channel -> IO () txSelect chan = do (SimpleMethod Tx_select_ok) <- request chan $ SimpleMethod Tx_select return () -- | This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit. txCommit :: Channel -> IO () txCommit chan = do (SimpleMethod Tx_commit_ok) <- request chan $ SimpleMethod Tx_commit return () -- | This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback. txRollback :: Channel -> IO () txRollback chan = do (SimpleMethod Tx_rollback_ok) <- request chan $ SimpleMethod Tx_rollback return () --------------------- FLOW CONTROL ------------------------ {- | @flow chan active@ tells the AMQP server to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. If @active==True@ the server will start sending content data, if @active==False@ the server will stop sending content data. A new channel is always active by default. NOTE: RabbitMQ 1.7 doesn't implement this command. -} flow :: Channel -> Bool -> IO () flow chan active = do (SimpleMethod (Channel_flow_ok _)) <- request chan $ SimpleMethod (Channel_flow active) return () -------------------------- MESSAGE / ENVELOPE ------------------ -- | contains meta-information of a delivered message (through 'getMsg' or 'consumeMsgs') data Envelope = Envelope { envDeliveryTag :: LongLongInt, envRedelivered :: Bool, envExchangeName :: String, envRoutingKey :: String, envChannel :: Channel } data DeliveryMode = Persistent -- ^ the message will survive server restarts (if the queue is durable) | NonPersistent -- ^ the message may be lost after server restarts deriving Show deliveryModeToInt NonPersistent = 1 deliveryModeToInt Persistent = 2 intToDeliveryMode 1 = NonPersistent intToDeliveryMode 2 = Persistent -- | An AMQP message data Message = Message { msgBody :: BL.ByteString, -- ^ the content of your message msgDeliveryMode :: Maybe DeliveryMode, -- ^ see 'DeliveryMode' msgTimestamp :: Maybe Timestamp, -- ^ use in any way you like; this doesn't affect the way the message is handled msgID :: Maybe String, -- ^ use in any way you like; this doesn't affect the way the message is handled msgContentType :: Maybe String } deriving Show -- | a 'Msg' with defaults set; you should override at least 'msgBody' newMsg :: Message newMsg = Message (BL.empty) Nothing Nothing Nothing Nothing ------------- ASSEMBLY ------------------------- -- an assembly is a higher-level object consisting of several frames (like in amqp 0-10) data Assembly = SimpleMethod MethodPayload | ContentMethod MethodPayload ContentHeaderProperties BL.ByteString --method, properties, content-data deriving Show -- | reads all frames necessary to build an assembly readAssembly :: Chan FramePayload -> IO Assembly readAssembly chan = do m <- readChan chan case m of MethodPayload p -> --got a method frame if hasContent m then do --several frames containing the content will follow, so read them (props, msg) <- collectContent chan return $ ContentMethod p props msg else do return $ SimpleMethod p x -> error $ "didn't expect frame: "++(show x) -- | reads a contentheader and contentbodies and assembles them collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString) collectContent chan = do (ContentHeaderPayload _ _ bodySize props) <- readChan chan content <- collect $ fromIntegral bodySize return (props, BL.concat content) where collect x | x <= 0 = return [] collect rem = do (ContentBodyPayload payload) <- readChan chan r <- collect (rem - (BL.length payload)) return $ payload : r ------------ CONNECTION ------------------- {- general concept: Each connection has its own thread. Each channel has its own thread. Connection reads data from socket and forwards it to channel. Channel processes data and forwards it to application. Outgoing data is written directly onto the socket. Incoming Data: Socket -> Connection-Thread -> Channel-Thread -> Application Outgoing Data: Application -> Socket -} data Connection = Connection { connSocket :: Socket, connChannels :: (MVar (IM.IntMap (Channel, ThreadId))), --open channels (channelID => (Channel, ChannelThread)) connMaxFrameSize :: Int, --negotiated maximum frame size connClosed :: MVar (Maybe String), connWriteLock :: MVar (), -- to ensure atomic writes to the socket lastChannelID :: MVar Int --for auto-incrementing the channelIDs } -- | reads incoming frames from socket and forwards them to the opened channels connectionReceiver :: Connection -> IO () connectionReceiver conn = do (Frame chanID payload) <- readFrameSock (connSocket conn) (connMaxFrameSize conn) forwardToChannel chanID payload connectionReceiver conn where forwardToChannel 0 (MethodPayload Connection_close_ok) = do modifyMVar_ (connClosed conn) $ \x -> return $ Just "closed by user" killThread =<< myThreadId forwardToChannel 0 (MethodPayload (Connection_close _ (ShortString errorMsg) _ _ )) = do modifyMVar_ (connClosed conn) $ \x -> return $ Just errorMsg killThread =<< myThreadId forwardToChannel 0 payload = print $ "Got unexpected msg on channel zero: "++(show payload) forwardToChannel chanID payload = do --got asynchronous msg => forward to registered channel withMVar (connChannels conn) $ \cs -> do case IM.lookup (fromIntegral chanID) cs of Just c -> writeChan (inQueue $ fst c) payload Nothing -> print $ "ERROR: channel not open "++(show chanID) -- | @openConnection hostname virtualHost loginName loginPassword@ opens a connection to an AMQP server running on @hostname@. -- @virtualHost@ is used as a namespace for AMQP resources (default is \"/\"), so different applications could use multiple virtual hosts on the same AMQP server -- -- NOTE: If the login name, password or virtual host are invalid, this method will throw a 'ConnectionClosedException'. The exception will not contain a reason why the connection was closed, so you'll have to find out yourself. openConnection :: String -> String -> String -> String -> IO Connection openConnection host vhost loginName loginPassword = openConnection' host 5672 vhost loginName loginPassword -- | same as 'openConnection' but allows you to specify a non-default port-number as the 2nd parameter openConnection' :: String -> PortNumber -> String -> String -> String -> IO Connection openConnection' host port vhost loginName loginPassword = do proto <- getProtocolNumber "tcp" sock <- socket AF_INET Stream proto addr <- inet_addr host connect sock (SockAddrInet port addr) NB.send sock $ toStrict $ BPut.runPut $ do BPut.putByteString $ BS.pack "AMQP" BPut.putWord8 1 BPut.putWord8 1 --TCP/IP BPut.putWord8 9 --Major Version BPut.putWord8 1 --Minor Version -- S: connection.start Frame 0 (MethodPayload (Connection_start version_major version_minor server_properties mechanisms locales)) <- readFrameSock sock 4096 -- C: start_ok writeFrameSock sock start_ok -- S: tune Frame 0 (MethodPayload (Connection_tune channel_max frame_max heartbeat)) <- readFrameSock sock 4096 -- C: tune_ok let maxFrameSize = (min 131072 frame_max) writeFrameSock sock (Frame 0 (MethodPayload --TODO: handle channel_max (Connection_tune_ok 0 maxFrameSize 0) )) -- C: open writeFrameSock sock open -- S: open_ok Frame 0 (MethodPayload (Connection_open_ok _)) <- readFrameSock sock $ fromIntegral maxFrameSize -- Connection established! --build Connection object connChannels <- newMVar IM.empty lastChanID <- newMVar 0 cClosed <- newMVar Nothing writeLock <- newMVar () let conn = Connection sock connChannels (fromIntegral maxFrameSize) cClosed writeLock lastChanID --spawn the connectionReceiver forkIO $ CE.finally (connectionReceiver conn) (do --this will be run when a connection gets killed (probably because of socket failure) sClose sock -- mark as closed modifyMVar_ cClosed $ \x -> return $ Just $ maybe "closed" id x --kill all channel-threads withMVar connChannels $ \cc -> mapM_ (\c -> killThread $ snd c) $ IM.elems cc withMVar connChannels $ \cc -> return $ IM.empty ) return conn where start_ok = (Frame 0 (MethodPayload (Connection_start_ok (FieldTable (M.fromList [])) (ShortString "AMQPLAIN") --login has to be a table without first 4 bytes (LongString (drop 4 $ BL.unpack $ runPut $ put $ FieldTable (M.fromList [(ShortString "LOGIN",FVLongString $ LongString loginName), (ShortString "PASSWORD", FVLongString $ LongString loginPassword)]))) (ShortString "en_US")) )) open = (Frame 0 (MethodPayload (Connection_open (ShortString vhost) --virtual host (ShortString "") -- capabilities True))) --insist; True because we don't support redirect yet -- | closes a connection closeConnection :: Connection -> IO () closeConnection c = do CE.catch ( withMVar (connWriteLock c) $ \_ -> writeFrameSock (connSocket c) $ (Frame 0 (MethodPayload (Connection_close --TODO: set these values 0 -- reply_code (ShortString "") -- reply_text 0 -- class_id 0 -- method_id ))) ) (\ (e::CE.IOException) -> do --do nothing if connection is already closed return () ) -- connection_close_ok will be handled in the "connectionReceiver" method above return () readFrameSock :: Socket -> Int -> IO Frame readFrameSock sock maxFrameSize = do dat <- recvExact 7 let len = fromIntegral $ peekFrameSize dat dat' <- recvExact (len+1) -- +1 for the terminating 0xCE let (frame, rest, consumedBytes) = runGetState get (BL.append dat dat') 0 return frame where recvExact bytes = recvExact' bytes $ BL.empty recvExact' bytes buf = do dat <- NB.recv sock (min bytes maxFrameSize) if BS.length dat == 0 then CE.throwIO $ ConnectionClosedException "" else do let buf' = BL.append buf (toLazy dat) bufLen = fromIntegral $ BL.length buf' if bufLen >= bytes then return buf' else recvExact' (bytes-bufLen) buf' writeFrameSock :: Socket -> Frame -> IO () writeFrameSock sock x = do NB.send sock $ toStrict $ runPut $ put x return () ------------------------ CHANNEL ----------------------------- {- | A connection to an AMQP server is made up of separate channels. It is recommended to use a separate channel for each thread in your application that talks to the AMQP server (but you don't have to as channels are thread-safe) -} data Channel = Channel { connection :: Connection, inQueue :: Chan FramePayload, --incoming frames (from Connection) outstandingResponses :: Chan (MVar Assembly), -- for every request an MVar is stored here waiting for the response channelID :: Word16, lastConsumerTag :: MVar Int, chanActive :: Lock, -- used for flow-control. if lock is closed, no content methods will be sent chanClosed :: MVar (Maybe String), consumers :: MVar (M.Map String ((Message, Envelope) -> IO ())) -- who is consumer of a queue? (consumerTag => callback) } msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message msgFromContentHeaderProperties (CHBasic content_type content_encoding headers delivery_mode priority correlation_id reply_to expiration message_id timestamp typ user_id app_id cluster_id) msgBody = let msgId = case message_id of (Just (ShortString s)) -> Just s _ -> Nothing contentType = case content_type of (Just (ShortString s)) -> Just s _ -> Nothing in Message msgBody (fmap intToDeliveryMode delivery_mode) timestamp msgId contentType -- | The thread that is run for every channel channelReceiver :: Channel -> IO () channelReceiver chan = do --read incoming frames; they are put there by a Connection thread p <- readAssembly $ inQueue chan if isResponse p then do emp <- isEmptyChan $ outstandingResponses chan if emp then CE.throwIO $ userError "got response, but have no corresponding request" else do x <- readChan (outstandingResponses chan) putMVar x p --handle asynchronous assemblies else handleAsync p channelReceiver chan where isResponse :: Assembly -> Bool isResponse (ContentMethod (Basic_deliver _ _ _ _ _) _ _) = False isResponse (ContentMethod (Basic_return _ _ _ _) _ _) = False isResponse (SimpleMethod (Channel_flow _)) = False isResponse (SimpleMethod (Channel_close _ _ _ _)) = False isResponse _ = True --Basic.Deliver: forward msg to registered consumer handleAsync (ContentMethod (Basic_deliver (ShortString consumerTag) deliveryTag redelivered (ShortString exchangeName) (ShortString routingKey)) properties msgBody) = withMVar (consumers chan) (\s -> do let subscriber = fromJust $ M.lookup consumerTag s let msg = msgFromContentHeaderProperties properties msgBody let env = Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered, envExchangeName = exchangeName, envRoutingKey = routingKey, envChannel = chan} subscriber (msg, env) ) handleAsync (SimpleMethod (Channel_close errorNum (ShortString errorMsg) _ _)) = do modifyMVar_ (chanClosed chan) $ \x -> return $ Just errorMsg closeChannel' chan killThread =<< myThreadId handleAsync (SimpleMethod (Channel_flow active)) = do if active then openLock $ chanActive chan else closeLock $ chanActive chan -- in theory we should respond with flow_ok but rabbitMQ 1.7 ignores that, so it doesn't matter return () --Basic.return handleAsync (ContentMethod (Basic_return replyCode replyText exchange routingKey) properties msgData) = --TODO: implement handling -- this won't be called currently, because publishMsg sets "mandatory" and "immediate" to false print "BASIC.RETURN not implemented" -- closes the channel internally; but doesn't tell the server closeChannel' c = do modifyMVar_ (connChannels $ connection c) $ \old -> return $ IM.delete (fromIntegral $ channelID c) old -- mark channel as closed modifyMVar_ (chanClosed c) $ \x -> do killLock $ chanActive c killOutstandingResponses $ outstandingResponses c return $ Just $ maybe "closed" id x where killOutstandingResponses :: Chan (MVar a) -> IO () killOutstandingResponses chan = do emp <- isEmptyChan chan if emp then return () else do x <- readChan chan tryPutMVar x $ error "channel closed" killOutstandingResponses chan -- | opens a new channel on the connection -- -- There's currently no closeChannel method, but you can always just close the connection (the maximum number of channels is 65535). openChannel :: Connection -> IO Channel openChannel c = do newInQueue <- newChan outRes <- newChan lastConsumerTag <- newMVar 0 ca <- newLock chanClosed <- newMVar Nothing consumers <- newMVar M.empty --get a new unused channelID newChannelID <- modifyMVar (lastChannelID c) $ \x -> return (x+1,x+1) let newChannel = Channel c newInQueue outRes (fromIntegral newChannelID) lastConsumerTag ca chanClosed consumers thrID <- forkIO $ CE.finally (channelReceiver newChannel) (closeChannel' newChannel) --add new channel to connection's channel map modifyMVar_ (connChannels c) (\oldMap -> return $ IM.insert newChannelID (newChannel, thrID) oldMap) (SimpleMethod Channel_open_ok) <- request newChannel (SimpleMethod (Channel_open (ShortString ""))) return newChannel -- | writes multiple frames to the channel atomically writeFrames :: Channel -> [FramePayload] -> IO () writeFrames chan payloads = let conn = connection chan in withMVar (connChannels conn) $ \chans -> if IM.member (fromIntegral $ channelID chan) chans then CE.catch -- ensure at most one thread is writing to the socket at any time (withMVar (connWriteLock conn) $ \_ -> mapM_ (\payload -> writeFrameSock (connSocket conn) (Frame (channelID chan) payload)) payloads) ( \(e :: CE.IOException) -> do CE.throwIO $ userError "connection not open" ) else do CE.throwIO $ userError "channel not open" writeAssembly' :: Channel -> Assembly -> IO () writeAssembly' chan (ContentMethod m properties msg) = do -- wait iff the AMQP server instructed us to withhold sending content data (flow control) waitLock $ chanActive chan let !toWrite = [(MethodPayload m), (ContentHeaderPayload (getClassIDOf properties) --classID 0 --weight is deprecated in AMQP 0-9 (fromIntegral $ BL.length msg) --bodySize properties)] ++ (if BL.length msg > 0 then do --split into frames of maxFrameSize map ContentBodyPayload (splitLen msg (fromIntegral $ connMaxFrameSize $ connection chan)) else [] ) writeFrames chan toWrite where splitLen str len | BL.length str > len = (BL.take len str):(splitLen (BL.drop len str) len) splitLen str _ = [str] writeAssembly' chan (SimpleMethod m) = do writeFrames chan [MethodPayload m] -- most exported functions in this module will use either 'writeAssembly' or 'request' to talk to the server -- so we perform the exception handling here -- | writes an assembly to the channel writeAssembly :: Channel -> Assembly -> IO () writeAssembly chan m = CE.catches (writeAssembly' chan m) [CE.Handler (\ (ex :: AMQPException) -> throwMostRelevantAMQPException chan), CE.Handler (\ (ex :: CE.ErrorCall) -> throwMostRelevantAMQPException chan), CE.Handler (\ (ex :: CE.IOException) -> throwMostRelevantAMQPException chan)] -- | sends an assembly and receives the response request :: Channel -> Assembly -> IO Assembly request chan m = do res <- newEmptyMVar CE.catches (do withMVar (chanClosed chan) $ \cc -> do if isNothing cc then do writeChan (outstandingResponses chan) res writeAssembly' chan m else CE.throwIO $ userError "closed" -- res might contain an exception, so evaluate it here !r <- takeMVar res return r ) [CE.Handler (\ (ex :: AMQPException) -> throwMostRelevantAMQPException chan), CE.Handler (\ (ex :: CE.ErrorCall) -> throwMostRelevantAMQPException chan), CE.Handler (\ (ex :: CE.IOException) -> throwMostRelevantAMQPException chan)] -- this throws an AMQPException based on the status of the connection and the channel -- if both connection and channel are closed, it will throw a ConnectionClosedException throwMostRelevantAMQPException chan = do cc <- readMVar $ connClosed $ connection chan case cc of Just r -> CE.throwIO $ ConnectionClosedException r Nothing -> do chc <- readMVar $ chanClosed chan case chc of Just r -> CE.throwIO $ ChannelClosedException r Nothing -> CE.throwIO $ ConnectionClosedException "unknown reason" ----------------------------- EXCEPTIONS --------------------------- data AMQPException = -- | the 'String' contains the reason why the channel was closed ChannelClosedException String | ConnectionClosedException String -- ^ String may contain a reason deriving (Typeable, Show, Ord, Eq) instance CE.Exception AMQPException