{-# LANGUAGE BangPatterns, CPP, OverloadedStrings, ScopedTypeVariables, DeriveDataTypeable #-} module Network.AMQP.Internal where import Paths_RabbitMQ(version) import Data.Version(showVersion) import Control.Concurrent import Control.Concurrent.STM import Data.Binary import Data.Binary.Get import Data.Binary.Put as BPut import Network.Socket (PortNumber, withSocketsDo) import System.IO (hPutStrLn, stderr) import qualified Control.Exception as CE import qualified Data.ByteString as BS import qualified Data.ByteString.Char8 as BC import qualified Data.ByteString.Lazy as BL import qualified Data.Map as M import qualified Data.Foldable as F import qualified Data.IntMap as IM import qualified Data.IntSet as IntSet import qualified Data.Sequence as Seq import qualified Data.Text as T import qualified Data.Text.Encoding as E import Network.AMQP.Prelude import Network.AMQP.Protocol import Network.AMQP.Types import Network.AMQP.Helpers import Network.AMQP.Generated import Network.AMQP.ChannelAllocator import qualified Network.AMQP.Connection as Conn data AckType = BasicAck | BasicNack deriving Show data DeliveryMode = Persistent -- ^ the message will survive server restarts (if the queue is durable) | NonPersistent -- ^ the message may be lost after server restarts deriving (Eq, Ord, Read, Show) deliveryModeToInt :: DeliveryMode -> Octet deliveryModeToInt NonPersistent = 1 deliveryModeToInt Persistent = 2 intToDeliveryMode :: Octet -> DeliveryMode intToDeliveryMode 1 = NonPersistent intToDeliveryMode 2 = Persistent intToDeliveryMode n = error ("Unknown delivery mode int: " ++ show n) -- | An AMQP message data Message = Message { msgBody :: BL.ByteString, -- ^ the content of your message msgDeliveryMode :: Maybe DeliveryMode, -- ^ see 'DeliveryMode' msgTimestamp :: Maybe Timestamp, -- ^ use in any way you like; this doesn't affect the way the message is handled msgID :: Maybe Text, -- ^ use in any way you like; this doesn't affect the way the message is handled msgType :: Maybe Text, -- ^ use in any way you like; this doesn't affect the way the message is handled msgUserID :: Maybe Text, msgApplicationID :: Maybe Text, msgClusterID :: Maybe Text, msgContentType :: Maybe Text, msgContentEncoding :: Maybe Text, msgReplyTo :: Maybe Text, msgPriority :: Maybe Octet, msgCorrelationID :: Maybe Text, msgExpiration :: Maybe Text, msgHeaders :: Maybe FieldTable } deriving (Eq, Ord, Read, Show) -- | contains meta-information of a delivered message (through 'getMsg' or 'consumeMsgs') data Envelope = Envelope { envDeliveryTag :: LongLongInt, envRedelivered :: Bool, envExchangeName :: Text, envRoutingKey :: Text, envChannel :: Channel } data PublishError = PublishError { errReplyCode :: ReturnReplyCode, errExchange :: Maybe Text, errRoutingKey :: Text } deriving (Eq, Read, Show) data ReturnReplyCode = Unroutable Text | NoConsumers Text | NotFound Text deriving (Eq, Read, Show) ------------- ASSEMBLY ------------------------- -- an assembly is a higher-level object consisting of several frames (like in amqp 0-10) data Assembly = SimpleMethod MethodPayload | ContentMethod MethodPayload ContentHeaderProperties BL.ByteString --method, properties, content-data deriving Show -- | reads all frames necessary to build an assembly readAssembly :: Chan FramePayload -> IO Assembly readAssembly chan = do m <- readChan chan case m of MethodPayload p -> --got a method frame if hasContent m then do --several frames containing the content will follow, so read them (props, msg) <- collectContent chan pure $ ContentMethod p props msg else do pure $ SimpleMethod p x -> error $ "didn't expect frame: " ++ show x -- | reads a contentheader and contentbodies and assembles them collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString) collectContent chan = do (ContentHeaderPayload _ _ bodySize props) <- readChan chan content <- collect $ fromIntegral bodySize pure (props, BL.concat content) where collect x | x <= 0 = pure [] collect x = do (ContentBodyPayload payload) <- readChan chan r <- collect (x - (BL.length payload)) pure $ payload : r ------------ CONNECTION ------------------- {- general concept: Each connection has its own thread. Each channel has its own thread. Connection reads data from socket and forwards it to channel. Channel processes data and forwards it to application. Outgoing data is written directly onto the socket. Incoming Data: Socket -> Connection-Thread -> Channel-Thread -> Application Outgoing Data: Application -> Socket -} data Connection = Connection { connHandle :: Conn.Connection, connChanAllocator :: ChannelAllocator, connChannels :: MVar (IM.IntMap (Channel, ThreadId)), -- open channels (channelID => (Channel, ChannelThread)) connMaxFrameSize :: Int, --negotiated maximum frame size connClosed :: MVar (Maybe (CloseType, String)), connClosedLock :: MVar (), -- used by closeConnection to block until connection-close handshake is complete connWriteLock :: MVar (), -- to ensure atomic writes to the socket connClosedHandlers :: MVar [IO ()], connBlockedHandlers :: MVar [(Text -> IO (), IO ())], connLastReceived :: MVar Int64, -- the timestamp from a monotonic clock when the last frame was received connLastSent :: MVar Int64, -- the timestamp from a monotonic clock when the last frame was written connServerProperties :: FieldTable -- the server properties sent in Connection_start } -- | Represents the parameters to connect to a broker or a cluster of brokers. -- See 'defaultConnectionOpts'. data ConnectionOpts = ConnectionOpts { coServers :: ![(String, PortNumber)], -- ^ A list of host-port pairs. Useful in a clustered setup to connect to the first available host. coVHost :: !Text, -- ^ The VHost to connect to. coAuth :: ![SASLMechanism], -- ^ The 'SASLMechanism's to use for authenticating with the broker. coMaxFrameSize :: !(Maybe Word32), -- ^ The maximum frame size to be used. If not specified, no limit is assumed. coHeartbeatDelay :: !(Maybe Word16), -- ^ The delay in seconds, after which the client expects a heartbeat frame from the broker. If 'Nothing', the value suggested by the broker is used. Use @Just 0@ to disable the heartbeat mechnism. coMaxChannel :: !(Maybe Word16), -- ^ The maximum number of channels the client will use. coTLSSettings :: !(Maybe Conn.SSLContext), -- ^ Whether or not to connect to servers using TLS. See http://www.rabbitmq.com/ssl.html for details. coName :: !(Maybe Text) -- ^ optional connection name (will be displayed in the RabbitMQ web interface) } -- | A 'SASLMechanism' is described by its name ('saslName'), its initial response ('saslInitialResponse'), and an optional function ('saslChallengeFunc') that -- transforms a security challenge provided by the server into response, which is then sent back to the server for verification. data SASLMechanism = SASLMechanism { saslName :: !Text, -- ^ mechanism name saslInitialResponse :: !BS.ByteString, -- ^ initial response saslChallengeFunc :: !(Maybe (BS.ByteString -> IO BS.ByteString)) -- ^ challenge processing function } -- | reads incoming frames from socket and forwards them to the opened channels connectionReceiver :: Connection -> IO () connectionReceiver conn = do CE.catch (do Frame chanID payload <- readFrame (connHandle conn) updateLastReceived conn forwardToChannel chanID payload ) (\(e :: CE.IOException) -> myThreadId >>= killConnection conn Abnormal (CE.toException e)) connectionReceiver conn where closedByUserEx = ConnectionClosedException Normal "closed by user" forwardToChannel 0 (MethodPayload Connection_close_ok) = myThreadId >>= killConnection conn Normal (CE.toException closedByUserEx) forwardToChannel 0 (MethodPayload (Connection_close _ (ShortString errorMsg) _ _)) = do writeFrame (connHandle conn) $ Frame 0 $ MethodPayload Connection_close_ok myThreadId >>= killConnection conn Abnormal (CE.toException . ConnectionClosedException Abnormal . T.unpack $ errorMsg) forwardToChannel 0 HeartbeatPayload = pure () forwardToChannel 0 (MethodPayload (Connection_blocked reason)) = handleBlocked reason forwardToChannel 0 (MethodPayload Connection_unblocked) = handleUnblocked forwardToChannel 0 payload = hPutStrLn stderr $ "Got unexpected msg on channel zero: " ++ show payload forwardToChannel chanID payload = do --got asynchronous msg => forward to registered channel withMVar (connChannels conn) $ \cs -> do case IM.lookup (fromIntegral chanID) cs of Just c -> writeChan (inQueue $ fst c) payload Nothing -> hPutStrLn stderr $ "ERROR: channel not open " ++ show chanID handleBlocked (ShortString reason) = do withMVar (connBlockedHandlers conn) $ \listeners -> forM_ listeners $ \(l, _) -> CE.catch (l reason) $ \(ex :: CE.SomeException) -> hPutStrLn stderr $ "connection blocked listener threw exception: "++ show ex handleUnblocked = do withMVar (connBlockedHandlers conn) $ \listeners -> forM_ listeners $ \(_, l) -> CE.catch l $ \(ex :: CE.SomeException) -> hPutStrLn stderr $ "connection unblocked listener threw exception: "++ show ex -- | Opens a connection to a broker specified by the given 'ConnectionOpts' parameter. openConnection'' :: ConnectionOpts -> IO Connection openConnection'' connOpts = withSocketsDo $ do handle <- connect [] $ coServers connOpts (maxFrameSize, maxChannel, heartbeatTimeout, serverProps) <- CE.handle (\(_ :: CE.IOException) -> CE.throwIO $ ConnectionClosedException Abnormal "Handshake failed. Please check the RabbitMQ logs for more information") $ do Conn.connectionPut handle $ BS.append (BC.pack "AMQP") (BS.pack [ 1 , 1 --TCP/IP , 0 --Major Version , 9 --Minor Version ]) -- S: connection.start Frame 0 (MethodPayload (Connection_start _ _ serverProps (LongString serverMechanisms) _)) <- readFrame handle selectedSASL <- selectSASLMechanism handle serverMechanisms -- C: start_ok writeFrame handle $ start_ok selectedSASL -- S: secure or tune Frame 0 (MethodPayload (Connection_tune channel_max frame_max sendHeartbeat)) <- handleSecureUntilTune handle selectedSASL -- C: tune_ok let maxFrameSize = chooseMin frame_max $ coMaxFrameSize connOpts finalHeartbeatSec = fromMaybe sendHeartbeat (coHeartbeatDelay connOpts) heartbeatTimeout = mfilter (/=0) (Just finalHeartbeatSec) fixChanNum x = if x == 0 then 65535 else x maxChannel = chooseMin (fixChanNum channel_max) $ fmap fixChanNum $ coMaxChannel connOpts writeFrame handle (Frame 0 (MethodPayload (Connection_tune_ok maxChannel maxFrameSize finalHeartbeatSec) )) -- C: open writeFrame handle open -- S: open_ok Frame 0 (MethodPayload (Connection_open_ok _)) <- readFrame handle -- Connection established! pure (maxFrameSize, maxChannel, heartbeatTimeout, serverProps) --build Connection object cChannels <- newMVar IM.empty cClosed <- newMVar Nothing cChanAllocator <- newChannelAllocator $ fromIntegral maxChannel _ <- allocateChannel cChanAllocator -- allocate channel 0 writeLock <- newMVar () ccl <- newEmptyMVar cClosedHandlers <- newMVar [] cBlockedHandlers <- newMVar [] cLastReceived <- getTimestamp >>= newMVar cLastSent <- getTimestamp >>= newMVar let conn = Connection handle cChanAllocator cChannels (fromIntegral maxFrameSize) cClosed ccl writeLock cClosedHandlers cBlockedHandlers cLastReceived cLastSent serverProps --spawn the connectionReceiver connThread <- forkFinally (connectionReceiver conn) $ \res -> do -- try closing socket CE.catch (Conn.connectionClose handle) (\(_ :: CE.SomeException) -> pure ()) -- mark as closed modifyMVar_ cClosed $ pure . Just . fromMaybe (Abnormal, "unknown reason") -- kill all channel-threads, making sure the channel threads will -- be killed taking into account the overall state of the -- connection: if the thread died for an unexpected exception, -- inform the channel threads downstream accordingly. Otherwise -- just use a normal 'killThread' finaliser. let finaliser = ChanThreadKilledException $ case res of Left ex -> ex Right _ -> CE.toException CE.ThreadKilled modifyMVar_ cChannels $ \x -> do mapM_ (flip CE.throwTo finaliser . snd) $ IM.elems x pure IM.empty -- mark connection as closed, so all pending calls to 'closeConnection' can now return void $ tryPutMVar ccl () -- notify connection-close-handlers withMVar cClosedHandlers sequence_ case heartbeatTimeout of Nothing -> pure () Just timeout -> do heartbeatThread <- watchHeartbeats conn (fromIntegral timeout) connThread addConnectionClosedHandler conn True (killThread heartbeatThread) pure conn where connect excs ((host, port) : rest) = do result <- CE.try (Conn.connectTo Conn.ConnectionParams { Conn.connectionHostname = host , Conn.connectionPort = port , Conn.connectionSecure = coTLSSettings connOpts }) either (\(ex :: CE.SomeException) -> do connect (ex:excs) rest) (pure) result connect excs [] = CE.throwIO $ ConnectionClosedException Abnormal $ "Could not connect to any of the provided brokers: " ++ show (zip (coServers connOpts) (reverse excs)) selectSASLMechanism handle serverMechanisms = let serverSaslList = T.split (== ' ') $ E.decodeUtf8 serverMechanisms clientMechanisms = coAuth connOpts clientSaslList = map saslName clientMechanisms maybeSasl = F.find (\(SASLMechanism name _ _) -> elem name serverSaslList) clientMechanisms in abortIfNothing maybeSasl handle ("None of the provided SASL mechanisms "++show clientSaslList++" is supported by the server "++show serverSaslList++".") start_ok sasl = (Frame 0 (MethodPayload (Connection_start_ok clientProperties (ShortString $ saslName sasl) (LongString $ saslInitialResponse sasl) (ShortString "en_US")) )) where clientProperties = FieldTable $ M.fromList $ [ ("platform", FVString "Haskell") , ("version" , FVString . T.pack $ showVersion version) , ("capabilities", FVFieldTable clientCapabilities) ] ++ maybe [] (\x -> [("connection_name", FVString x)]) (coName connOpts) clientCapabilities = FieldTable $ M.fromList $ [ ("consumer_cancel_notify", FVBool True), ("connection.blocked", FVBool True) ] handleSecureUntilTune handle sasl = do tuneOrSecure <- readFrame handle case tuneOrSecure of Frame 0 (MethodPayload (Connection_secure (LongString challenge))) -> do processChallenge <- abortIfNothing (saslChallengeFunc sasl) handle $ "The server provided a challenge, but the selected SASL mechanism "++show (saslName sasl)++" is not equipped with a challenge processing function." challengeResponse <- processChallenge challenge writeFrame handle (Frame 0 (MethodPayload (Connection_secure_ok (LongString challengeResponse)))) handleSecureUntilTune handle sasl tune@(Frame 0 (MethodPayload (Connection_tune _ _ _))) -> pure tune x -> error $ "handleSecureUntilTune fail. received message: "++show x open = (Frame 0 (MethodPayload (Connection_open (ShortString $ coVHost connOpts) (ShortString $ T.pack "") -- capabilities; deprecated in 0-9-1 True))) -- insist; deprecated in 0-9-1 abortHandshake handle msg = do Conn.connectionClose handle CE.throwIO $ ConnectionClosedException Abnormal msg abortIfNothing m handle msg = case m of Nothing -> abortHandshake handle msg Just a -> pure a watchHeartbeats :: Connection -> Int -> ThreadId -> IO ThreadId watchHeartbeats conn timeout connThread = scheduleAtFixedRate rate $ do checkSendTimeout checkReceiveTimeout where rate = timeout * 1000 * 250 -- timeout / 4 in µs receiveTimeout = (fromIntegral rate) * 4 * 2 -- 2*timeout in µs sendTimeout = (fromIntegral rate) * 2 -- timeout/2 in µs skippedBeatEx = ConnectionClosedException Abnormal "killed connection after missing 2 consecutive heartbeats" checkReceiveTimeout = doCheck (connLastReceived conn) receiveTimeout (killConnection conn Abnormal (CE.toException skippedBeatEx) connThread) checkSendTimeout = doCheck (connLastSent conn) sendTimeout (writeFrame (connHandle conn) (Frame 0 HeartbeatPayload)) doCheck var timeout_µs action = withMVar var $ \lastFrameTime -> do time <- getTimestamp when (time >= lastFrameTime + timeout_µs) $ do action updateLastSent :: Connection -> IO () updateLastSent conn = modifyMVar_ (connLastSent conn) (const getTimestamp) updateLastReceived :: Connection -> IO () updateLastReceived conn = modifyMVar_ (connLastReceived conn) (const getTimestamp) -- | kill the connection thread abruptly killConnection :: Connection -> CloseType -> CE.SomeException -> ThreadId -> IO () killConnection conn closeType ex connThread = do modifyMVar_ (connClosed conn) $ const $ pure $ Just (closeType, show ex) throwTo connThread ex -- | closes a connection -- -- Make sure to call this function before your program exits to ensure that all published messages are received by the server. closeConnection :: Connection -> IO () closeConnection c = do CE.catch ( withMVar (connWriteLock c) $ \_ -> writeFrame (connHandle c) $ (Frame 0 (MethodPayload (Connection_close --TODO: set these values 0 -- reply_code (ShortString "") -- reply_text 0 -- class_id 0 -- method_id ))) ) (\ (_ :: CE.IOException) -> --do nothing if connection is already closed pure () ) -- wait for connection_close_ok by the server; this MVar gets filled in the CE.finally handler in openConnection' readMVar $ connClosedLock c pure () -- | get the server properties sent in connection.start getServerProperties :: Connection -> IO FieldTable getServerProperties = pure . connServerProperties -- | @addConnectionClosedHandler conn ifClosed handler@ adds a @handler@ that will be called after the connection is closed (either by calling @closeConnection@ or by an exception). If the @ifClosed@ parameter is True and the connection is already closed, the handler will be called immediately. If @ifClosed == False@ and the connection is already closed, the handler will never be called addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO () addConnectionClosedHandler conn ifClosed handler = do withMVar (connClosed conn) $ \cc -> case cc of -- connection is already closed, so call the handler directly Just _ | ifClosed == True -> handler -- otherwise add it to the list _ -> modifyMVar_ (connClosedHandlers conn) $ \old -> pure $ handler:old -- | @addConnectionBlockedHandler conn blockedHandler unblockedHandler@ adds handlers that will be called -- when a connection gets blocked/unlocked due to server resource constraints. -- -- More information: addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO () addConnectionBlockedHandler conn blockedHandler unblockedHandler = modifyMVar_ (connBlockedHandlers conn) $ \old -> pure $ (blockedHandler, unblockedHandler):old readFrame :: Conn.Connection -> IO Frame readFrame handle = do strictDat <- connectionGetExact handle 7 let dat = toLazy strictDat -- NB: userError returns an IOException so it will be catched in 'connectionReceiver' when (BL.null dat) $ CE.throwIO $ userError "connection not open" let len = fromIntegral $ peekFrameSize dat strictDat' <- connectionGetExact handle (len+1) -- +1 for the terminating 0xCE let dat' = toLazy strictDat' when (BL.null dat') $ CE.throwIO $ userError "connection not open" #if MIN_VERSION_binary(0, 7, 0) let ret = runGetOrFail get (BL.append dat dat') case ret of Left (_, _, errMsg) -> error $ "readFrame fail: " ++ errMsg Right (_, consumedBytes, _) | consumedBytes /= fromIntegral (len+8) -> error $ "readFrame: parser should read " ++ show (len+8) ++ " bytes; but read " ++ show consumedBytes Right (_, _, frame) -> pure frame #else let (frame, _, consumedBytes) = runGetState get (BL.append dat dat') 0 if consumedBytes /= fromIntegral (len+8) then error $ "readFrameSock: parser should read "++show (len+8)++" bytes; but read "++show consumedBytes else pure () pure frame #endif -- belongs in connection package and will be removed once it lands there connectionGetExact :: Conn.Connection -> Int -> IO BS.ByteString connectionGetExact conn x = loop BS.empty 0 where loop bs y | y == x = pure bs | otherwise = do next <- Conn.connectionGet conn (x - y) loop (BS.append bs next) (y + (BS.length next)) writeFrame :: Conn.Connection -> Frame -> IO () writeFrame handle f = do Conn.connectionPut handle . toStrict . runPut . put $ f ------------------------ CHANNEL ----------------------------- {- | A connection to an AMQP server is made up of separate channels. It is recommended to use a separate channel for each thread in your application that talks to the AMQP server (but you don't have to as channels are thread-safe) -} data Channel = Channel { connection :: Connection, inQueue :: Chan FramePayload, --incoming frames (from Connection) outstandingResponses :: MVar (Seq.Seq (MVar Assembly)), -- for every request an MVar is stored here waiting for the response channelID :: Word16, lastConsumerTag :: MVar Int, nextPublishSeqNum :: MVar Int, unconfirmedSet :: TVar IntSet.IntSet, ackedSet :: TVar IntSet.IntSet, --delivery tags nackedSet :: TVar IntSet.IntSet, --accumulate here. chanActive :: Lock, -- used for flow-control. if lock is closed, no content methods will be sent chanClosed :: MVar (Maybe (CloseType, String)), consumers :: MVar (M.Map Text ((Message, Envelope) -> IO (), -- who is consumer of a queue? (consumerTag => callback) (ConsumerTag -> IO ()))), -- cancellation notification callback returnListeners :: MVar ([(Message, PublishError) -> IO ()]), confirmListeners :: MVar ([(Word64, Bool, AckType) -> IO ()]), chanExceptionHandlers :: MVar [CE.SomeException -> IO ()] } -- | Thrown in the channel thread when the connection gets closed. -- When handling exceptions in a subscription callback, make sure to re-throw this so the channel thread can be stopped. data ChanThreadKilledException = ChanThreadKilledException { cause :: CE.SomeException } deriving (Show,Typeable) instance CE.Exception ChanThreadKilledException unwrapChanThreadKilledException :: CE.SomeException -> CE.SomeException unwrapChanThreadKilledException e = maybe e cause $ CE.fromException e msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message msgFromContentHeaderProperties (CHBasic content_type content_encoding headers delivery_mode priority correlation_id reply_to expiration message_id timestamp message_type user_id application_id cluster_id) body = let msgId = fromShortString message_id contentType = fromShortString content_type contentEncoding = fromShortString content_encoding replyTo = fromShortString reply_to correlationID = fromShortString correlation_id messageType = fromShortString message_type userId = fromShortString user_id applicationId = fromShortString application_id clusterId = fromShortString cluster_id in Message body (fmap intToDeliveryMode delivery_mode) timestamp msgId messageType userId applicationId clusterId contentType contentEncoding replyTo priority correlationID (fromShortString expiration) headers where fromShortString (Just (ShortString s)) = Just s fromShortString _ = Nothing msgFromContentHeaderProperties c _ = error ("Unknown content header properties: " ++ show c) -- | The thread that is run for every channel channelReceiver :: Channel -> IO () channelReceiver chan = do --read incoming frames; they are put there by a Connection thread p <- readAssembly $ inQueue chan if isResponse p then do action <- modifyMVar (outstandingResponses chan) $ \val -> do case Seq.viewl val of x Seq.:< rest -> do pure (rest, putMVar x p) Seq.EmptyL -> do pure (val, CE.throwIO $ userError "got response, but have no corresponding request") action --handle asynchronous assemblies else handleAsync p channelReceiver chan where isResponse :: Assembly -> Bool isResponse (ContentMethod (Basic_deliver _ _ _ _ _) _ _) = False isResponse (ContentMethod (Basic_return _ _ _ _) _ _) = False isResponse (SimpleMethod (Channel_flow _)) = False isResponse (SimpleMethod (Channel_close _ _ _ _)) = False isResponse (SimpleMethod (Basic_ack _ _)) = False isResponse (SimpleMethod (Basic_nack _ _ _)) = False isResponse (SimpleMethod (Basic_cancel _ _)) = False isResponse _ = True --Basic.Deliver: forward msg to registered consumer handleAsync (ContentMethod (Basic_deliver (ShortString consumerTag) deliveryTag redelivered (ShortString exchange) (ShortString routingKey)) properties body) = withMVar (consumers chan) (\s -> do case M.lookup consumerTag s of Just (subscriber, _) -> do let msg = msgFromContentHeaderProperties properties body let env = Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered, envExchangeName = exchange, envRoutingKey = routingKey, envChannel = chan} CE.catches (subscriber (msg, env)) [ CE.Handler (\(e::ChanThreadKilledException) -> CE.throwIO $ cause e), CE.Handler (\(e::CE.SomeException) -> hPutStrLn stderr $ "AMQP callback threw exception: " ++ show e) ] Nothing -> -- got a message, but have no registered subscriber; so drop it pure () ) handleAsync (SimpleMethod (Channel_close _ (ShortString errorMsg) _ _)) = do CE.catch ( writeAssembly' chan (SimpleMethod (Channel_close_ok)) ) (\ (_ :: CE.IOException) -> --do nothing if connection is already closed pure () ) closeChannel' chan Abnormal errorMsg myThreadId >>= flip CE.throwTo (ChannelClosedException Abnormal . T.unpack $ errorMsg) handleAsync (SimpleMethod (Channel_flow active)) = do if active then openLock $ chanActive chan else closeLock $ chanActive chan -- in theory we should respond with flow_ok but rabbitMQ 1.7 ignores that, so it doesn't matter pure () --Basic.return handleAsync (ContentMethod basicReturn@(Basic_return _ _ _ _) props body) = do let msg = msgFromContentHeaderProperties props body pubError = basicReturnToPublishError basicReturn withMVar (returnListeners chan) $ \listeners -> forM_ listeners $ \l -> CE.catch (l (msg, pubError)) $ \(ex :: CE.SomeException) -> hPutStrLn stderr $ "return listener on channel ["++(show $ channelID chan)++"] handling error ["++show pubError++"] threw exception: "++show ex handleAsync (SimpleMethod (Basic_ack deliveryTag multiple)) = handleConfirm deliveryTag multiple BasicAck handleAsync (SimpleMethod (Basic_nack deliveryTag multiple _)) = handleConfirm deliveryTag multiple BasicNack handleAsync (SimpleMethod (Basic_cancel consumerTag _)) = handleCancel consumerTag handleAsync m = error ("Unknown method: " ++ show m) handleConfirm deliveryTag multiple k = do withMVar (confirmListeners chan) $ \listeners -> forM_ listeners $ \l -> CE.catch (l (deliveryTag, multiple, k)) $ \(ex :: CE.SomeException) -> hPutStrLn stderr $ "confirm listener on channel ["++(show $ channelID chan)++"] handling method "++(show k)++" threw exception: "++ show ex let seqNum = fromIntegral deliveryTag let targetSet = case k of BasicAck -> (ackedSet chan) BasicNack -> (nackedSet chan) atomically $ do unconfSet <- readTVar (unconfirmedSet chan) let (merge, pending) = if multiple then (IntSet.union confs, pending') else (IntSet.insert seqNum, IntSet.delete seqNum unconfSet) where confs = fst parts pending' = snd parts parts = IntSet.partition (\n -> n <= seqNum) unconfSet modifyTVar' targetSet (\ts -> merge ts) writeTVar (unconfirmedSet chan) pending handleCancel (ShortString consumerTag) = withMVar (consumers chan) (\s -> do case M.lookup consumerTag s of Just (_, cancelCB) -> CE.catch (cancelCB consumerTag) $ \(ex :: CE.SomeException) -> hPutStrLn stderr $ "consumer cancellation listener "++(show consumerTag)++" on channel ["++(show $ channelID chan)++"] threw exception: "++ show ex Nothing -> -- got a cancellation notification, but have no registered subscriber; so drop it pure () ) basicReturnToPublishError (Basic_return code (ShortString errText) (ShortString exchange) (ShortString routingKey)) = let replyError = case code of 312 -> Unroutable errText 313 -> NoConsumers errText 404 -> NotFound errText num -> error $ "unexpected return error code: " ++ (show num) pubError = PublishError replyError (Just exchange) routingKey in pubError basicReturnToPublishError x = error $ "basicReturnToPublishError fail: "++show x -- | registers a callback function that is called whenever a message is returned from the broker ('basic.return'). addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO () addReturnListener chan listener = do modifyMVar_ (returnListeners chan) $ \listeners -> pure $ listener:listeners -- | registers a callback function that is called whenever a channel is closed by an exception. addChannelExceptionHandler :: Channel -> (CE.SomeException -> IO ()) -> IO () addChannelExceptionHandler chan handler = do modifyMVar_ (chanExceptionHandlers chan) $ \handlers -> pure $ handler:handlers -- closes the channel internally; but doesn't tell the server closeChannel' :: Channel -> CloseType -> Text -> IO () closeChannel' c closeType reason = do modifyMVar_ (chanClosed c) $ \x -> do if isNothing x then do modifyMVar_ (connChannels $ connection c) $ \old -> do ret <- freeChannel (connChanAllocator $ connection c) $ fromIntegral $ channelID c when (not ret) $ hPutStrLn stderr "closeChannel error: channel already freed" pure $ IM.delete (fromIntegral $ channelID c) old void $ killLock $ chanActive c killOutstandingResponses $ outstandingResponses c pure $ Just (closeType, T.unpack reason) else pure x where killOutstandingResponses :: MVar (Seq.Seq (MVar a)) -> IO () killOutstandingResponses outResps = do modifyMVar_ outResps $ \val -> do F.mapM_ (\x -> tryPutMVar x $ error "channel closed") val pure undefined -- | opens a new channel on the connection -- -- By default, if a channel is closed by an AMQP exception, this exception will be printed to stderr. You can prevent this behaviour by setting a custom exception handler (using 'addChannelExceptionHandler'). openChannel :: Connection -> IO Channel openChannel c = do newInQueue <- newChan outRes <- newMVar Seq.empty lastConsTag <- newMVar 0 ca <- newLock closed <- newMVar Nothing conss <- newMVar M.empty retListeners <- newMVar [] aSet <- newTVarIO IntSet.empty nSet <- newTVarIO IntSet.empty nxtSeq <- newMVar 0 unconfSet <- newTVarIO IntSet.empty cnfListeners <- newMVar [] handlers <- newMVar [] --add new channel to connection's channel map newChannel <- modifyMVar (connChannels c) $ \mp -> do newChannelID <- allocateChannel (connChanAllocator c) let newChannel = Channel c newInQueue outRes (fromIntegral newChannelID) lastConsTag nxtSeq unconfSet aSet nSet ca closed conss retListeners cnfListeners handlers thrID <- forkFinally (channelReceiver newChannel) $ \res -> do closeChannel' newChannel Normal "closed" case res of Right _ -> pure () Left ex -> do let unwrappedExc = unwrapChanThreadKilledException ex handlers' <- readMVar handlers case (null handlers', fromAbnormalChannelClose unwrappedExc) of (True, Just reason) -> hPutStrLn stderr $ "unhandled AMQP channel exception (chanId="++show newChannelID++"): "++reason _ -> mapM_ ($ unwrappedExc) handlers' when (IM.member newChannelID mp) $ CE.throwIO $ userError "openChannel fail: channel already open" pure (IM.insert newChannelID (newChannel, thrID) mp, newChannel) SimpleMethod (Channel_open_ok _) <- request newChannel (SimpleMethod (Channel_open (ShortString ""))) pure newChannel where fromAbnormalChannelClose :: CE.SomeException -> Maybe String fromAbnormalChannelClose exc = case CE.fromException exc :: Maybe AMQPException of Just (ChannelClosedException Abnormal reason) -> Just reason _ -> Nothing -- | closes a channel. It is typically not necessary to manually call this as closing a connection will implicitly close all channels. closeChannel :: Channel -> IO () closeChannel c = do SimpleMethod Channel_close_ok <- request c $ SimpleMethod $ Channel_close 0 (ShortString "") 0 0 withMVar (connChannels $ connection c) $ \chans -> do case IM.lookup (fromIntegral $ channelID c) chans of Just (_, thrID) -> killThread thrID Nothing -> pure () -- | writes multiple frames to the channel atomically writeFrames :: Channel -> [FramePayload] -> IO () writeFrames chan payloads = let conn = connection chan in withMVar (connChannels conn) $ \chans -> if IM.member (fromIntegral $ channelID chan) chans then CE.catch -- ensure at most one thread is writing to the socket at any time (do withMVar (connWriteLock conn) $ \_ -> mapM_ (\payload -> writeFrame (connHandle conn) (Frame (channelID chan) payload)) payloads updateLastSent conn) ( \(_ :: CE.IOException) -> do CE.throwIO $ userError "connection not open" ) else do CE.throwIO $ userError "channel not open" writeAssembly' :: Channel -> Assembly -> IO () writeAssembly' chan (ContentMethod m properties msg) = do -- wait iff the AMQP server instructed us to withhold sending content data (flow control) waitLock $ chanActive chan let !toWrite = [(MethodPayload m), (ContentHeaderPayload (getClassIDOf properties) --classID 0 --weight is deprecated in AMQP 0-9 (fromIntegral $ BL.length msg) --bodySize properties)] ++ (if BL.length msg > 0 then do -- split into frames of maxFrameSize -- (need to substract 8 bytes to account for frame header and end-marker) map ContentBodyPayload (splitLen msg $ (fromIntegral $ connMaxFrameSize $ connection chan) - 8) else [] ) writeFrames chan toWrite where splitLen str len | BL.length str > len = (BL.take len str):(splitLen (BL.drop len str) len) splitLen str _ = [str] writeAssembly' chan (SimpleMethod m) = writeFrames chan [MethodPayload m] -- most exported functions in this module will use either 'writeAssembly' or 'request' to talk to the server -- so we perform the exception handling here -- | writes an assembly to the channel writeAssembly :: Channel -> Assembly -> IO () writeAssembly chan m = CE.catches (writeAssembly' chan m) [CE.Handler (\ (_ :: AMQPException) -> throwMostRelevantAMQPException chan), CE.Handler (\ (_ :: CE.ErrorCall) -> throwMostRelevantAMQPException chan), CE.Handler (\ (_ :: CE.IOException) -> throwMostRelevantAMQPException chan)] -- | sends an assembly and receives the response request :: Channel -> Assembly -> IO Assembly request chan m = do res <- newEmptyMVar CE.catches (do withMVar (chanClosed chan) $ \cc -> do if isNothing cc then do modifyMVar_ (outstandingResponses chan) $ \val -> pure $! val Seq.|> res writeAssembly' chan m else CE.throwIO $ userError "closed" -- res might contain an exception, so evaluate it here !r <- takeMVar res pure r ) [CE.Handler (\ (_ :: AMQPException) -> throwMostRelevantAMQPException chan), CE.Handler (\ (_ :: CE.ErrorCall) -> throwMostRelevantAMQPException chan), CE.Handler (\ (_ :: CE.IOException) -> throwMostRelevantAMQPException chan)] -- this throws an AMQPException based on the status of the connection and the channel -- if both connection and channel are closed, it will throw a ConnectionClosedException throwMostRelevantAMQPException :: Channel -> IO a throwMostRelevantAMQPException chan = do cc <- readMVar $ connClosed $ connection chan case cc of Just (closeType, r) -> CE.throwIO $ ConnectionClosedException closeType r Nothing -> do chc <- readMVar $ chanClosed chan case chc of Just (ct, r) -> CE.throwIO $ ChannelClosedException ct r Nothing -> CE.throwIO $ ConnectionClosedException Abnormal "unknown reason" waitForAllConfirms :: Channel -> STM (IntSet.IntSet, IntSet.IntSet) waitForAllConfirms chan = do pending <- readTVar $ (unconfirmedSet chan) check (IntSet.null pending) pure =<< (,) <$> swapTVar (ackedSet chan) IntSet.empty <*> swapTVar (nackedSet chan) IntSet.empty