{-# LANGUAGE BangPatterns, CPP, OverloadedStrings, ScopedTypeVariables, LambdaCase #-}
module Network.AMQP.Internal where

import Paths_amqp(version)
import Data.Version(showVersion)

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put as BPut
import Data.Int (Int64)
import Data.Maybe
import Data.Text (Text)
import Network.Socket (PortNumber, withSocketsDo)
import System.IO (hPutStrLn, stderr)

import qualified Control.Exception as CE
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map as M
import qualified Data.Foldable as F
import qualified Data.IntMap as IM
import qualified Data.IntSet as IntSet
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import qualified Data.Text.Encoding as E
import qualified Network.Connection as Conn

import Network.AMQP.Protocol
import Network.AMQP.Types
import Network.AMQP.Helpers
import Network.AMQP.Generated
import Network.AMQP.ChannelAllocator

-- | Tag naming the kind of acknowledgement: 'BasicAck' or 'BasicNack'.
data AckType = BasicAck | BasicNack
    deriving Show

-- | Persistence behaviour requested for a message.
--
-- The constructor order matters for the derived 'Ord' instance:
-- @Persistent < NonPersistent@.
data DeliveryMode = Persistent -- ^ the message will survive server restarts (if the queue is durable)
                  | NonPersistent -- ^ the message may be lost after server restarts
    deriving (Eq, Ord, Read, Show)

-- | Maps a 'DeliveryMode' to its numeric code:
-- 'NonPersistent' is @1@ and 'Persistent' is @2@.
deliveryModeToInt :: DeliveryMode -> Octet
deliveryModeToInt NonPersistent = 1
deliveryModeToInt Persistent = 2

-- | Decodes a numeric delivery-mode code: @1@ is 'NonPersistent' and @2@ is
-- 'Persistent'.
--
-- This function is partial: any other value calls 'error' with a message
-- that includes the offending number.
intToDeliveryMode :: Octet -> DeliveryMode
intToDeliveryMode 1 = NonPersistent
intToDeliveryMode 2 = Persistent
intToDeliveryMode n = error ("Unknown delivery mode int: " ++ show n)

-- | An AMQP message
--
-- Every field except 'msgBody' is optional ('Maybe').
data Message = Message {
    msgBody :: BL.ByteString, -- ^ the content of your message
    msgDeliveryMode :: Maybe DeliveryMode, -- ^ see 'DeliveryMode'
    msgTimestamp :: Maybe Timestamp, -- ^ use in any way you like; this doesn't affect the way the message is handled
    msgID :: Maybe Text, -- ^ use in any way you like; this doesn't affect the way the message is handled
    msgType :: Maybe Text, -- ^ use in any way you like; this doesn't affect the way the message is handled
    msgUserID :: Maybe Text,
    msgApplicationID :: Maybe Text,
    msgClusterID :: Maybe Text,
    msgContentType :: Maybe Text,
    msgContentEncoding :: Maybe Text,
    msgReplyTo :: Maybe Text,
    msgPriority :: Maybe Octet,
    msgCorrelationID :: Maybe Text,
    msgExpiration :: Maybe Text,
    msgHeaders :: Maybe FieldTable
} deriving (Eq, Ord, Read, Show)

-- | contains meta-information of a delivered message (through 'getMsg' or 'consumeMsgs')
--
-- No instances are derived for this type.
data Envelope = Envelope {
    envDeliveryTag :: LongLongInt,
    envRedelivered :: Bool,
    envExchangeName :: Text,
    envRoutingKey :: Text,
    envChannel :: Channel
}

-- | Describes a failed publish: a 'ReturnReplyCode', an optional exchange
-- name and a routing key.
-- NOTE(review): presumably built from a message the broker returned —
-- confirm against the code that constructs it.
data PublishError = PublishError {
    errReplyCode :: ReturnReplyCode,
    errExchange :: Maybe Text,
    errRoutingKey :: Text
} deriving (Eq, Read, Show)

-- | Reason attached to a 'PublishError'. Each constructor carries a 'Text'.
-- NOTE(review): the 'Text' is presumably the broker's reply text — confirm
-- against the code that constructs these values.
data ReturnReplyCode = Unroutable Text
                     | NoConsumers Text
                     | NotFound Text
    deriving (Eq, Read, Show)

------------- ASSEMBLY -------------------------

-- an assembly is a higher-level object consisting of several frames (like in amqp 0-10)

-- | A method on its own ('SimpleMethod'), or a method together with its
-- content header properties and body ('ContentMethod').
data Assembly = SimpleMethod MethodPayload
              | ContentMethod MethodPayload ContentHeaderProperties BL.ByteString --method, properties, content-data
    deriving Show

-- | reads all frames necessary to build an assembly
--
-- Takes one frame from the given 'Chan'. If it is a method frame for which
-- 'hasContent' holds, the header and body frames that follow are gathered
-- with 'collectContent' and the result is a 'ContentMethod'. Any other
-- method frame yields a 'SimpleMethod'.
--
-- A non-method frame in first position calls 'error'.
readAssembly :: Chan FramePayload -> IO Assembly
readAssembly chan = do
    m <- readChan chan
    case m of
        MethodPayload p -- got a method frame
            | hasContent m -> do
                -- several frames containing the content will follow, so read them
                (props, msg) <- collectContent chan
                return $ ContentMethod p props msg
            | otherwise -> return $ SimpleMethod p
        x -> error $ "didn't expect frame: " ++ show x

-- | reads a contentheader and contentbodies and assembles them
--
-- The first frame taken from the 'Chan' must be a 'ContentHeaderPayload';
-- its body size says how many payload bytes to expect. Body frames are then
-- read until that many bytes have been received, and the chunks are
-- concatenated in arrival order.
--
-- Both pattern bindings are failable: a frame of any other kind makes the
-- 'IO' action fail with a pattern-match error.
collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString)
collectContent chan = do
    (ContentHeaderPayload _ _ bodySize props) <- readChan chan
    content <- collect $ fromIntegral bodySize
    return (props, BL.concat content)
  where
    -- Reads body frames until the remaining byte count is used up.
    collect :: Int64 -> IO [BL.ByteString]
    collect remaining
        | remaining <= 0 = return []
        | otherwise = do
            (ContentBodyPayload payload) <- readChan chan
            rest <- collect (remaining - BL.length payload)
            return $ payload : rest

------------ CONNECTION -------------------


{- general concept:
Each connection has its own thread. Each channel has its own thread.
Connection reads data from socket and forwards it to channel. Channel processes data and forwards it to application.
Outgoing data is written directly onto the socket.

Incoming Data: Socket -> Connection-Thread -> Channel-Thread -> Application
Outgoing Data: Application -> Socket
-}

-- State shared by everything that works on one broker connection.
-- The field order must not change: the constructor is applied positionally
-- when the connection is opened.
data Connection = Connection {
    connHandle :: Conn.Connection,
    connChanAllocator :: ChannelAllocator,
    connChannels :: MVar (IM.IntMap (Channel, ThreadId)), -- open channels (channelID => (Channel, ChannelThread))
    connMaxFrameSize :: Int, --negotiated maximum frame size
    connClosed :: MVar (Maybe (CloseType, String)),
    connClosedLock :: MVar (), -- used by closeConnection to block until connection-close handshake is complete
    connWriteLock :: MVar (), -- to ensure atomic writes to the socket
    connClosedHandlers :: MVar [IO ()],
    connBlockedHandlers :: MVar [(Text -> IO (), IO ())],
    connLastReceived :: MVar Int64, -- the timestamp from a monotonic clock when the last frame was received
    connLastSent :: MVar Int64, -- the timestamp from a monotonic clock when the last frame was written
    connServerProperties :: FieldTable, -- the server properties sent in Connection_start
    connThread :: MVar (Maybe ThreadId)
}

-- | Represents the parameters to connect to a broker or a cluster of brokers.
-- See 'defaultConnectionOpts'.
data ConnectionOpts = ConnectionOpts {
    coServers :: ![(String, PortNumber)], -- ^ A list of host-port pairs. Useful in a clustered setup to connect to the first available host.
    coVHost :: !Text, -- ^ The VHost to connect to.
    coAuth :: ![SASLMechanism], -- ^ The 'SASLMechanism's to use for authenticating with the broker.
    coMaxFrameSize :: !(Maybe Word32), -- ^ The maximum frame size to be used. If not specified, no limit is assumed.
    coHeartbeatDelay :: !(Maybe Word16), -- ^ The delay in seconds, after which the client expects a heartbeat frame from the broker. If 'Nothing', the value suggested by the broker is used. Use @Just 0@ to disable the heartbeat mechanism.
    coMaxChannel :: !(Maybe Word16), -- ^ The maximum number of channels the client will use.
    coTLSSettings :: Maybe TLSSettings, -- ^ Whether or not to connect to servers using TLS. See http://www.rabbitmq.com/ssl.html for details.
    coName :: !(Maybe Text) -- ^ optional connection name (will be displayed in the RabbitMQ web interface)
}

-- | Represents the kind of TLS connection to establish.
data TLSSettings =
    TLSTrusted   -- ^ Require trusted certificates (Recommended).
  | TLSUntrusted -- ^ Allow untrusted certificates (Discouraged. Vulnerable to man-in-the-middle attacks)
  | TLSCustom Conn.TLSSettings -- ^ Provide your own custom TLS settings


-- | Translates this module's 'TLSSettings' into the settings type of the
-- underlying connection library. The result is always a 'Just'.
--
-- 'TLSTrusted' and 'TLSUntrusted' both map to 'Conn.TLSSettingsSimple' and
-- differ only in its first flag ('False' versus 'True'); 'TLSCustom' passes
-- the wrapped settings through unchanged.
-- NOTE(review): the first flag of 'Conn.TLSSettingsSimple' is presumably
-- "disable certificate validation" — confirm against the connection
-- library's documentation.
connectionTLSSettings :: TLSSettings -> Maybe Conn.TLSSettings
connectionTLSSettings tlsSettings =
    Just $ case tlsSettings of
        TLSTrusted -> Conn.TLSSettingsSimple False False False
        TLSUntrusted -> Conn.TLSSettingsSimple True False False
        TLSCustom x -> x

-- | A 'SASLMechanism' is described by its name ('saslName'), its initial response ('saslInitialResponse'), and an optional function ('saslChallengeFunc') that
-- transforms a security challenge provided by the server into response, which is then sent back to the server for verification.
data SASLMechanism = SASLMechanism {
    saslName :: !Text, -- ^ mechanism name
    saslInitialResponse :: !BS.ByteString, -- ^ initial response
    saslChallengeFunc :: !(Maybe (BS.ByteString -> IO BS.ByteString)) -- ^ challenge processing function
}

-- | reads incoming frames from socket and forwards them to the opened channels
--
-- Loops forever: each iteration reads one frame from 'connHandle', records
-- the receive time via 'updateLastReceived' and dispatches the payload.
-- An 'CE.IOException' raised during an iteration is handed to
-- 'killConnection' as an 'Abnormal' close, together with the id of the
-- current thread.
--
-- Frames on channel 0 are handled here (close handshake, heartbeats,
-- blocked/unblocked notifications). Frames on any other channel are written
-- to the 'inQueue' of the channel registered in 'connChannels'.
connectionReceiver :: Connection -> IO ()
connectionReceiver conn = do
    CE.catch
        (do
            Frame chanID payload <- readFrame (connHandle conn)
            updateLastReceived conn
            forwardToChannel chanID payload
        )
        (\(e :: CE.IOException) ->
            myThreadId >>= killConnection conn Abnormal (CE.toException e))
    connectionReceiver conn
  where
    -- Exception used when the close handshake this side started completes.
    closedByUserEx :: AMQPException
    closedByUserEx = ConnectionClosedException Normal "closed by user"

    -- Dispatches one payload; the first argument is the channel id.
    forwardToChannel :: Word16 -> FramePayload -> IO ()
    -- close-ok on channel 0: report a normal close
    forwardToChannel 0 (MethodPayload Connection_close_ok) =
        myThreadId >>= killConnection conn Normal (CE.toException closedByUserEx)
    -- close on channel 0: answer with close-ok, then report an abnormal
    -- close carrying the error text from the frame
    forwardToChannel 0 (MethodPayload (Connection_close _ (ShortString errorMsg) _ _)) = do
        writeFrame (connHandle conn) $ Frame 0 $ MethodPayload Connection_close_ok
        myThreadId >>= killConnection conn Abnormal (CE.toException . ConnectionClosedException Abnormal . T.unpack $ errorMsg)
    -- heartbeats need no handling here; the receive time was already updated
    forwardToChannel 0 HeartbeatPayload = return ()
    forwardToChannel 0 (MethodPayload (Connection_blocked reason)) = handleBlocked reason
    forwardToChannel 0 (MethodPayload Connection_unblocked) = handleUnblocked
    -- anything else on channel 0 is only logged
    forwardToChannel 0 payload =
        hPutStrLn stderr $ "Got unexpected msg on channel zero: " ++ show payload
    forwardToChannel chanID payload = do
        --got asynchronous msg => forward to registered channel
        withMVar (connChannels conn) $ \cs -> do
            case IM.lookup (fromIntegral chanID) cs of
                Just c -> writeChan (inQueue $ fst c) payload
                -- TODO: It's unclear how to handle this, although it probably never
                --   happens in practice.
                Nothing -> hPutStrLn stderr $ "ERROR: channel not open " ++ show chanID

    -- Runs the first handler of every registered pair with the reason text.
    -- An exception from a handler is logged and the remaining handlers still run.
    handleBlocked :: ShortString -> IO ()
    handleBlocked (ShortString reason) = do
        withMVar (connBlockedHandlers conn) $ \listeners ->
            forM_ listeners $ \(l, _) -> CE.catch (l reason) $ \(ex :: CE.SomeException) ->
                hPutStrLn stderr $ "connection blocked listener threw exception: " ++ show ex

    -- Runs the second handler of every registered pair. An exception from a
    -- handler is logged and the remaining handlers still run.
    handleUnblocked :: IO ()
    handleUnblocked = do
        withMVar (connBlockedHandlers conn) $ \listeners ->
            forM_ listeners $ \(_, l) -> CE.catch l $ \(ex :: CE.SomeException) ->
                hPutStrLn stderr $ "connection unblocked listener threw exception: " ++ show ex

-- | Opens a connection to a broker specified by the given 'ConnectionOpts' parameter.

openConnection'' :: ConnectionOpts -> IO Connection
openConnection'' :: ConnectionOpts -> IO Connection
openConnection'' ConnectionOpts
connOpts = forall a. IO a -> IO a
withSocketsDo forall a b. (a -> b) -> a -> b
$ do
    Connection
handle <- [SomeException] -> [([Char], PortNumber)] -> IO Connection
connect [] forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> [([Char], PortNumber)]
coServers ConnectionOpts
connOpts
    (Word32
maxFrameSize, Word16
maxChannel, Maybe Word16
heartbeatTimeout, FieldTable
serverProps) <- forall e a. Exception e => (e -> IO a) -> IO a -> IO a
CE.handle (\(IOError
_ :: CE.IOException) -> forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
"Handshake failed. Please check the RabbitMQ logs for more information") forall a b. (a -> b) -> a -> b
$ do
        Connection -> ByteString -> IO ()
Conn.connectionPut Connection
handle forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString -> ByteString
BS.append ([Char] -> ByteString
BC.pack [Char]
"AMQP")
            ([Word8] -> ByteString
BS.pack [
                      Word8
1
                    , Word8
1 --TCP/IP

                    , Word8
0 --Major Version

                    , Word8
9 --Minor Version

                    ])

        -- S: connection.start

        Frame Word16
0 (MethodPayload (Connection_start Word8
_ Word8
_ FieldTable
serverProps (LongString ByteString
serverMechanisms) LongString
_)) <- Connection -> IO Frame
readFrame Connection
handle
        SASLMechanism
selectedSASL <- Connection -> ByteString -> IO SASLMechanism
selectSASLMechanism Connection
handle ByteString
serverMechanisms

        -- C: start_ok

        Connection -> Frame -> IO ()
writeFrame Connection
handle forall a b. (a -> b) -> a -> b
$ SASLMechanism -> Frame
start_ok SASLMechanism
selectedSASL
        -- S: secure or tune

        Frame Word16
0 (MethodPayload (Connection_tune Word16
channel_max Word32
frame_max Word16
sendHeartbeat)) <- Connection -> SASLMechanism -> IO Frame
handleSecureUntilTune Connection
handle SASLMechanism
selectedSASL
        -- C: tune_ok

        let maxFrameSize :: Word32
maxFrameSize = forall a. Ord a => a -> Maybe a -> a
chooseMin Word32
frame_max forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> Maybe Word32
coMaxFrameSize ConnectionOpts
connOpts
            finalHeartbeatSec :: Word16
finalHeartbeatSec = forall a. a -> Maybe a -> a
fromMaybe Word16
sendHeartbeat (ConnectionOpts -> Maybe Word16
coHeartbeatDelay ConnectionOpts
connOpts)
            heartbeatTimeout :: Maybe Word16
heartbeatTimeout = forall (m :: * -> *) a. MonadPlus m => (a -> Bool) -> m a -> m a
mfilter (forall a. Eq a => a -> a -> Bool
/=Word16
0) (forall a. a -> Maybe a
Just Word16
finalHeartbeatSec)

            fixChanNum :: a -> a
fixChanNum a
x = if a
x forall a. Eq a => a -> a -> Bool
== a
0 then a
65535 else a
x
            maxChannel :: Word16
maxChannel = forall a. Ord a => a -> Maybe a -> a
chooseMin (forall {a}. (Eq a, Num a) => a -> a
fixChanNum Word16
channel_max) forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall {a}. (Eq a, Num a) => a -> a
fixChanNum forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> Maybe Word16
coMaxChannel ConnectionOpts
connOpts

        Connection -> Frame -> IO ()
writeFrame Connection
handle (Word16 -> FramePayload -> Frame
Frame Word16
0 (MethodPayload -> FramePayload
MethodPayload
            (Word16 -> Word32 -> Word16 -> MethodPayload
Connection_tune_ok Word16
maxChannel Word32
maxFrameSize Word16
finalHeartbeatSec)
            ))
        -- C: open

        Connection -> Frame -> IO ()
writeFrame Connection
handle Frame
open

        -- S: open_ok

        Frame Word16
0 (MethodPayload (Connection_open_ok ShortString
_)) <- Connection -> IO Frame
readFrame Connection
handle

        -- Connection established!

        forall (m :: * -> *) a. Monad m => a -> m a
return (Word32
maxFrameSize, Word16
maxChannel, Maybe Word16
heartbeatTimeout, FieldTable
serverProps)

    --build Connection object

    MVar (IntMap (Channel, ThreadId))
cChannels <- forall a. a -> IO (MVar a)
newMVar forall a. IntMap a
IM.empty
    MVar (Maybe (CloseType, [Char]))
cClosed <- forall a. a -> IO (MVar a)
newMVar forall a. Maybe a
Nothing
    ChannelAllocator
cChanAllocator <- Int -> IO ChannelAllocator
newChannelAllocator forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
maxChannel
    Int
_ <- ChannelAllocator -> IO Int
allocateChannel ChannelAllocator
cChanAllocator -- allocate channel 0

    MVar ()
writeLock <- forall a. a -> IO (MVar a)
newMVar ()
    MVar ()
ccl <- forall a. IO (MVar a)
newEmptyMVar
    MVar [IO ()]
cClosedHandlers <- forall a. a -> IO (MVar a)
newMVar []
    MVar [(Text -> IO (), IO ())]
cBlockedHandlers <- forall a. a -> IO (MVar a)
newMVar []
    MVar ByteOffset
cLastReceived <- IO ByteOffset
getTimestamp forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. a -> IO (MVar a)
newMVar
    MVar ByteOffset
cLastSent <- IO ByteOffset
getTimestamp forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. a -> IO (MVar a)
newMVar
    MVar (Maybe ThreadId)
cThread <- forall a. a -> IO (MVar a)
newMVar forall a. Maybe a
Nothing

    let conn :: Connection
conn = Connection
-> ChannelAllocator
-> MVar (IntMap (Channel, ThreadId))
-> Int
-> MVar (Maybe (CloseType, [Char]))
-> MVar ()
-> MVar ()
-> MVar [IO ()]
-> MVar [(Text -> IO (), IO ())]
-> MVar ByteOffset
-> MVar ByteOffset
-> FieldTable
-> MVar (Maybe ThreadId)
-> Connection
Connection Connection
handle ChannelAllocator
cChanAllocator MVar (IntMap (Channel, ThreadId))
cChannels (forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
maxFrameSize) MVar (Maybe (CloseType, [Char]))
cClosed MVar ()
ccl MVar ()
writeLock MVar [IO ()]
cClosedHandlers MVar [(Text -> IO (), IO ())]
cBlockedHandlers MVar ByteOffset
cLastReceived MVar ByteOffset
cLastSent FieldTable
serverProps MVar (Maybe ThreadId)
cThread

    --spawn the connectionReceiver

    ThreadId
connThreadId <- forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally' (Connection -> IO ()
connectionReceiver Connection
conn) forall a b. (a -> b) -> a -> b
$ \Either SomeException ()
res -> do
        -- try closing socket

        forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (Connection -> IO ()
Conn.connectionClose Connection
handle) (\(SomeException
_ :: CE.SomeException) -> forall (m :: * -> *) a. Monad m => a -> m a
return ())

        -- mark as closed

        forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (Maybe (CloseType, [Char]))
cClosed forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a -> a
fromMaybe (CloseType
Abnormal, [Char]
"unknown reason")

        -- kill all channel-threads, making sure the channel threads will

        -- be killed taking into account the overall state of the

        -- connection: if the thread died for an unexpected exception,

        -- inform the channel threads downstream accordingly. Otherwise

        -- just use a normal 'killThread' finaliser.

        let finaliser :: ChanThreadKilledException
finaliser = SomeException -> ChanThreadKilledException
ChanThreadKilledException forall a b. (a -> b) -> a -> b
$ case Either SomeException ()
res of
                Left SomeException
ex -> SomeException
ex
                Right ()
_ -> forall e. Exception e => e -> SomeException
CE.toException AsyncException
CE.ThreadKilled
        forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (IntMap (Channel, ThreadId))
cChannels forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
x -> do
            forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall a b c. (a -> b -> c) -> b -> a -> c
flip forall e. Exception e => ThreadId -> e -> IO ()
CE.throwTo ChanThreadKilledException
finaliser forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> b
snd) forall a b. (a -> b) -> a -> b
$ forall a. IntMap a -> [a]
IM.elems IntMap (Channel, ThreadId)
x
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a. IntMap a
IM.empty

        -- mark connection as closed, so all pending calls to 'closeConnection' can now return

        forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ()
ccl ()

        -- notify connection-close-handlers

        forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar [IO ()]
cClosedHandlers forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_

    case Maybe Word16
heartbeatTimeout of
        Maybe Word16
Nothing      -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Just Word16
timeout -> do
            ThreadId
heartbeatThread <- Connection -> Int -> ThreadId -> IO ThreadId
watchHeartbeats Connection
conn (forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
timeout) ThreadId
connThreadId
            Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler Connection
conn Bool
True (ThreadId -> IO ()
killThread ThreadId
heartbeatThread)

    forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (Maybe ThreadId)
cThread forall a b. (a -> b) -> a -> b
$ \Maybe ThreadId
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just ThreadId
connThreadId
    forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn
  where
    connect :: [SomeException] -> [([Char], PortNumber)] -> IO Connection
connect [SomeException]
excs (([Char]
host, PortNumber
port) : [([Char], PortNumber)]
rest) = do
        ConnectionContext
ctx <- IO ConnectionContext
Conn.initConnectionContext
        Either SomeException Connection
result <- forall e a. Exception e => IO a -> IO (Either e a)
CE.try forall a b. (a -> b) -> a -> b
$ ConnectionContext -> ConnectionParams -> IO Connection
Conn.connectTo ConnectionContext
ctx forall a b. (a -> b) -> a -> b
$ Conn.ConnectionParams
                    { connectionHostname :: [Char]
Conn.connectionHostname  = [Char]
host
                    , connectionPort :: PortNumber
Conn.connectionPort      = PortNumber
port
                    , connectionUseSecure :: Maybe TLSSettings
Conn.connectionUseSecure = Maybe TLSSettings
tlsSettings
                    , connectionUseSocks :: Maybe ProxySettings
Conn.connectionUseSocks  = forall a. Maybe a
Nothing
                    }
        forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\(SomeException
ex :: CE.SomeException) -> [SomeException] -> [([Char], PortNumber)] -> IO Connection
connect (SomeException
exforall a. a -> [a] -> [a]
:[SomeException]
excs) [([Char], PortNumber)]
rest)
               forall (m :: * -> *) a. Monad m => a -> m a
return
               Either SomeException Connection
result
    connect [SomeException]
excs [] = forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal forall a b. (a -> b) -> a -> b
$ [Char]
"Could not connect to any of the provided brokers: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show (forall a b. [a] -> [b] -> [(a, b)]
zip (ConnectionOpts -> [([Char], PortNumber)]
coServers ConnectionOpts
connOpts) (forall a. [a] -> [a]
reverse [SomeException]
excs))
    tlsSettings :: Maybe TLSSettings
tlsSettings = forall b a. b -> (a -> b) -> Maybe a -> b
maybe forall a. Maybe a
Nothing TLSSettings -> Maybe TLSSettings
connectionTLSSettings (ConnectionOpts -> Maybe TLSSettings
coTLSSettings ConnectionOpts
connOpts)
    selectSASLMechanism :: Connection -> ByteString -> IO SASLMechanism
selectSASLMechanism Connection
handle ByteString
serverMechanisms =
        let serverSaslList :: [Text]
serverSaslList = (Char -> Bool) -> Text -> [Text]
T.split (forall a. Eq a => a -> a -> Bool
== Char
' ') forall a b. (a -> b) -> a -> b
$ ByteString -> Text
E.decodeUtf8 ByteString
serverMechanisms
            clientMechanisms :: [SASLMechanism]
clientMechanisms = ConnectionOpts -> [SASLMechanism]
coAuth ConnectionOpts
connOpts
            clientSaslList :: [Text]
clientSaslList = forall a b. (a -> b) -> [a] -> [b]
map SASLMechanism -> Text
saslName [SASLMechanism]
clientMechanisms
            maybeSasl :: Maybe SASLMechanism
maybeSasl = forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
F.find (\(SASLMechanism Text
name ByteString
_ Maybe (ByteString -> IO ByteString)
_) -> forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem Text
name [Text]
serverSaslList) [SASLMechanism]
clientMechanisms
        in forall {b}. Maybe b -> Connection -> [Char] -> IO b
abortIfNothing Maybe SASLMechanism
maybeSasl Connection
handle
            ([Char]
"None of the provided SASL mechanisms "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show [Text]
clientSaslListforall a. [a] -> [a] -> [a]
++[Char]
" is supported by the server "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show [Text]
serverSaslListforall a. [a] -> [a] -> [a]
++[Char]
".")

    start_ok :: SASLMechanism -> Frame
start_ok SASLMechanism
sasl = Word16 -> FramePayload -> Frame
Frame Word16
0 forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload forall a b. (a -> b) -> a -> b
$ FieldTable
-> ShortString -> LongString -> ShortString -> MethodPayload
Connection_start_ok
                        FieldTable
clientProperties
                        (Text -> ShortString
ShortString forall a b. (a -> b) -> a -> b
$ SASLMechanism -> Text
saslName SASLMechanism
sasl)
                        (ByteString -> LongString
LongString forall a b. (a -> b) -> a -> b
$ SASLMechanism -> ByteString
saslInitialResponse SASLMechanism
sasl)
                        (Text -> ShortString
ShortString Text
"en_US")
      where
        clientProperties :: FieldTable
clientProperties = Map Text FieldValue -> FieldTable
FieldTable forall a b. (a -> b) -> a -> b
$ forall k a. Ord k => [(k, a)] -> Map k a
M.fromList forall a b. (a -> b) -> a -> b
$ [
            (Text
"platform", ByteString -> FieldValue
FVString ByteString
"Haskell"),
            (Text
"version" , ByteString -> FieldValue
FVString forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
E.encodeUtf8 forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> Text
T.pack forall a b. (a -> b) -> a -> b
$ Version -> [Char]
showVersion Version
version),
            (Text
"capabilities", FieldTable -> FieldValue
FVFieldTable FieldTable
clientCapabilities)
          ] forall a. [a] -> [a] -> [a]
++ forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] (\Text
x -> [(Text
"connection_name", ByteString -> FieldValue
FVString forall a b. (a -> b) -> a -> b
$ Text -> ByteString
E.encodeUtf8 Text
x)]) (ConnectionOpts -> Maybe Text
coName ConnectionOpts
connOpts)

        clientCapabilities :: FieldTable
clientCapabilities = Map Text FieldValue -> FieldTable
FieldTable forall a b. (a -> b) -> a -> b
$ forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [
            (Text
"consumer_cancel_notify", Bool -> FieldValue
FVBool Bool
True),
            (Text
"connection.blocked", Bool -> FieldValue
FVBool Bool
True)
          ]

    handleSecureUntilTune :: Connection -> SASLMechanism -> IO Frame
handleSecureUntilTune Connection
handle SASLMechanism
sasl = do
        Frame
tuneOrSecure <- Connection -> IO Frame
readFrame Connection
handle
        case Frame
tuneOrSecure of
            Frame Word16
0 (MethodPayload (Connection_secure (LongString ByteString
challenge))) -> do
                ByteString -> IO ByteString
processChallenge <- forall {b}. Maybe b -> Connection -> [Char] -> IO b
abortIfNothing (SASLMechanism -> Maybe (ByteString -> IO ByteString)
saslChallengeFunc SASLMechanism
sasl)
                    Connection
handle forall a b. (a -> b) -> a -> b
$ [Char]
"The server provided a challenge, but the selected SASL mechanism "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show (SASLMechanism -> Text
saslName SASLMechanism
sasl)forall a. [a] -> [a] -> [a]
++[Char]
" is not equipped with a challenge processing function."
                ByteString
challengeResponse <- ByteString -> IO ByteString
processChallenge ByteString
challenge
                Connection -> Frame -> IO ()
writeFrame Connection
handle (Word16 -> FramePayload -> Frame
Frame Word16
0 (MethodPayload -> FramePayload
MethodPayload (LongString -> MethodPayload
Connection_secure_ok (ByteString -> LongString
LongString ByteString
challengeResponse))))
                Connection -> SASLMechanism -> IO Frame
handleSecureUntilTune Connection
handle SASLMechanism
sasl

            tune :: Frame
tune@(Frame Word16
0 (MethodPayload Connection_tune{})) -> forall (m :: * -> *) a. Monad m => a -> m a
return Frame
tune
            Frame
x -> forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"handleSecureUntilTune fail. received message: "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show Frame
x

    open :: Frame
open = Word16 -> FramePayload -> Frame
Frame Word16
0 forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload forall a b. (a -> b) -> a -> b
$ ShortString -> ShortString -> Bool -> MethodPayload
Connection_open
        (Text -> ShortString
ShortString forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> Text
coVHost ConnectionOpts
connOpts)
        (Text -> ShortString
ShortString forall a b. (a -> b) -> a -> b
$ [Char] -> Text
T.pack [Char]
"") -- capabilities; deprecated in 0-9-1

        Bool
True -- insist; deprecated in 0-9-1


    abortHandshake :: Connection -> [Char] -> IO b
abortHandshake Connection
handle [Char]
msg = do
        Connection -> IO ()
Conn.connectionClose Connection
handle
        forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
msg

    abortIfNothing :: Maybe b -> Connection -> [Char] -> IO b
abortIfNothing Maybe b
m Connection
handle [Char]
msg = case Maybe b
m of
        Maybe b
Nothing -> forall {b}. Connection -> [Char] -> IO b
abortHandshake Connection
handle [Char]
msg
        Just b
a  -> forall (m :: * -> *) a. Monad m => a -> m a
return b
a


-- | Start the periodic heartbeat check for a connection and return the
-- 'ThreadId' produced by 'scheduleAtFixedRate'.
--
-- On every tick two checks run, in this order:
--
--   1. if nothing has been sent for at least @timeout / 2@, a heartbeat
--      frame is written on channel 0;
--   2. if nothing has been received for at least @2 * timeout@, the
--      connection thread is killed with a 'ConnectionClosedException'.
--
-- @timeout@ is the heartbeat interval; the arithmetic below treats it as
-- seconds and converts to microseconds. @connThread@ is the thread that
-- receives the exception when the peer is considered dead.
watchHeartbeats :: Connection -> Int -> ThreadId -> IO ThreadId
watchHeartbeats conn timeout connThread = scheduleAtFixedRate rate $ do
    checkSendTimeout
    checkReceiveTimeout
  where
    -- interval between two checks
    rate :: Int
    rate = timeout * 1000 * 250 -- timeout / 4 in µs

    receiveTimeout :: Int64
    receiveTimeout = fromIntegral rate * 4 * 2 -- 2*timeout in µs

    sendTimeout :: Int64
    sendTimeout = fromIntegral rate * 2 -- timeout/2 in µs

    skippedBeatEx :: AMQPException
    skippedBeatEx = ConnectionClosedException Abnormal "killed connection after missing 2 consecutive heartbeats"

    -- kill the connection thread if the peer has been silent for too long
    checkReceiveTimeout :: IO ()
    checkReceiveTimeout = doCheck (connLastReceived conn) receiveTimeout $
        killConnection conn Abnormal (CE.toException skippedBeatEx) connThread

    -- send a heartbeat frame if we have been silent for too long
    checkSendTimeout :: IO ()
    checkSendTimeout = doCheck (connLastSent conn) sendTimeout $
        writeFrame (connHandle conn) (Frame 0 HeartbeatPayload)

    -- Run @action@ when the timestamp stored in @var@ is at least
    -- @timeout_µs@ old. The MVar stays taken while @action@ runs.
    doCheck :: MVar Int64 -> Int64 -> IO () -> IO ()
    doCheck var timeout_µs action = withMVar var $ \lastFrameTime -> do
        time <- getTimestamp
        when (time >= lastFrameTime + timeout_µs) action

-- | Overwrite the connection's last-sent timestamp ('connLastSent') with the
-- current value of 'getTimestamp'. The previous value is discarded.
updateLastSent :: Connection -> IO ()
updateLastSent conn = modifyMVar_ (connLastSent conn) (const getTimestamp)

-- | Overwrite the connection's last-received timestamp ('connLastReceived')
-- with the current value of 'getTimestamp'. The previous value is discarded.
updateLastReceived :: Connection -> IO ()
updateLastReceived conn = modifyMVar_ (connLastReceived conn) (const getTimestamp)

-- | kill the connection thread abruptly
--
-- First records @(closeType, show ex)@ as the close reason in 'connClosed'
-- (replacing whatever was stored there), then throws @ex@ to the thread
-- identified by @connThreadId@.
killConnection :: Connection -> CloseType -> CE.SomeException -> ThreadId -> IO ()
killConnection conn closeType ex connThreadId = do
    modifyMVar_ (connClosed conn) $ const $ return $ Just (closeType, show ex)
    throwTo connThreadId ex

-- | closes a connection
--
-- Make sure to call this function before your program exits to ensure that all published messages are received by the server.
--
-- A @connection.close@ method is written on channel 0 while holding the
-- connection's write lock. If that write raises an 'CE.IOException', the
-- connection thread is killed instead. In both cases the call then blocks
-- until 'connClosedLock' has been filled.
closeConnection :: Connection -> IO ()
closeConnection c = do
    CE.catch (
        withMVar (connWriteLock c) $ \_ -> writeFrame (connHandle c) $ Frame 0 $ MethodPayload $ Connection_close
            --TODO: set these values
            0 -- reply_code
            (ShortString "") -- reply_text
            0 -- class_id
            0 -- method_id
        )
        (\ (e :: CE.IOException) -> do
            -- Make sure the connection is closed.
            -- (This pattern match can't fail, since openConnection'' will always fill
            --  the variable in, and there's no other way to get a connection.)
            Just thrID <- readMVar (connThread c)
            killConnection c Abnormal (CE.toException e) thrID
        )

    -- wait for connection_close_ok by the server; this MVar gets filled in the CE.finally handler in openConnection'
    readMVar $ connClosedLock c

-- | get the server properties sent in connection.start
--
-- This simply returns the 'connServerProperties' field of the connection;
-- no I/O is performed.
getServerProperties :: Connection -> IO FieldTable
getServerProperties = return . connServerProperties

-- | @addConnectionClosedHandler conn ifClosed handler@ adds a @handler@ that
-- will be called after the connection is closed (either by calling
-- @closeConnection@ or by an exception). If the @ifClosed@ parameter is True
-- and the connection is already closed, the handler will be called
-- immediately. If @ifClosed == False@ and the connection is already closed,
-- the handler will never be called
--
-- Note that the 'connClosed' MVar is held for the whole call, including
-- while an immediately-invoked @handler@ runs.
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler conn ifClosed handler = do
    withMVar (connClosed conn) $ \case
        -- connection is already closed, so call the handler directly
        Just _ | ifClosed -> handler

        -- otherwise add it to the list
        _ -> modifyMVar_ (connClosedHandlers conn) $ \old -> return $ handler:old

-- | @addConnectionBlockedHandler conn blockedHandler unblockedHandler@ adds handlers that will be called
-- when a connection gets blocked/unblocked due to server resource constraints.
--
-- More information: <https://www.rabbitmq.com/connection-blocked.html>
--
-- The pair of handlers is prepended to the list stored in
-- 'connBlockedHandlers'.
addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO ()
addConnectionBlockedHandler conn blockedHandler unblockedHandler =
    modifyMVar_ (connBlockedHandlers conn) $ \old -> return $ (blockedHandler, unblockedHandler):old

-- | Read one complete frame from the connection.
--
-- The 7-byte frame header is read first; 'peekFrameSize' extracts the
-- payload length from it. Then the payload plus the one-byte frame
-- terminator are read and the concatenation of both parts is decoded with
-- the 'Binary' instance of 'Frame'.
--
-- Throws an 'IOError' (via 'userError') when a read yields no data, and
-- calls 'error' when decoding fails or the decoder did not consume exactly
-- @len + 8@ bytes (7 header bytes + payload + 1 terminator byte).
readFrame :: Conn.Connection -> IO Frame
readFrame handle = do
    strictDat <- connectionGetExact handle 7
    let dat = toLazy strictDat
    -- NB: userError returns an IOException so it will be caught in 'connectionReceiver'
    when (BL.null dat) $ CE.throwIO $ userError "connection not open"
    let len = fromIntegral $ peekFrameSize dat
    strictDat' <- connectionGetExact handle (len+1) -- +1 for the terminating 0xCE
    let dat' = toLazy strictDat'
    when (BL.null dat') $ CE.throwIO $ userError "connection not open"
#if MIN_VERSION_binary(0, 7, 0)
    let ret = runGetOrFail get (BL.append dat dat')
    case ret of
        Left (_, _, errMsg) -> error $ "readFrame fail: " ++ errMsg
        Right (_, consumedBytes, _) | consumedBytes /= fromIntegral (len+8) ->
            error $ "readFrame: parser should read " ++ show (len+8) ++ " bytes; but read " ++ show consumedBytes
        Right (_, _, frame) -> return frame
#else
    let (frame, _, consumedBytes) = runGetState get (BL.append dat dat') 0
    if consumedBytes /= fromIntegral (len+8)
        then error $ "readFrameSock: parser should read "++show (len+8)++" bytes; but read "++show consumedBytes
        else return ()
    return frame
#endif

-- belongs in connection package and will be removed once it lands there
--
-- | Read exactly @x@ bytes from the connection.
--
-- 'Conn.connectionGet' is called repeatedly, each time asking for the number
-- of bytes still missing, until the total number of bytes received equals
-- @x@. The received chunks are collected and concatenated once at the end,
-- so the cost stays linear in @x@ no matter how many chunks arrive.
connectionGetExact :: Conn.Connection -> Int -> IO BS.ByteString
connectionGetExact conn x = loop [] 0
  where
    -- @chunks@ holds the data received so far, newest chunk first;
    -- @y@ is the total number of bytes in @chunks@.
    loop :: [BS.ByteString] -> Int -> IO BS.ByteString
    loop chunks !y
          | y == x = return $ BS.concat (reverse chunks)
          | otherwise = do
            next <- Conn.connectionGet conn (x - y)
            loop (next : chunks) (y + BS.length next)

-- | Serialise a frame with its 'Binary' instance and write the resulting
-- strict 'BS.ByteString' to the connection in a single 'Conn.connectionPut'.
--
-- No lock is taken here; callers that need exclusive access to the
-- connection have to arrange it themselves.
writeFrame :: Conn.Connection -> Frame -> IO ()
writeFrame handle f = do
    Conn.connectionPut handle . toStrict . runPut . put $ f

------------------------ CHANNEL -----------------------------


{- | A connection to an AMQP server is made up of separate channels. It is recommended to use a separate channel for each thread in your application that talks to the AMQP server (but you don't have to as channels are thread-safe)
-}
data Channel = Channel {
    connection :: Connection,
    inQueue :: Chan FramePayload, -- incoming frames (from Connection)
    outstandingResponses :: MVar (Seq.Seq (MVar Assembly)), -- for every request an MVar is stored here waiting for the response
    channelID :: Word16,
    lastConsumerTag :: MVar Int,

    nextPublishSeqNum :: MVar Int,
    unconfirmedSet :: TVar IntSet.IntSet,
    ackedSet :: TVar IntSet.IntSet,  -- delivery tags
    nackedSet :: TVar IntSet.IntSet, -- accumulate here.

    chanActive :: Lock, -- used for flow-control. if lock is closed, no content methods will be sent
    chanClosed :: MVar (Maybe (CloseType, String)),
    consumers :: MVar (M.Map Text ((Message, Envelope) -> IO (), -- who is consumer of a queue? (consumerTag => callback)
                                   ConsumerTag -> IO ())),    -- cancellation notification callback
    returnListeners :: MVar [(Message, PublishError) -> IO ()],
    confirmListeners :: MVar [(Word64, Bool, AckType) -> IO ()],
    chanExceptionHandlers :: MVar [CE.SomeException -> IO ()]
}

-- | Thrown in the channel thread when the connection gets closed.
-- When handling exceptions in a subscription callback, make sure to re-throw this so the channel thread can be stopped.
--
-- The 'cause' field carries the exception that is the reason for the
-- channel thread being stopped.
data ChanThreadKilledException = ChanThreadKilledException { cause :: CE.SomeException }
  deriving (Show)

instance CE.Exception ChanThreadKilledException

-- | If the given exception is an instance of ChanThreadKilledException, this method returns
--    the inner exception. Otherwise the exception is returned unchanged.
--
-- Only one layer is removed: the 'cause' of the wrapper is returned as is.
unwrapChanThreadKilledException :: CE.SomeException -> CE.SomeException
unwrapChanThreadKilledException e = maybe e cause $ CE.fromException e

-- | Build a 'Message' from the properties of a basic-class content header
-- and the message body.
--
-- Every optional 'ShortString' property is unwrapped to 'Maybe' 'Text', the
-- delivery mode is converted with 'intToDeliveryMode', and the remaining
-- properties (timestamp, priority, headers) are passed through unchanged.
--
-- Calls 'error' for any content header that is not 'CHBasic'.
msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message
msgFromContentHeaderProperties (CHBasic content_type content_encoding headers delivery_mode priority correlation_id reply_to expiration message_id timestamp message_type user_id application_id cluster_id) body =
    let msgId = fromShortString message_id
        contentType = fromShortString content_type
        contentEncoding = fromShortString content_encoding
        replyTo = fromShortString reply_to
        correlationID = fromShortString correlation_id
        messageType = fromShortString message_type
        userId = fromShortString user_id
        applicationId = fromShortString application_id
        clusterId = fromShortString cluster_id
    -- NB: 'Message' is applied positionally; the argument order matters.
    in Message body (fmap intToDeliveryMode delivery_mode) timestamp msgId messageType userId applicationId clusterId contentType contentEncoding replyTo priority correlationID (fromShortString expiration) headers
  where
    -- strip the 'ShortString' wrapper from an optional property
    fromShortString :: Maybe ShortString -> Maybe Text
    fromShortString (Just (ShortString s)) = Just s
    fromShortString _ = Nothing
msgFromContentHeaderProperties c _ = error ("Unknown content header properties: " ++ show c)

-- | The thread that is run for every channel

channelReceiver :: Channel -> IO ()
channelReceiver :: Channel -> IO ()
channelReceiver Channel
chan = do
    --read incoming frames; they are put there by a Connection thread

    Assembly
p <- Chan FramePayload -> IO Assembly
readAssembly forall a b. (a -> b) -> a -> b
$ Channel -> Chan FramePayload
inQueue Channel
chan
    if Assembly -> Bool
isResponse Assembly
p
        then do
            IO ()
action <- forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (Channel -> MVar (Seq (MVar Assembly))
outstandingResponses Channel
chan) forall a b. (a -> b) -> a -> b
$ \Seq (MVar Assembly)
val -> do
                        case forall a. Seq a -> ViewL a
Seq.viewl Seq (MVar Assembly)
val of
                            MVar Assembly
x Seq.:< Seq (MVar Assembly)
rest -> do
                                forall (m :: * -> *) a. Monad m => a -> m a
return (Seq (MVar Assembly)
rest, forall a. MVar a -> a -> IO ()
putMVar MVar Assembly
x Assembly
p)
                            ViewL (MVar Assembly)
Seq.EmptyL -> do
                                forall (m :: * -> *) a. Monad m => a -> m a
return (Seq (MVar Assembly)
val, forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"got response, but have no corresponding request")
            IO ()
action

        --handle asynchronous assemblies

        else Assembly -> IO ()
handleAsync Assembly
p
    Channel -> IO ()
channelReceiver Channel
chan
  where
    isResponse :: Assembly -> Bool
    isResponse :: Assembly -> Bool
isResponse (ContentMethod Basic_deliver{} ContentHeaderProperties
_ ByteString
_) = Bool
False
    isResponse (ContentMethod Basic_return{} ContentHeaderProperties
_ ByteString
_) = Bool
False
    isResponse (SimpleMethod (Channel_flow Bool
_)) = Bool
False
    isResponse (SimpleMethod Channel_close{}) = Bool
False
    isResponse (SimpleMethod (Basic_ack LongLongInt
_ Bool
_)) = Bool
False
    isResponse (SimpleMethod Basic_nack{}) = Bool
False
    isResponse (SimpleMethod (Basic_cancel ShortString
_ Bool
_)) = Bool
False
    isResponse Assembly
_ = Bool
True

    -- Handles an assembly that is not the reply to a request, dispatching on
    -- the method it carries. Exceptions thrown by user callbacks are logged
    -- to stderr instead of being propagated (except for
    -- 'ChanThreadKilledException', whose cause is re-thrown).
    handleAsync :: Assembly -> IO ()
    -- Basic.Deliver: forward msg to registered consumer
    handleAsync (ContentMethod (Basic_deliver (ShortString consumerTag) deliveryTag redelivered
                                              (ShortString exchange) (ShortString routingKey))
                               properties body) =
        withMVar (consumers chan) $ \s ->
            case M.lookup consumerTag s of
                Just (subscriber, _) -> do
                    let msg = msgFromContentHeaderProperties properties body
                        env = Envelope
                            { envDeliveryTag  = deliveryTag
                            , envRedelivered  = redelivered
                            , envExchangeName = exchange
                            , envRoutingKey   = routingKey
                            , envChannel      = chan
                            }
                    CE.catches (subscriber (msg, env))
                        [ -- unwrap and re-throw the exception that killed the channel thread
                          CE.Handler $ \(e :: ChanThreadKilledException) -> CE.throwIO $ cause e
                        , CE.Handler $ \(e :: CE.SomeException) ->
                            hPutStrLn stderr $ "AMQP callback threw exception: " ++ show e
                        ]
                Nothing ->
                    -- got a message, but have no registered subscriber; so drop it
                    return ()
    -- Channel.Close: acknowledge, mark the channel closed locally, then
    -- raise 'ChannelClosedException' in the current thread
    handleAsync (SimpleMethod (Channel_close _ (ShortString errorMsg) _ _)) = do
        CE.catch (writeAssembly' chan (SimpleMethod Channel_close_ok)) $ \(_ :: CE.IOException) ->
            -- do nothing if connection is already closed
            return ()
        closeChannel' chan Abnormal errorMsg
        self <- myThreadId
        CE.throwTo self $ ChannelClosedException Abnormal (T.unpack errorMsg)
    -- Channel.Flow: open or close the lock that 'writeAssembly'' waits on.
    -- In theory we should respond with flow_ok but rabbitMQ 1.7 ignores that, so it doesn't matter
    handleAsync (SimpleMethod (Channel_flow active))
        | active    = openLock (chanActive chan)
        | otherwise = closeLock (chanActive chan)
    -- Basic.Return: notify every registered return listener
    handleAsync (ContentMethod basicReturn@Basic_return{} props body) = do
        let msg      = msgFromContentHeaderProperties props body
            pubError = basicReturnToPublishError basicReturn
        withMVar (returnListeners chan) $ \listeners ->
            forM_ listeners $ \l ->
                CE.catch (l (msg, pubError)) $ \(ex :: CE.SomeException) ->
                    hPutStrLn stderr $ "return listener on channel [" ++ show (channelID chan)
                        ++ "] handling error [" ++ show pubError
                        ++ "] threw exception: " ++ show ex
    -- Basic.Ack / Basic.Nack: publisher confirms
    handleAsync (SimpleMethod (Basic_ack deliveryTag multiple)) =
        handleConfirm deliveryTag multiple BasicAck
    handleAsync (SimpleMethod (Basic_nack deliveryTag multiple _)) =
        handleConfirm deliveryTag multiple BasicNack
    -- Basic.Cancel: the broker cancelled a consumer
    handleAsync (SimpleMethod (Basic_cancel consumerTag _)) =
        handleCancel consumerTag
    handleAsync m = error ("Unknown method: " ++ show m)

    -- Processes a publisher confirm (ack or nack) for @deliveryTag@.
    --
    -- First every confirm listener is invoked; an exception thrown by a
    -- listener is logged to stderr and does not stop the remaining ones.
    -- Then, in one STM transaction, the confirmed sequence numbers are moved
    -- out of 'unconfirmedSet' into 'ackedSet' or 'nackedSet' (chosen by @k@).
    -- With @multiple@ set, every unconfirmed number up to and including the
    -- tag is moved; otherwise only the tag itself.
    handleConfirm :: LongLongInt -> Bool -> AckType -> IO ()
    handleConfirm deliveryTag multiple k = do
        withMVar (confirmListeners chan) $ \listeners ->
            forM_ listeners $ \l ->
                CE.catch (l (deliveryTag, multiple, k)) $ \(ex :: CE.SomeException) ->
                    hPutStrLn stderr $ "confirm listener on channel [" ++ show (channelID chan)
                        ++ "] handling method " ++ show k
                        ++ " threw exception: " ++ show ex

        let seqNum :: Int
            seqNum = fromIntegral deliveryTag
            targetSet = case k of
                BasicAck  -> ackedSet chan
                BasicNack -> nackedSet chan
        atomically $ do
            unconfSet <- readTVar (unconfirmedSet chan)
            let (confirmed, pending)
                    | multiple  = IntSet.partition (<= seqNum) unconfSet
                    | otherwise = (IntSet.singleton seqNum, IntSet.delete seqNum unconfSet)
            modifyTVar' targetSet (IntSet.union confirmed)
            writeTVar (unconfirmedSet chan) pending

    -- Invokes the cancellation callback registered for the given consumer
    -- tag, if there is one. An exception thrown by the callback is logged to
    -- stderr. The 'consumers' map itself is not modified here.
    handleCancel :: ShortString -> IO ()
    handleCancel (ShortString consumerTag) =
        withMVar (consumers chan) $ \s ->
            case M.lookup consumerTag s of
                Just (_, cancelCB) ->
                    CE.catch (cancelCB consumerTag) $ \(ex :: CE.SomeException) ->
                        hPutStrLn stderr $ "consumer cancellation listener " ++ show consumerTag
                            ++ " on channel [" ++ show (channelID chan)
                            ++ "] threw exception: " ++ show ex
                Nothing ->
                    -- got a cancellation notification, but have no registered subscriber; so drop it
                    return ()

    -- Converts a 'Basic_return' method into a 'PublishError'.
    --
    -- Reply codes 312, 313 and 404 map to 'Unroutable', 'NoConsumers' and
    -- 'NotFound'. Any other reply code, or any other method, calls 'error';
    -- for the reply code this only surfaces when the resulting value is forced.
    basicReturnToPublishError :: MethodPayload -> PublishError
    basicReturnToPublishError (Basic_return code (ShortString errText) (ShortString exchange) (ShortString routingKey)) =
        PublishError replyError (Just exchange) routingKey
      where
        replyError = case code of
            312 -> Unroutable errText
            313 -> NoConsumers errText
            404 -> NotFound errText
            num -> error $ "unexpected return error code: " ++ show num
    basicReturnToPublishError x = error $ "basicReturnToPublishError fail: " ++ show x

-- | Registers a callback function that is called whenever a message is returned from the broker ('basic.return').
--
-- The listener is prepended to the channel's list of return listeners.
addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO ()
addReturnListener chan listener =
    modifyMVar_ (returnListeners chan) $ \listeners ->
        return (listener : listeners)

-- | Registers a callback function that is called whenever a channel is closed by an exception.
--
-- This method will always be called when a channel is closed, whether through normal means
--  ('closeChannel', 'closeConnection') or by some AMQP exception.
--
-- You can use 'isNormalChannelClose' to figure out if the exception was normal or due to an
--  AMQP exception.
addChannelExceptionHandler :: Channel -> (CE.SomeException -> IO ()) -> IO ()
addChannelExceptionHandler chan handler =
    modifyMVar_ (chanExceptionHandlers chan) $ \handlers ->
        return (handler : handlers)

-- | This can be used with the exception passed to 'addChannelExceptionHandler'.
--
-- Returns True if the argument is a 'ConnectionClosedException' or 'ChannelClosedException' that happened
--  normally (i.e. by the user calling 'closeChannel' or 'closeConnection') and not due to some
--  AMQP exception.
isNormalChannelClose :: CE.SomeException -> Bool
isNormalChannelClose e =
    case CE.fromException e :: Maybe AMQPException of
        Just (ChannelClosedException Normal _)    -> True
        Just (ConnectionClosedException Normal _) -> True
        _                                         -> False

-- closes the channel internally; but doesn't tell the server
--
-- If the channel is already marked closed this is a no-op. Otherwise the
-- channel id is freed in the connection's allocator and removed from the
-- connection's channel map, the flow-control lock is killed, every
-- outstanding request is failed, and the close type and reason are recorded
-- in 'chanClosed'.
closeChannel' :: Channel -> CloseType -> Text -> IO ()
closeChannel' c closeType reason =
    modifyMVar_ (chanClosed c) $ \closedState ->
        case closedState of
            Just _  -> return closedState
            Nothing -> do
                modifyMVar_ (connChannels conn) $ \old -> do
                    ret <- freeChannel (connChanAllocator conn) chanId
                    unless ret $ hPutStrLn stderr "closeChannel error: channel already freed"
                    return $ IM.delete chanId old

                void $ killLock $ chanActive c
                killOutstandingResponses $ outstandingResponses c
                return $ Just (closeType, T.unpack reason)
  where
    conn = connection c

    chanId :: Int
    chanId = fromIntegral $ channelID c

    killOutstandingResponses :: MVar (Seq.Seq (MVar a)) -> IO ()
    killOutstandingResponses outResps =
        modifyMVar_ outResps $ \val -> do
            -- Fill every pending response with a value that throws when it is
            -- evaluated; 'tryPutMVar' leaves already-filled responses untouched.
            F.mapM_ (\x -> tryPutMVar x $ error "channel closed") val
            -- Intentionally put 'undefined' into the MVar, so that reading from it will throw.
            -- This MVar will be read from the 'request' method, where we appropriately catch ErrorCall exceptions.
            return undefined

-- | opens a new channel on the connection
--
-- By default, if a channel is closed by an AMQP exception, this exception will be printed to stderr. You can prevent this behaviour by setting a custom exception handler (using 'addChannelExceptionHandler').
--
-- Example of adding an exception-handler:
--
-- > chan <- openChannel conn
-- > addChannelExceptionHandler chan $ \e -> do
-- >     unless (isNormalChannelClose e) $ do
-- >         putStrLn $ "channel exception: "++show e
openChannel :: Connection -> IO Channel
openChannel c = do
    newInQueue <- newChan
    outRes <- newMVar Seq.empty
    lastConsTag <- newMVar 0
    ca <- newLock
    closed <- newMVar Nothing
    conss <- newMVar M.empty
    retListeners <- newMVar []
    aSet <- newTVarIO IntSet.empty
    nSet <- newTVarIO IntSet.empty
    nxtSeq <- newMVar 0
    unconfSet <- newTVarIO IntSet.empty
    cnfListeners <- newMVar []
    handlers <- newMVar []

    -- add new channel to connection's channel map
    newChannel <- modifyMVar (connChannels c) $ \mp -> do
        newChannelID <- allocateChannel (connChanAllocator c)
        -- Sanity-check before forking the receiver thread: if this threw after
        -- the fork, the new thread would be left running without ever being
        -- registered in the channel map.
        when (IM.member newChannelID mp) $ CE.throwIO $ userError "openChannel fail: channel already open"
        let newChannel = Channel c newInQueue outRes (fromIntegral newChannelID) lastConsTag nxtSeq unconfSet aSet nSet ca closed conss retListeners cnfListeners handlers
        thrID <- forkFinally' (channelReceiver newChannel) $ \res -> do
            -- no-op if the channel was already marked closed with another reason
            closeChannel' newChannel Normal "closed"
            case res of
                Right _ -> return ()
                Left ex -> do
                    let unwrappedExc = unwrapChanThreadKilledException ex
                    handlers' <- readMVar handlers

                    -- With no handlers registered, abnormal closes are printed
                    -- to stderr; otherwise every handler gets the exception.
                    case (null handlers', fromAbnormalChannelClose unwrappedExc) of
                        (True, Just reason) -> hPutStrLn stderr $ "unhandled AMQP channel exception (chanId="++show newChannelID++"): "++reason
                        _ -> mapM_ ($ unwrappedExc) handlers'
        return (IM.insert newChannelID (newChannel, thrID) mp, newChannel)

    SimpleMethod (Channel_open_ok _) <- request newChannel $ SimpleMethod $ Channel_open (ShortString "")
    return newChannel

  where
    -- 'Just' a description if the exception should be reported as an
    -- abnormal close, 'Nothing' for connection closes and normal channel closes.
    fromAbnormalChannelClose :: CE.SomeException -> Maybe String
    fromAbnormalChannelClose exc =
        case CE.fromException exc :: Maybe AMQPException of
            Just (ConnectionClosedException _ _) -> Nothing
            Just (ChannelClosedException Normal _) -> Nothing
            Just (ChannelClosedException Abnormal reason) -> Just reason
            Just (AllChannelsAllocatedException _) -> Just "all channels allocated"
            Nothing -> Just $ show exc

-- | closes a channel. It is typically not necessary to manually call this as closing a connection will implicitly close all channels.
--
-- Sends @channel.close@ to the server and waits for @channel.close-ok@; afterwards a normal
-- 'ChannelClosedException' is thrown to the channel's thread, if the channel is still registered
-- with the connection.
closeChannel :: Channel -> IO ()
closeChannel c = do
    SimpleMethod Channel_close_ok <- request c $ SimpleMethod $ Channel_close 0 (ShortString "") 0 0
    withMVar (connChannels $ connection c) $ \chans ->
        case IM.lookup (fromIntegral $ channelID c) chans of
            Just (_, thrID) -> throwTo thrID $ ChannelClosedException Normal "closeChannel was called"
            Nothing         -> return ()

-- | writes multiple frames to the channel atomically
--
-- Throws a 'userError' with the message @"channel not open"@ if the channel is not registered with
-- its connection, and @"connection not open"@ if an 'CE.IOException' occurs while writing.
writeFrames :: Channel -> [FramePayload] -> IO ()
writeFrames chan payloads =
    withMVar (connChannels conn) $ \chans ->
        if IM.member (fromIntegral $ channelID chan) chans
            then CE.catch sendAll $ \(_ :: CE.IOException) ->
                CE.throwIO $ userError "connection not open"
            else CE.throwIO $ userError "channel not open"
  where
    conn = connection chan

    sendAll = do
        -- ensure at most one thread is writing to the socket at any time
        withMVar (connWriteLock conn) $ \_ ->
            mapM_ (writeFrame (connHandle conn) . Frame (channelID chan)) payloads
        updateLastSent conn

-- Writes an assembly to the channel without translating exceptions (see 'writeAssembly').
--
-- A content-carrying method is sent as a method frame, a content-header
-- frame and, for a non-empty body, one or more body frames; all of them go
-- out through a single 'writeFrames' call.
writeAssembly' :: Channel -> Assembly -> IO ()
writeAssembly' chan (ContentMethod m properties msg) = do
    -- wait iff the AMQP server instructed us to withhold sending content data (flow control)
    waitLock $ chanActive chan
    let header = ContentHeaderPayload
            (getClassIDOf properties)       -- classID
            0                               -- weight is deprecated in AMQP 0-9
            (fromIntegral $ BL.length msg)  -- bodySize
            properties
        bodyFrames
            | BL.null msg = []
            | otherwise   = map ContentBodyPayload (splitLen msg maxBodyLen)
        !toWrite = MethodPayload m : header : bodyFrames
    writeFrames chan toWrite
  where
    -- split into frames of maxFrameSize
    -- (need to subtract 8 bytes to account for frame header and end-marker)
    maxBodyLen :: Int64
    maxBodyLen = fromIntegral (connMaxFrameSize $ connection chan) - 8

    -- Cuts the body into pieces of at most @len@ bytes. 'BL.splitAt' is used
    -- so the remaining length is not recomputed for every piece.
    -- NOTE(review): like the original, this does not terminate for len <= 0.
    splitLen :: BL.ByteString -> Int64 -> [BL.ByteString]
    splitLen str len =
        case BL.splitAt len str of
            (piece, rest)
                | BL.null rest -> [piece]
                | otherwise    -> piece : splitLen rest len
writeAssembly' chan (SimpleMethod m) = writeFrames chan [MethodPayload m]

-- most exported functions in this module will use either 'writeAssembly' or 'request' to talk to the server
-- so we perform the exception handling here

-- | writes an assembly to the channel
--
-- Any 'AMQPException', 'CE.ErrorCall' or 'CE.IOException' raised while writing is replaced by the
-- exception chosen by 'throwMostRelevantAMQPException'.
writeAssembly :: Channel -> Assembly -> IO ()
writeAssembly chan m =
    CE.catches
        (writeAssembly' chan m)
        [ CE.Handler $ \(_ :: AMQPException)  -> throwMostRelevantAMQPException chan
        , CE.Handler $ \(_ :: CE.ErrorCall)   -> throwMostRelevantAMQPException chan
        , CE.Handler $ \(_ :: CE.IOException) -> throwMostRelevantAMQPException chan
        ]

-- | sends an assembly and receives the response
--
-- An empty response slot is appended to the channel's queue of outstanding responses, the
-- assembly is written, and the call blocks until the slot is filled. Any 'AMQPException',
-- 'CE.ErrorCall' or 'CE.IOException' raised along the way (including the one raised for a
-- channel that is already closed) is replaced by the exception chosen by
-- 'throwMostRelevantAMQPException'.
request :: Channel -> Assembly -> IO Assembly
request chan m = do
    res <- newEmptyMVar
    CE.catches (sendAndAwait res)
        [ CE.Handler $ \(_ :: AMQPException)  -> throwMostRelevantAMQPException chan
        , CE.Handler $ \(_ :: CE.ErrorCall)   -> throwMostRelevantAMQPException chan
        , CE.Handler $ \(_ :: CE.IOException) -> throwMostRelevantAMQPException chan
        ]
  where
    sendAndAwait res = do
        withMVar (chanClosed chan) $ \cc ->
            case cc of
                Just _  -> CE.throwIO $ userError "closed"
                Nothing -> do
                    -- '$!' forces the queue, so a queue poisoned by 'closeChannel''
                    -- throws here rather than later
                    modifyMVar_ (outstandingResponses chan) $ \val -> return $! val Seq.|> res
                    writeAssembly' chan m

        -- res might contain an exception, so evaluate it here
        !r <- takeMVar res
        return r

-- | Throws an 'AMQPException' based on the status of the connection and the
-- channel. This action never returns normally.
--
-- The close state is inspected in this order:
--
-- 1. If the connection is marked closed, a 'ConnectionClosedException' with
--    the recorded close type and reason is thrown. This also covers the case
--    where both the connection and the channel are closed.
--
-- 2. Otherwise, if the channel is marked closed, a 'ChannelClosedException'
--    with the recorded close type and reason is thrown.
--
-- 3. If neither is marked closed, a
--    @ConnectionClosedException Abnormal "unknown reason"@ is thrown.
throwMostRelevantAMQPException :: Channel -> IO a
throwMostRelevantAMQPException chan = do
    cc <- readMVar $ connClosed $ connection chan
    case cc of
        Just (closeType, r) -> CE.throwIO $ ConnectionClosedException closeType r
        Nothing -> do
            chc <- readMVar $ chanClosed chan
            case chc of
                Just (ct, r) -> CE.throwIO $ ChannelClosedException ct r
                Nothing -> CE.throwIO $ ConnectionClosedException Abnormal "unknown reason"

-- | Waits inside 'STM' until the channel's 'unconfirmedSet' is empty, then
-- returns the contents of 'ackedSet' and 'nackedSet' as a pair
-- @(acked, nacked)@ and resets both sets to empty.
--
-- While 'unconfirmedSet' is non-empty, 'check' makes the transaction retry,
-- so an enclosing 'atomically' blocks until the set has been emptied.
waitForAllConfirms :: Channel -> STM (IntSet.IntSet, IntSet.IntSet)
waitForAllConfirms chan = do
    pending <- readTVar $ unconfirmedSet chan
    check (IntSet.null pending)
    -- 'swapTVar' returns the old value while storing the empty set
    (,) <$> swapTVar (ackedSet chan) IntSet.empty
        <*> swapTVar (nackedSet chan) IntSet.empty