{-# LANGUAGE BangPatterns, CPP, OverloadedStrings, ScopedTypeVariables, LambdaCase #-}
module Network.AMQP.Internal where

import Paths_amqp(version)
import Data.Version(showVersion)

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put as BPut
import Data.Int (Int64)
import Data.Maybe
import Data.Text (Text)
import Network.Socket (PortNumber, withSocketsDo)
import System.IO (hPutStrLn, stderr)

import qualified Control.Exception as CE
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map as M
import qualified Data.Foldable as F
import qualified Data.IntMap as IM
import qualified Data.IntSet as IntSet
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import qualified Data.Text.Encoding as E
import qualified Network.Connection as Conn

import Network.AMQP.Protocol
import Network.AMQP.Types
import Network.AMQP.Helpers
import Network.AMQP.Generated
import Network.AMQP.ChannelAllocator

-- | Tags an acknowledgement as either an ack or a nack.
data AckType = BasicAck | BasicNack deriving Show

-- | Delivery mode of a message. The constructor order is significant for the
-- derived 'Ord' instance ('Persistent' sorts before 'NonPersistent').
data DeliveryMode = Persistent -- ^ the message will survive server restarts (if the queue is durable)
                  | NonPersistent -- ^ the message may be lost after server restarts
    deriving (Eq, Ord, Read, Show)

-- | Numeric code of a 'DeliveryMode': 'NonPersistent' is @1@, 'Persistent' is @2@.
-- Inverse of 'intToDeliveryMode' on those two values.
deliveryModeToInt :: DeliveryMode -> Octet
deliveryModeToInt NonPersistent = 1
deliveryModeToInt Persistent = 2

-- | Decodes the numeric code of a 'DeliveryMode' (@1@ is 'NonPersistent',
-- @2@ is 'Persistent').
--
-- Partial: any other value calls 'error' with the offending number in the message.
intToDeliveryMode :: Octet -> DeliveryMode
intToDeliveryMode 1 = NonPersistent
intToDeliveryMode 2 = Persistent
intToDeliveryMode n = error ("Unknown delivery mode int: " ++ show n)

-- | An AMQP message
data Message = Message {
    msgBody :: BL.ByteString, -- ^ the content of your message
    msgDeliveryMode :: Maybe DeliveryMode, -- ^ see 'DeliveryMode'
    msgTimestamp :: Maybe Timestamp, -- ^ use in any way you like; this doesn't affect the way the message is handled
    msgID :: Maybe Text, -- ^ use in any way you like; this doesn't affect the way the message is handled
    msgType :: Maybe Text, -- ^ use in any way you like; this doesn't affect the way the message is handled
    msgUserID :: Maybe Text,
    msgApplicationID :: Maybe Text,
    msgClusterID :: Maybe Text,
    msgContentType :: Maybe Text,
    msgContentEncoding :: Maybe Text,
    msgReplyTo :: Maybe Text,
    msgPriority :: Maybe Octet,
    msgCorrelationID :: Maybe Text,
    msgExpiration :: Maybe Text,
    msgHeaders :: Maybe FieldTable
} deriving (Eq, Ord, Read, Show)

-- | contains meta-information of a delivered message (through 'getMsg' or 'consumeMsgs')
data Envelope = Envelope {
    envDeliveryTag :: LongLongInt,
    envRedelivered :: Bool,
    envExchangeName :: Text,
    envRoutingKey :: Text,
    envChannel :: Channel
}

-- | Describes a failed publish: a 'ReturnReplyCode' together with the exchange
-- (if any) and the routing key it refers to.
data PublishError = PublishError {
    errReplyCode :: ReturnReplyCode,
    errExchange :: Maybe Text,
    errRoutingKey :: Text
} deriving (Eq, Read, Show)

-- | Reply code carried by a 'PublishError'. Every constructor wraps a 'Text'
-- (presumably the broker's reply text -- confirm against the code that builds it).
data ReturnReplyCode = Unroutable Text
                     | NoConsumers Text
                     | NotFound Text
    deriving (Eq, Read, Show)

------------- ASSEMBLY -------------------------

-- an assembly is a higher-level object consisting of several frames (like in amqp 0-10)

-- | A method frame, optionally bundled with the content header properties and
-- the concatenated content body that followed it.
data Assembly = SimpleMethod MethodPayload
              | ContentMethod MethodPayload ContentHeaderProperties BL.ByteString --method, properties, content-data
    deriving Show

-- | reads all frames necessary to build an assembly
--
-- Takes one frame from the channel. A method frame for which 'hasContent' holds
-- is completed with 'collectContent' into a 'ContentMethod'; any other method
-- frame becomes a 'SimpleMethod'. A non-method frame in first position is
-- reported through 'error'.
readAssembly :: Chan FramePayload -> IO Assembly
readAssembly chan = do
    m <- readChan chan
    case m of
        MethodPayload p -> --got a method frame
            if hasContent m
                then do
                    --several frames containing the content will follow, so read them
                    (props, msg) <- collectContent chan
                    return $ ContentMethod p props msg
                else return $ SimpleMethod p
        x -> error $ "didn't expect frame: " ++ show x

-- | reads a contentheader and contentbodies and assembles them
--
-- The next frame on the channel must be a 'ContentHeaderPayload'; its body size
-- says how many bytes of 'ContentBodyPayload' frames to read afterwards. A frame
-- of any other kind makes the corresponding pattern match fail inside 'IO'.
collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString)
collectContent chan = do
    (ContentHeaderPayload _ _ bodySize props) <- readChan chan
    content <- collect $ fromIntegral bodySize
    return (props, BL.concat content)
  where
    -- keep reading body frames until the announced number of bytes has been consumed
    collect :: Int64 -> IO [BL.ByteString]
    collect x | x <= 0 = return []
    collect x = do
        (ContentBodyPayload payload) <- readChan chan
        r <- collect (x - BL.length payload)
        return $ payload : r

------------ CONNECTION -------------------


{- general concept:
Each connection has its own thread. Each channel has its own thread.
Connection reads data from socket and forwards it to channel. Channel processes data and forwards it to application.
Outgoing data is written directly onto the socket.

Incoming Data: Socket -> Connection-Thread -> Channel-Thread -> Application
Outgoing Data: Application -> Socket
-}

-- | State of one broker connection (see the \"general concept\" note above).
data Connection = Connection {
    connHandle :: Conn.Connection,
    connChanAllocator :: ChannelAllocator,
    connChannels :: MVar (IM.IntMap (Channel, ThreadId)), -- open channels (channelID => (Channel, ChannelThread))
    connMaxFrameSize :: Int, --negotiated maximum frame size
    connClosed :: MVar (Maybe (CloseType, String)),
    connClosedLock :: MVar (), -- used by closeConnection to block until connection-close handshake is complete
    connWriteLock :: MVar (), -- to ensure atomic writes to the socket
    connClosedHandlers :: MVar [IO ()],
    connBlockedHandlers :: MVar [(Text -> IO (), IO ())], -- pairs of (blocked listener, unblocked listener)
    connLastReceived :: MVar Int64, -- the timestamp from a monotonic clock when the last frame was received
    connLastSent :: MVar Int64, -- the timestamp from a monotonic clock when the last frame was written
    connServerProperties :: FieldTable, -- the server properties sent in Connection_start
    connThread :: MVar (Maybe ThreadId)
}

-- | Represents the parameters to connect to a broker or a cluster of brokers.
-- See 'defaultConnectionOpts'.
data ConnectionOpts = ConnectionOpts {
    coServers :: ![(String, PortNumber)], -- ^ A list of host-port pairs. Useful in a clustered setup to connect to the first available host.
    coVHost :: !Text, -- ^ The VHost to connect to.
    coAuth :: ![SASLMechanism], -- ^ The 'SASLMechanism's to use for authenticating with the broker.
    coMaxFrameSize :: !(Maybe Word32), -- ^ The maximum frame size to be used. If not specified, no limit is assumed.
    coHeartbeatDelay :: !(Maybe Word16), -- ^ The delay in seconds, after which the client expects a heartbeat frame from the broker. If 'Nothing', the value suggested by the broker is used. Use @Just 0@ to disable the heartbeat mechanism.
    coMaxChannel :: !(Maybe Word16), -- ^ The maximum number of channels the client will use.
    coTLSSettings :: Maybe TLSSettings, -- ^ Whether or not to connect to servers using TLS. See http://www.rabbitmq.com/ssl.html for details.
    coName :: !(Maybe Text) -- ^ optional connection name (will be displayed in the RabbitMQ web interface)
}

-- | Represents the kind of TLS connection to establish.
data TLSSettings =
    TLSTrusted   -- ^ Require trusted certificates (Recommended).
  | TLSUntrusted -- ^ Allow untrusted certificates (Discouraged. Vulnerable to man-in-the-middle attacks)
  | TLSCustom Conn.TLSSettings -- ^ Provide your own custom TLS settings


-- | Translates this module's 'TLSSettings' into the settings type of the
-- underlying connection library. The result is always a 'Just'.
--
-- 'TLSTrusted' and 'TLSUntrusted' differ only in the first flag passed to
-- 'Conn.TLSSettingsSimple' ('False' and 'True' respectively); 'TLSCustom'
-- passes the wrapped settings through unchanged.
connectionTLSSettings :: TLSSettings -> Maybe Conn.TLSSettings
connectionTLSSettings tlsSettings =
    Just $ case tlsSettings of
        TLSTrusted -> Conn.TLSSettingsSimple False False False
        TLSUntrusted -> Conn.TLSSettingsSimple True False False
        TLSCustom x -> x

-- | A 'SASLMechanism' is described by its name ('saslName'), its initial response ('saslInitialResponse'), and an optional function ('saslChallengeFunc') that
-- transforms a security challenge provided by the server into response, which is then sent back to the server for verification.
data SASLMechanism = SASLMechanism {
    saslName :: !Text, -- ^ mechanism name
    saslInitialResponse :: !BS.ByteString, -- ^ initial response
    saslChallengeFunc :: !(Maybe (BS.ByteString -> IO BS.ByteString)) -- ^ challenge processing function
}

-- | reads incoming frames from socket and forwards them to the opened channels
--
-- Each iteration reads one frame, records the time of receipt via
-- 'updateLastReceived' and dispatches the frame; the function then calls
-- itself again. An 'CE.IOException' raised during an iteration is handed to
-- 'killConnection' (as an 'Abnormal' close) together with the current thread id.
connectionReceiver :: Connection -> IO ()
connectionReceiver conn = do
    CE.catch receiveOne onIOError
    connectionReceiver conn
  where
    -- read a single frame and dispatch it
    receiveOne :: IO ()
    receiveOne = do
        Frame chanID payload <- readFrame (connHandle conn)
        updateLastReceived conn
        forwardToChannel chanID payload

    onIOError :: CE.IOException -> IO ()
    onIOError e = myThreadId >>= killConnection conn Abnormal (CE.toException e)

    closedByUserEx :: AMQPException
    closedByUserEx = ConnectionClosedException Normal "closed by user"

    -- Frames on channel 0 concern the connection itself and are handled here;
    -- everything else is queued for the channel the frame is addressed to.
    forwardToChannel :: (Integral a, Show a) => a -> FramePayload -> IO ()
    forwardToChannel 0 (MethodPayload Connection_close_ok) =
        myThreadId >>= killConnection conn Normal (CE.toException closedByUserEx)
    forwardToChannel 0 (MethodPayload (Connection_close _ (ShortString errorMsg) _ _)) = do
        -- the server asked to close: confirm first, then shut down on our side
        writeFrame (connHandle conn) $ Frame 0 $ MethodPayload Connection_close_ok
        let ex = ConnectionClosedException Abnormal (T.unpack errorMsg)
        myThreadId >>= killConnection conn Abnormal (CE.toException ex)
    forwardToChannel 0 HeartbeatPayload = return ()
    forwardToChannel 0 (MethodPayload (Connection_blocked reason)) = handleBlocked reason
    forwardToChannel 0 (MethodPayload Connection_unblocked) = handleUnblocked
    forwardToChannel 0 payload =
        hPutStrLn stderr $ "Got unexpected msg on channel zero: " ++ show payload
    forwardToChannel chanID payload =
        --got asynchronous msg => forward to registered channel
        withMVar (connChannels conn) $ \cs ->
            case IM.lookup (fromIntegral chanID) cs of
                Just c -> writeChan (inQueue $ fst c) payload
                -- TODO: It's unclear how to handle this, although it probably never
                --   happens in practice.
                Nothing -> hPutStrLn stderr $ "ERROR: channel not open " ++ show chanID

    -- notify the first component of every registered (blocked, unblocked) listener pair
    handleBlocked :: ShortString -> IO ()
    handleBlocked (ShortString reason) =
        withMVar (connBlockedHandlers conn) $ \listeners ->
            forM_ listeners $ \(l, _) -> runListener "blocked" (l reason)

    -- notify the second component of every registered (blocked, unblocked) listener pair
    handleUnblocked :: IO ()
    handleUnblocked =
        withMVar (connBlockedHandlers conn) $ \listeners ->
            forM_ listeners $ \(_, l) -> runListener "unblocked" l

    -- Run a user-supplied listener; an exception it throws is logged to stderr
    -- instead of propagating into the receiver loop.
    -- NOTE(review): catching 'CE.SomeException' also intercepts asynchronous
    -- exceptions delivered while the listener runs -- confirm this is intended.
    runListener :: String -> IO () -> IO ()
    runListener what action = CE.catch action logFailure
      where
        logFailure :: CE.SomeException -> IO ()
        logFailure ex =
            hPutStrLn stderr $ "connection " ++ what ++ " listener threw exception: " ++ show ex

-- | Opens a connection to a broker specified by the given 'ConnectionOpts' parameter.

openConnection'' :: ConnectionOpts -> IO Connection
openConnection'' :: ConnectionOpts -> IO Connection
-- Opens a connection to a broker described by the given 'ConnectionOpts'.
--
-- Steps, in order:
--
--   1. Try every @(host, port)@ pair in 'coServers' until one accepts a
--      connection (optionally over TLS, see 'coTLSSettings').
--   2. Perform the AMQP handshake (protocol header, start/start_ok, optional
--      SASL secure/secure_ok rounds, tune/tune_ok, open/open_ok).
--   3. Build the 'Connection' record and fork the 'connectionReceiver' thread,
--      whose finaliser closes the socket, records the close reason, kills the
--      channel threads and runs the registered connection-closed handlers.
--   4. If a non-zero heartbeat interval was negotiated, start 'watchHeartbeats'.
--
-- Throws 'ConnectionClosedException' 'Abnormal' when no broker can be reached,
-- when an 'CE.IOException' occurs during the handshake, or when no usable SASL
-- mechanism is available. If the handshake fails for any reason, the socket
-- opened in step 1 is closed before the exception propagates.
openConnection'' connOpts = withSocketsDo $ do
    handle <- connect [] $ coServers connOpts

    -- Until the receiver thread exists nobody else owns the socket, so a
    -- failed handshake (including async exceptions) must close it here,
    -- otherwise the file descriptor leaks.
    (maxFrameSize, maxChannel, heartbeatTimeout, serverProps) <-
        handshake handle `CE.onException` closeQuietly handle

    --build Connection object
    cChannels <- newMVar IM.empty
    cClosed <- newMVar Nothing
    cChanAllocator <- newChannelAllocator $ fromIntegral maxChannel
    _ <- allocateChannel cChanAllocator -- allocate channel 0

    writeLock <- newMVar ()
    ccl <- newEmptyMVar
    cClosedHandlers <- newMVar []
    cBlockedHandlers <- newMVar []
    cLastReceived <- getTimestamp >>= newMVar
    cLastSent <- getTimestamp >>= newMVar
    cThread <- newMVar Nothing

    let conn = Connection handle cChanAllocator cChannels (fromIntegral maxFrameSize) cClosed ccl writeLock cClosedHandlers cBlockedHandlers cLastReceived cLastSent serverProps cThread

    --spawn the connectionReceiver
    connThreadId <- forkFinally' (connectionReceiver conn) $ \res -> do
        -- try closing socket
        closeQuietly handle

        -- mark as closed (keep a reason that was already recorded, if any)
        modifyMVar_ cClosed $ return . Just . fromMaybe (Abnormal, "unknown reason")

        -- kill all channel-threads, making sure the channel threads will
        -- be killed taking into account the overall state of the
        -- connection: if the thread died for an unexpected exception,
        -- inform the channel threads downstream accordingly. Otherwise
        -- just use a normal 'killThread' finaliser.
        let finaliser = ChanThreadKilledException $ case res of
                Left ex -> ex
                Right _ -> CE.toException CE.ThreadKilled
        modifyMVar_ cChannels $ \x -> do
            mapM_ (flip CE.throwTo finaliser . snd) $ IM.elems x
            return IM.empty

        -- mark connection as closed, so all pending calls to 'closeConnection' can now return
        void $ tryPutMVar ccl ()

        -- notify connection-close-handlers
        withMVar cClosedHandlers sequence_

    case heartbeatTimeout of
        Nothing      -> return ()
        Just timeout -> do
            heartbeatThread <- watchHeartbeats conn (fromIntegral timeout) connThreadId
            -- stop the heartbeat watcher once the connection is closed
            addConnectionClosedHandler conn True (killThread heartbeatThread)

    modifyMVar_ cThread $ \_ -> return $ Just connThreadId
    return conn
  where
    -- Best-effort close of the network connection; any exception raised while
    -- closing is deliberately ignored (the socket may already be closed).
    closeQuietly handle =
        CE.catch (Conn.connectionClose handle) (\(_ :: CE.SomeException) -> return ())

    -- Converts any IOException raised during the handshake (this includes
    -- failed pattern matches in the IO do-block) into a ConnectionClosedException.
    handshakeFailed (_ :: CE.IOException) =
        CE.throwIO $ ConnectionClosedException Abnormal "Handshake failed. Please check the RabbitMQ logs for more information"

    -- Runs the protocol handshake on an already connected socket and returns
    -- (negotiated max frame size, negotiated max channel, heartbeat interval
    -- or Nothing when it is 0, server properties).
    handshake handle = CE.handle handshakeFailed $ do
        Conn.connectionPut handle $ BS.append (BC.pack "AMQP")
            (BS.pack [
                      1
                    , 1 --TCP/IP
                    , 0 --Major Version
                    , 9 --Minor Version
                    ])

        -- S: connection.start
        Frame 0 (MethodPayload (Connection_start _ _ serverProps (LongString serverMechanisms) _)) <- readFrame handle
        selectedSASL <- selectSASLMechanism handle serverMechanisms

        -- C: start_ok
        writeFrame handle $ start_ok selectedSASL
        -- S: secure or tune
        Frame 0 (MethodPayload (Connection_tune channel_max frame_max sendHeartbeat)) <- handleSecureUntilTune handle selectedSASL
        -- C: tune_ok
        let maxFrameSize = chooseMin frame_max $ coMaxFrameSize connOpts
            -- a client-side setting overrides the server's suggestion
            finalHeartbeatSec = fromMaybe sendHeartbeat (coHeartbeatDelay connOpts)
            -- 0 means "no heartbeats"
            heartbeatTimeout = mfilter (/= 0) (Just finalHeartbeatSec)

            -- a channel limit of 0 is replaced by 65535 before taking the minimum
            fixChanNum x = if x == 0 then 65535 else x
            maxChannel = chooseMin (fixChanNum channel_max) $ fmap fixChanNum $ coMaxChannel connOpts

        writeFrame handle (Frame 0 (MethodPayload
            (Connection_tune_ok maxChannel maxFrameSize finalHeartbeatSec)
            ))
        -- C: open
        writeFrame handle open

        -- S: open_ok
        Frame 0 (MethodPayload (Connection_open_ok _)) <- readFrame handle

        -- Connection established!
        return (maxFrameSize, maxChannel, heartbeatTimeout, serverProps)

    -- Tries the given servers one after another, accumulating the exception of
    -- every failed attempt; throws once the list is exhausted.
    connect excs ((host, port) : rest) = do
        ctx <- Conn.initConnectionContext
        result <- CE.try $ Conn.connectTo ctx $ Conn.ConnectionParams
                    { Conn.connectionHostname  = host
                    , Conn.connectionPort      = port
                    , Conn.connectionUseSecure = tlsSettings
                    , Conn.connectionUseSocks  = Nothing
                    }
        either (\(ex :: CE.SomeException) -> connect (ex : excs) rest)
               return
               result
    connect excs [] = CE.throwIO $ ConnectionClosedException Abnormal $ "Could not connect to any of the provided brokers: " ++ show (zip (coServers connOpts) (reverse excs))

    tlsSettings = coTLSSettings connOpts >>= connectionTLSSettings

    -- Picks the first client mechanism (in 'coAuth' order) that the server
    -- advertises in its space-separated mechanism list; aborts otherwise.
    selectSASLMechanism handle serverMechanisms =
        let serverSaslList = T.split (== ' ') $ E.decodeUtf8 serverMechanisms
            clientMechanisms = coAuth connOpts
            clientSaslList = map saslName clientMechanisms
            maybeSasl = F.find (\(SASLMechanism name _ _) -> name `elem` serverSaslList) clientMechanisms
        in abortIfNothing maybeSasl handle
            ("None of the provided SASL mechanisms "++show clientSaslList++" is supported by the server "++show serverSaslList++".")

    start_ok sasl = Frame 0 $ MethodPayload $ Connection_start_ok
                        clientProperties
                        (ShortString $ saslName sasl)
                        (LongString $ saslInitialResponse sasl)
                        (ShortString "en_US")
      where
        clientProperties = FieldTable $ M.fromList $ [
            ("platform", FVString "Haskell"),
            ("version" , FVString . E.encodeUtf8 . T.pack $ showVersion version),
            ("capabilities", FVFieldTable clientCapabilities)
          ] ++ maybe [] (\x -> [("connection_name", FVString $ E.encodeUtf8 x)]) (coName connOpts)

        clientCapabilities = FieldTable $ M.fromList [
            ("consumer_cancel_notify", FVBool True),
            ("connection.blocked", FVBool True)
          ]

    -- Answers connection.secure challenges until the server sends
    -- connection.tune, which is returned to the caller.
    handleSecureUntilTune handle sasl = do
        tuneOrSecure <- readFrame handle
        case tuneOrSecure of
            Frame 0 (MethodPayload (Connection_secure (LongString challenge))) -> do
                processChallenge <- abortIfNothing (saslChallengeFunc sasl)
                    handle $ "The server provided a challenge, but the selected SASL mechanism "++show (saslName sasl)++" is not equipped with a challenge processing function."
                challengeResponse <- processChallenge challenge
                writeFrame handle (Frame 0 (MethodPayload (Connection_secure_ok (LongString challengeResponse))))
                handleSecureUntilTune handle sasl

            tune@(Frame 0 (MethodPayload Connection_tune{})) -> return tune
            -- any other frame at this point is unexpected; the exception type
            -- is kept as-is, the socket is closed by the caller's guard
            x -> error $ "handleSecureUntilTune fail. received message: "++show x

    open = Frame 0 $ MethodPayload $ Connection_open
        (ShortString $ coVHost connOpts)
        (ShortString $ T.pack "") -- capabilities; deprecated in 0-9-1
        True -- insist; deprecated in 0-9-1

    -- Closes the socket and reports the given reason. The close is
    -- best-effort so that a failure while closing cannot mask 'msg'.
    abortHandshake handle msg = do
        closeQuietly handle
        CE.throwIO $ ConnectionClosedException Abnormal msg

    abortIfNothing m handle msg = case m of
        Nothing -> abortHandshake handle msg
        Just a  -> return a


-- | Schedules the periodic heartbeat check for a connection and returns the
-- 'ThreadId' produced by 'scheduleAtFixedRate'.
--
-- @timeout@ is the heartbeat interval in seconds. On every tick (a quarter of
-- the interval) a heartbeat frame is written if the last-sent timestamp is at
-- least half an interval old, and the connection is killed if the
-- last-received timestamp is at least two intervals old.
watchHeartbeats :: Connection -> Int -> ThreadId -> IO ThreadId
watchHeartbeats conn timeout connThread =
    scheduleAtFixedRate tick (sendIfIdle >> killIfSilent)
  where
    -- timeout / 4 in µs
    tick :: Int
    tick = timeout * 1000 * 250

    -- timeout / 2 in µs
    sendLimit :: Int64
    sendLimit = fromIntegral tick * 2

    -- 2 * timeout in µs
    receiveLimit :: Int64
    receiveLimit = fromIntegral tick * 4 * 2

    missedBeats :: AMQPException
    missedBeats =
        ConnectionClosedException Abnormal "killed connection after missing 2 consecutive heartbeats"

    -- NOTE(review): this write does not take 'connWriteLock', unlike
    -- 'closeConnection' — confirm that is intentional.
    sendIfIdle :: IO ()
    sendIfIdle =
        whenStale (connLastSent conn) sendLimit
            (writeFrame (connHandle conn) (Frame 0 HeartbeatPayload))

    killIfSilent :: IO ()
    killIfSilent =
        whenStale (connLastReceived conn) receiveLimit
            (killConnection conn Abnormal (CE.toException missedBeats) connThread)

    -- Runs the action (while holding the timestamp MVar) when the stored
    -- timestamp is at least @limit@ µs in the past.
    whenStale :: MVar Int64 -> Int64 -> IO () -> IO ()
    whenStale stampVar limit act =
        withMVar stampVar $ \lastStamp ->
            getTimestamp >>= \now ->
                when (now >= lastStamp + limit) act

-- | Overwrites the connection's last-sent timestamp ('connLastSent') with the
-- current value of 'getTimestamp'.
updateLastSent :: Connection -> IO ()
updateLastSent conn =
    modifyMVar_ (connLastSent conn) $ \_ -> getTimestamp

-- | Overwrites the connection's last-received timestamp ('connLastReceived')
-- with the current value of 'getTimestamp'.
updateLastReceived :: Connection -> IO ()
updateLastReceived conn =
    modifyMVar_ (connLastReceived conn) $ \_ -> getTimestamp

-- | Kill the connection thread abruptly.
--
-- First stores @(closeType, show ex)@ as the close reason in 'connClosed'
-- (replacing whatever was there), then raises @ex@ in the thread identified
-- by @connThreadId@.
killConnection :: Connection -> CloseType -> CE.SomeException -> ThreadId -> IO ()
killConnection conn closeType ex connThreadId =
    recordReason >> throwTo connThreadId ex
  where
    recordReason =
        modifyMVar_ (connClosed conn) $ \_ ->
            return (Just (closeType, show ex))

-- | closes a connection
--
-- Make sure to call this function before your program exits to ensure that all published messages are received by the server.
--
-- Sends @Connection_close@ on channel 0 while holding 'connWriteLock'. If that
-- write raises an 'CE.IOException', the connection thread is killed via
-- 'killConnection' with close type 'Abnormal'. In both cases the call then blocks
-- until 'connClosedLock' has been filled.
closeConnection :: Connection -> IO ()
closeConnection c = do
    CE.catch sendClose $ \(e :: CE.IOException) -> do
        -- Make sure the connection is closed.
        -- (This pattern match can't fail, since openConnection'' will always fill
        --  the variable in, and there's no other way to get a connection.)
        Just thrID <- readMVar (connThread c)
        killConnection c Abnormal (CE.toException e) thrID

    -- wait for connection_close_ok by the server; this MVar gets filled in the CE.finally handler in openConnection'
    readMVar (connClosedLock c)
  where
    -- write the Connection_close method frame under the connection's write lock
    sendClose :: IO ()
    sendClose = withMVar (connWriteLock c) $ \_ ->
        writeFrame (connHandle c) $ Frame 0 $ MethodPayload $ Connection_close
            --TODO: set these values
            0 -- reply_code
            (ShortString "") -- reply_text
            0 -- class_id
            0 -- method_id

-- | get the server properties sent in connection.start
--
-- Simply returns the 'connServerProperties' field of the connection.
getServerProperties :: Connection -> IO FieldTable
getServerProperties conn = return (connServerProperties conn)

-- | @addConnectionClosedHandler conn ifClosed handler@ adds a @handler@ that will be called after the connection is closed (either by calling @closeConnection@ or by an exception). If the @ifClosed@ parameter is True and the connection is already closed, the handler will be called immediately. If @ifClosed == False@ and the connection is already closed, the handler will never be called
--
-- The decision is made while holding the 'connClosed' MVar.
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler conn ifClosed handler =
    withMVar (connClosed conn) $ \case
        -- connection is already closed, so call the handler directly
        Just _ | ifClosed -> handler

        -- otherwise add it to the list
        _ -> modifyMVar_ (connClosedHandlers conn) (return . (handler :))

-- | @addConnectionBlockedHandler conn blockedHandler unblockedHandler@ adds handlers that will be called
-- when a connection gets blocked/unblocked due to server resource constraints.
--
-- The pair of handlers is prepended to the list stored in 'connBlockedHandlers'.
--
-- More information: <https://www.rabbitmq.com/connection-blocked.html>
addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO ()
addConnectionBlockedHandler conn blockedHandler unblockedHandler =
    modifyMVar_ (connBlockedHandlers conn) $ \old ->
        return ((blockedHandler, unblockedHandler) : old)

-- | Read one complete frame from the connection.
--
-- Reads a 7 byte prefix, takes the payload length from it with 'peekFrameSize',
-- reads that many bytes plus one terminating byte, and decodes the concatenation
-- with the 'Binary' instance of 'Frame'.
--
-- Throws an 'IOException' (via 'userError') if either read yields no data, and
-- calls 'error' if decoding fails or the decoder does not consume exactly
-- @len + 8@ bytes.
readFrame :: Conn.Connection -> IO Frame
readFrame handle = do
    strictDat <- connectionGetExact handle 7
    let dat = toLazy strictDat
    -- NB: userError returns an IOException so it will be caught in 'connectionReceiver'
    when (BL.null dat) notOpen
    let len = fromIntegral (peekFrameSize dat) :: Int
    strictDat' <- connectionGetExact handle (len + 1) -- +1 for the terminating 0xCE
    let dat' = toLazy strictDat'
    when (BL.null dat') notOpen
    let rawFrame = BL.append dat dat'
        -- 7 prefix bytes + payload + 1 terminator byte
        expectedBytes = len + 8 :: Int
#if MIN_VERSION_binary(0, 7, 0)
    case runGetOrFail get rawFrame of
        Left (_, _, errMsg) -> error $ "readFrame fail: " ++ errMsg
        Right (_, consumedBytes, frame)
            | consumedBytes /= fromIntegral expectedBytes ->
                error $ "readFrame: parser should read " ++ show expectedBytes ++ " bytes; but read " ++ show consumedBytes
            | otherwise -> return frame
#else
    let (frame, _, consumedBytes) = runGetState get rawFrame 0
    when (consumedBytes /= fromIntegral expectedBytes) $
        error $ "readFrameSock: parser should read "++show expectedBytes++" bytes; but read "++show consumedBytes
    return frame
#endif
  where
    notOpen :: IO ()
    notOpen = CE.throwIO $ userError "connection not open"

-- belongs in connection package and will be removed once it lands there
--
-- | Read exactly @x@ bytes from the connection, issuing as many
-- 'Conn.connectionGet' calls as needed (each asks only for the bytes still missing).
--
-- The received chunks are collected and concatenated once at the end, so the
-- accumulated data is not re-copied for every additional chunk.
connectionGetExact :: Conn.Connection -> Int -> IO BS.ByteString
connectionGetExact conn x = loop [] x
  where
    -- chunks are accumulated in reverse order of arrival
    loop :: [BS.ByteString] -> Int -> IO BS.ByteString
    loop chunks !remaining
      | remaining == 0 = return $ BS.concat (reverse chunks)
      | otherwise = do
          next <- Conn.connectionGet conn remaining
          loop (next : chunks) (remaining - BS.length next)

-- | Serialise a frame with its 'Binary' instance and write the resulting
-- strict bytes to the connection with 'Conn.connectionPut'.
writeFrame :: Conn.Connection -> Frame -> IO ()
writeFrame handle f = Conn.connectionPut handle (toStrict (runPut (put f)))

------------------------ CHANNEL -----------------------------


{- | A connection to an AMQP server is made up of separate channels. It is recommended to use a separate channel for each thread in your application that talks to the AMQP server (but you don't have to as channels are thread-safe)
-}
data Channel = Channel {
    connection :: Connection,
    inQueue :: Chan FramePayload, -- incoming frames (from Connection)
    outstandingResponses :: MVar (Seq.Seq (MVar Assembly)), -- for every request an MVar is stored here waiting for the response
    channelID :: Word16,
    lastConsumerTag :: MVar Int,

    nextPublishSeqNum :: MVar Int,
    unconfirmedSet :: TVar IntSet.IntSet,
    -- delivery tags accumulate in the following two sets.
    ackedSet :: TVar IntSet.IntSet,
    nackedSet :: TVar IntSet.IntSet,

    chanActive :: Lock, -- used for flow-control. if lock is closed, no content methods will be sent
    chanClosed :: MVar (Maybe (CloseType, String)),
    consumers :: MVar (M.Map Text ((Message, Envelope) -> IO (), -- who is consumer of a queue? (consumerTag => callback)
                                   ConsumerTag -> IO ())),    -- cancellation notification callback
    returnListeners :: MVar [(Message, PublishError) -> IO ()],
    confirmListeners :: MVar [(Word64, Bool, AckType) -> IO ()],
    chanExceptionHandlers :: MVar [CE.SomeException -> IO ()]
}

-- | Thrown in the channel thread when the connection gets closed.
-- When handling exceptions in a subscription callback, make sure to re-throw this so the channel thread can be stopped.
data ChanThreadKilledException = ChanThreadKilledException
    { cause :: CE.SomeException -- ^ the wrapped exception
    }
  deriving (Show)

instance CE.Exception ChanThreadKilledException

-- | If the given exception is an instance of ChanThreadKilledException, this method returns
--    the inner exception. Otherwise the exception is returned unchanged.
unwrapChanThreadKilledException :: CE.SomeException -> CE.SomeException
unwrapChanThreadKilledException e =
    case CE.fromException e of
        Just killed -> cause killed
        Nothing     -> e

-- | Build a 'Message' from the properties of a content header and the message body.
--
-- Only 'CHBasic' properties are supported; any other constructor results in a
-- call to 'error'. Optional 'ShortString' properties are unwrapped to 'Text',
-- and the delivery mode is converted with 'intToDeliveryMode'.
msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message
msgFromContentHeaderProperties (CHBasic content_type content_encoding headers delivery_mode priority correlation_id reply_to expiration message_id timestamp message_type user_id application_id cluster_id) body =
    -- the 'Message' constructor is applied positionally; the order below matters
    Message
        body
        (intToDeliveryMode <$> delivery_mode)
        timestamp
        (fromShortString message_id)
        (fromShortString message_type)
        (fromShortString user_id)
        (fromShortString application_id)
        (fromShortString cluster_id)
        (fromShortString content_type)
        (fromShortString content_encoding)
        (fromShortString reply_to)
        priority
        (fromShortString correlation_id)
        (fromShortString expiration)
        headers
  where
    fromShortString :: Maybe ShortString -> Maybe Text
    fromShortString (Just (ShortString s)) = Just s
    fromShortString _ = Nothing
msgFromContentHeaderProperties c _ = error ("Unknown content header properties: " ++ show c)

-- | The thread that is run for every channel

channelReceiver :: Channel -> IO ()
channelReceiver :: Channel -> IO ()
channelReceiver Channel
chan = do
    --read incoming frames; they are put there by a Connection thread

    Assembly
p <- Chan FramePayload -> IO Assembly
readAssembly (Chan FramePayload -> IO Assembly)
-> Chan FramePayload -> IO Assembly
forall a b. (a -> b) -> a -> b
$ Channel -> Chan FramePayload
inQueue Channel
chan
    if Assembly -> Bool
isResponse Assembly
p
        then do
            IO ()
action <- MVar (Seq (MVar Assembly))
-> (Seq (MVar Assembly) -> IO (Seq (MVar Assembly), IO ()))
-> IO (IO ())
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (Channel -> MVar (Seq (MVar Assembly))
outstandingResponses Channel
chan) ((Seq (MVar Assembly) -> IO (Seq (MVar Assembly), IO ()))
 -> IO (IO ()))
-> (Seq (MVar Assembly) -> IO (Seq (MVar Assembly), IO ()))
-> IO (IO ())
forall a b. (a -> b) -> a -> b
$ \Seq (MVar Assembly)
val -> do
                        case Seq (MVar Assembly) -> ViewL (MVar Assembly)
forall a. Seq a -> ViewL a
Seq.viewl Seq (MVar Assembly)
val of
                            MVar Assembly
x Seq.:< Seq (MVar Assembly)
rest -> do
                                (Seq (MVar Assembly), IO ()) -> IO (Seq (MVar Assembly), IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Seq (MVar Assembly)
rest, MVar Assembly -> Assembly -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Assembly
x Assembly
p)
                            ViewL (MVar Assembly)
Seq.EmptyL -> do
                                (Seq (MVar Assembly), IO ()) -> IO (Seq (MVar Assembly), IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Seq (MVar Assembly)
val, IOException -> IO ()
forall e a. Exception e => e -> IO a
CE.throwIO (IOException -> IO ()) -> IOException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IOException
userError String
"got response, but have no corresponding request")
            IO ()
action

        --handle asynchronous assemblies

        else Assembly -> IO ()
handleAsync Assembly
p
    Channel -> IO ()
channelReceiver Channel
chan
  where
    -- Classifies an incoming assembly: the methods listed here are handled
    -- asynchronously (by handleAsync); everything else is treated as the
    -- response to an outstanding request.
    isResponse :: Assembly -> Bool
    isResponse assembly = case assembly of
        ContentMethod Basic_deliver{} _ _ -> False
        ContentMethod Basic_return{} _ _  -> False
        SimpleMethod Channel_flow{}       -> False
        SimpleMethod Channel_close{}      -> False
        SimpleMethod Basic_ack{}          -> False
        SimpleMethod Basic_nack{}         -> False
        SimpleMethod Basic_cancel{}       -> False
        _                                 -> True

    -- Handles the assemblies that are not responses to a request.
    handleAsync :: Assembly -> IO ()
    --Basic.Deliver: forward msg to registered consumer
    handleAsync (ContentMethod (Basic_deliver (ShortString consumerTag) deliveryTag redelivered (ShortString exchange)
                                                (ShortString routingKey))
                                properties body) =
        withMVar (consumers chan) $ \registered ->
            case M.lookup consumerTag registered of
                -- got a message, but have no registered subscriber; so drop it
                Nothing -> return ()
                Just (subscriber, _) -> do
                    let msg = msgFromContentHeaderProperties properties body
                        env = Envelope
                            { envDeliveryTag = deliveryTag
                            , envRedelivered = redelivered
                            , envExchangeName = exchange
                            , envRoutingKey = routingKey
                            , envChannel = chan
                            }
                    -- a ChanThreadKilledException is unwrapped and re-thrown;
                    -- any other exception from the callback is only logged to stderr
                    CE.catches (subscriber (msg, env))
                        [ CE.Handler $ \(e :: ChanThreadKilledException) -> CE.throwIO (cause e)
                        , CE.Handler $ \(e :: CE.SomeException) -> hPutStrLn stderr $ "AMQP callback threw exception: " ++ show e
                        ]
    --Channel.close: acknowledge, mark the channel closed, then raise in this thread
    handleAsync (SimpleMethod (Channel_close _ (ShortString errorMsg) _ _)) = do
        CE.catch (writeAssembly' chan (SimpleMethod Channel_close_ok)) $ \(_ :: CE.IOException) ->
            -- do nothing if connection is already closed
            return ()
        closeChannel' chan Abnormal errorMsg
        self <- myThreadId
        CE.throwTo self (ChannelClosedException Abnormal (T.unpack errorMsg))
    --Channel.flow: open or close the flow-control lock
    handleAsync (SimpleMethod (Channel_flow active)) =
        -- in theory we should respond with flow_ok but rabbitMQ 1.7 ignores that, so it doesn't matter
        (if active then openLock else closeLock) (chanActive chan)
    --Basic.return
    handleAsync (ContentMethod basicReturn@Basic_return{} props body) = do
        let msg      = msgFromContentHeaderProperties props body
            pubError = basicReturnToPublishError basicReturn
        withMVar (returnListeners chan) $ \listeners ->
            forM_ listeners $ \l ->
                -- an exception in one listener is logged and does not stop the others
                CE.catch (l (msg, pubError)) $ \(ex :: CE.SomeException) ->
                    hPutStrLn stderr $ "return listener on channel ["++show (channelID chan)++"] handling error ["++show pubError++"] threw exception: "++show ex
    handleAsync (SimpleMethod (Basic_ack deliveryTag multiple)) = handleConfirm deliveryTag multiple BasicAck
    handleAsync (SimpleMethod (Basic_nack deliveryTag multiple _)) = handleConfirm deliveryTag multiple BasicNack
    handleAsync (SimpleMethod (Basic_cancel consumerTag _)) = handleCancel consumerTag
    handleAsync m = error ("Unknown method: " ++ show m)

    -- Handles a Basic_ack / Basic_nack from the server: first notifies every
    -- confirm listener, then moves the confirmed sequence numbers out of
    -- unconfirmedSet into ackedSet or nackedSet (depending on the AckType).
    handleConfirm :: Word64 -> Bool -> AckType -> IO ()
    handleConfirm deliveryTag multiple k = do
        withMVar (confirmListeners chan) $ \listeners ->
            forM_ listeners $ \l ->
                -- an exception in one listener is logged and does not stop the others
                CE.catch (l (deliveryTag, multiple, k)) $ \(ex :: CE.SomeException) ->
                    hPutStrLn stderr $ "confirm listener on channel ["++show (channelID chan)++"] handling method "++show k++" threw exception: "++ show ex

        let seqNum = fromIntegral deliveryTag :: Int
            targetSet = case k of
                BasicAck  -> ackedSet chan
                BasicNack -> nackedSet chan
        atomically $ do
            unconfSet <- readTVar (unconfirmedSet chan)
            let -- with "multiple", every unconfirmed tag up to and including seqNum is confirmed
                (confs, pending') = IntSet.partition (<= seqNum) unconfSet
                (merge, pending)
                    | multiple  = (IntSet.union confs, pending')
                    | otherwise = (IntSet.insert seqNum, IntSet.delete seqNum unconfSet)
            modifyTVar' targetSet merge
            writeTVar (unconfirmedSet chan) pending

    -- Handles a consumer cancellation notification for the given consumer tag.
    --
    -- Looks the tag up in the channel's consumer map and, if a consumer is
    -- registered, runs its cancellation callback with the tag. An exception
    -- thrown by the callback is reported on stderr instead of being propagated.
    handleCancel :: ShortString -> IO ()
    handleCancel (ShortString consumerTag) =
        withMVar (consumers chan) $ \registered ->
            case M.lookup consumerTag registered of
                -- got a cancellation notification, but have no registered subscriber; so drop it
                Nothing -> return ()
                Just (_, cancelCB) -> CE.handle logFailure (cancelCB consumerTag)
      where
        logFailure :: CE.SomeException -> IO ()
        logFailure ex =
            hPutStrLn stderr $ "consumer cancellation listener " ++ show consumerTag ++ " on channel [" ++ show (channelID chan) ++ "] threw exception: " ++ show ex

    -- Converts a 'Basic_return' method into the 'PublishError' handed to
    -- return listeners.
    --
    -- Reply codes 312, 313 and 404 map to 'Unroutable', 'NoConsumers' and
    -- 'NotFound' respectively (each carrying the broker's reply text). Any
    -- other reply code, and any method other than 'Basic_return', is treated
    -- as a protocol violation and results in a call to 'error'.
    basicReturnToPublishError :: MethodPayload -> PublishError
    basicReturnToPublishError (Basic_return code (ShortString errText) (ShortString exchange) (ShortString routingKey)) =
        PublishError replyError (Just exchange) routingKey
      where
        replyError = case code of
            312 -> Unroutable errText
            313 -> NoConsumers errText
            404 -> NotFound errText
            num -> error $ "unexpected return error code: " ++ show num
    basicReturnToPublishError x = error $ "basicReturnToPublishError fail: " ++ show x

-- | Registers a callback function that is called whenever a message is returned from the broker ('basic.return').
--
-- The listener is prepended to the channel's list of return listeners;
-- previously registered listeners stay registered.
addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO ()
addReturnListener chan listener =
    modifyMVar_ (returnListeners chan) (return . (listener :))

-- | Registers a callback function that is called whenever a channel is closed by an exception.
--
-- The handler will always be called when a channel is closed, whether through normal means
--  ('closeChannel', 'closeConnection') or by some AMQP exception.
--
-- You can use 'isNormalChannelClose' to figure out if the exception was normal or due to an
--  AMQP exception.
--
-- The handler is prepended to the channel's list of exception handlers;
-- previously registered handlers stay registered.
addChannelExceptionHandler :: Channel -> (CE.SomeException -> IO ()) -> IO ()
addChannelExceptionHandler chan handler =
    modifyMVar_ (chanExceptionHandlers chan) (return . (handler :))

-- | This can be used with the exception passed to 'addChannelExceptionHandler'.
--
-- Returns True if the argument is a 'ConnectionClosedException' or 'ChannelClosedException' that happened
--  normally (i.e. by the user calling 'closeChannel' or 'closeConnection') and not due to some
--  AMQP exception. Returns False for every other exception, including
--  exceptions that are not an 'AMQPException' at all.
isNormalChannelClose :: CE.SomeException -> Bool
isNormalChannelClose e =
    case CE.fromException e :: Maybe AMQPException of
        Just (ChannelClosedException Normal _)    -> True
        Just (ConnectionClosedException Normal _) -> True
        _                                         -> False

-- closes the channel internally; but doesn't tell the server
--
-- The close is recorded at most once: if 'chanClosed' already holds a close
-- reason the call is a no-op. Otherwise the channel id is returned to the
-- connection's allocator, the channel is removed from the connection's
-- channel map, the flow-control lock is killed, all threads waiting for an
-- outstanding response are woken up, and @(closeType, reason)@ is stored as
-- the close reason.
closeChannel' :: Channel -> CloseType -> Text -> IO ()
closeChannel' c closeType reason =
    modifyMVar_ (chanClosed c) $ \case
        alreadyClosed@(Just _) -> return alreadyClosed
        Nothing -> do
            modifyMVar_ (connChannels conn) $ \old -> do
                ret <- freeChannel (connChanAllocator conn) chanIx
                unless ret $ hPutStrLn stderr "closeChannel error: channel already freed"
                return $ IM.delete chanIx old

            void $ killLock $ chanActive c
            killOutstandingResponses $ outstandingResponses c
            return $ Just (closeType, T.unpack reason)
  where
    conn = connection c

    chanIx :: Int
    chanIx = fromIntegral $ channelID c

    -- Fills every still-empty response slot with an 'error' thunk, so that a
    -- thread blocked on the slot wakes up and throws when it evaluates the
    -- value.
    killOutstandingResponses :: MVar (Seq.Seq (MVar a)) -> IO ()
    killOutstandingResponses outResps =
        modifyMVar_ outResps $ \val -> do
            F.forM_ val $ \slot -> tryPutMVar slot $ error "channel closed"
            -- Intentionally put 'undefined' into the MVar, so that reading from it will throw.
            -- This MVar will be read from the 'request' method, where we appropriately catch ErrorCall exceptions.
            return undefined

-- | opens a new channel on the connection
--
-- By default, if a channel is closed by an AMQP exception, this exception will be printed to stderr. You can prevent this behaviour by setting a custom exception handler (using 'addChannelExceptionHandler').
--
-- Example of adding an exception-handler:
--
-- > chan <- openChannel conn
-- > addChannelExceptionHandler chan $ \e -> do
-- >     unless (isNormalChannelClose e) $ do
-- >         putStrLn $ "channel exception: "++show e
--
-- Throws an 'IOError' (@userError@) if the allocated channel id is already
-- present in the connection's channel map.
openChannel :: Connection -> IO Channel
openChannel c = do
    -- fresh per-channel state
    newInQueue <- newChan
    outRes <- newMVar Seq.empty
    lastConsTag <- newMVar 0
    ca <- newLock
    closed <- newMVar Nothing
    conss <- newMVar M.empty
    retListeners <- newMVar []
    aSet <- newTVarIO IntSet.empty
    nSet <- newTVarIO IntSet.empty
    nxtSeq <- newMVar 0
    unconfSet <- newTVarIO IntSet.empty
    cnfListeners <- newMVar []
    handlers <- newMVar []

    -- add new channel to connection's channel map
    newChannel <- modifyMVar (connChannels c) $ \mp -> do
        newChannelID <- allocateChannel (connChanAllocator c)
        -- Check for a clash before forking the receiver thread, so that a
        -- failure here does not leave an orphaned thread behind.
        when (IM.member newChannelID mp) $ CE.throwIO $ userError "openChannel fail: channel already open"

        let newChannel = Channel c newInQueue outRes (fromIntegral newChannelID) lastConsTag nxtSeq unconfSet aSet nSet ca closed conss retListeners cnfListeners handlers

        thrID <- forkFinally' (channelReceiver newChannel) $ \res -> do
            closeChannel' newChannel Normal "closed"
            case res of
                Right _ -> return ()
                Left ex -> do
                    let unwrappedExc = unwrapChanThreadKilledException ex
                    handlers' <- readMVar handlers

                    case (null handlers', fromAbnormalChannelClose unwrappedExc) of
                        -- nobody registered a handler: report abnormal closes on stderr
                        (True, Just reason) -> hPutStrLn stderr $ "unhandled AMQP channel exception (chanId=" ++ show newChannelID ++ "): " ++ reason
                        _ -> forM_ handlers' ($ unwrappedExc)

        return (IM.insert newChannelID (newChannel, thrID) mp, newChannel)

    SimpleMethod (Channel_open_ok _) <- request newChannel $ SimpleMethod $ Channel_open (ShortString "")
    return newChannel
  where
    -- Returns a description for exceptions that should be reported when no
    -- handler is registered, and Nothing for connection closes and normal
    -- channel closes.
    fromAbnormalChannelClose :: CE.SomeException -> Maybe String
    fromAbnormalChannelClose exc =
        case CE.fromException exc :: Maybe AMQPException of
            Just (ConnectionClosedException _ _) -> Nothing
            Just (ChannelClosedException Normal _) -> Nothing
            Just (ChannelClosedException Abnormal reason) -> Just reason
            Just (AllChannelsAllocatedException _) -> Just "all channels allocated"
            Nothing -> Just $ show exc

-- | closes a channel. It is typically not necessary to manually call this as closing a connection will implicitly close all channels.
--
-- Sends @channel.close@ to the server and waits for @channel.close-ok@.
-- If the channel is still registered in the connection's channel map, a
-- normal 'ChannelClosedException' is then thrown to its receiver thread.
closeChannel :: Channel -> IO ()
closeChannel c = do
    SimpleMethod Channel_close_ok <- request c $ SimpleMethod $ Channel_close 0 (ShortString "") 0 0
    withMVar (connChannels $ connection c) $ \chans ->
        -- nothing to do if the channel is no longer registered
        F.forM_ (IM.lookup (fromIntegral $ channelID c) chans) $ \(_, thrID) ->
            throwTo thrID $ ChannelClosedException Normal "closeChannel was called"

-- | writes multiple frames to the channel atomically
--
-- All frames are written while holding the connection's write lock, so frames
-- of different writers are never interleaved. After the frames were written
-- 'updateLastSent' is called for the connection.
--
-- Throws @userError "channel not open"@ if the channel is not registered in
-- the connection's channel map, and @userError "connection not open"@ if
-- writing fails with an 'CE.IOException'.
writeFrames :: Channel -> [FramePayload] -> IO ()
writeFrames chan payloads =
    withMVar (connChannels conn) $ \chans -> do
        unless (IM.member (fromIntegral chanID) chans) $
            CE.throwIO $ userError "channel not open"
        sendAll `CE.catch` \(_ :: CE.IOException) ->
            CE.throwIO $ userError "connection not open"
  where
    conn = connection chan
    chanID = channelID chan

    sendAll :: IO ()
    sendAll = do
        -- ensure at most one thread is writing to the socket at any time
        withMVar (connWriteLock conn) $ \_ ->
            mapM_ (writeFrame (connHandle conn) . Frame chanID) payloads
        updateLastSent conn

-- Writes an assembly to the channel without translating exceptions.
--
-- A 'SimpleMethod' is sent as a single method frame. A 'ContentMethod' is
-- sent as a method frame, a content header frame and, if the body is not
-- empty, one or more content body frames; before sending content the call
-- blocks on the channel's 'chanActive' lock.
writeAssembly' :: Channel -> Assembly -> IO ()
writeAssembly' chan (ContentMethod m properties msg) = do
    -- wait iff the AMQP server instructed us to withhold sending content data (flow control)
    waitLock $ chanActive chan
    let !toWrite = MethodPayload m : header : bodyFrames
    writeFrames chan toWrite
  where
    header =
        ContentHeaderPayload
            (getClassIDOf properties)      -- classID
            0                              -- weight is deprecated in AMQP 0-9
            (fromIntegral $ BL.length msg) -- bodySize
            properties

    -- split into frames of maxFrameSize
    -- (need to subtract 8 bytes to account for frame header and end-marker)
    maxBodyLen :: Int64
    maxBodyLen = fromIntegral (connMaxFrameSize $ connection chan) - 8

    bodyFrames
        | BL.null msg = []
        | otherwise = map ContentBodyPayload (splitLen msg maxBodyLen)

    -- Cuts the string into pieces of at most @len@ bytes. 'BL.splitAt' is
    -- used so that the length of the remaining string is not recomputed for
    -- every piece.
    splitLen :: BL.ByteString -> Int64 -> [BL.ByteString]
    splitLen str len
        | BL.null rest = [piece]
        | otherwise = piece : splitLen rest len
      where
        (piece, rest) = BL.splitAt len str
writeAssembly' chan (SimpleMethod m) = writeFrames chan [MethodPayload m]

-- Most exported functions in this module use either 'writeAssembly' or 'request' to talk to the server,
-- so the exception handling is performed here.


-- | writes an assembly to the channel
--
-- Any 'AMQPException', 'CE.ErrorCall' or 'CE.IOException' raised while
-- writing is replaced by the exception chosen by
-- 'throwMostRelevantAMQPException' for this channel.
writeAssembly :: Channel -> Assembly -> IO ()
writeAssembly chan m =
    writeAssembly' chan m `CE.catches`
        [ CE.Handler $ \(_ :: AMQPException) -> rethrow
        , CE.Handler $ \(_ :: CE.ErrorCall) -> rethrow
        , CE.Handler $ \(_ :: CE.IOException) -> rethrow
        ]
  where
    rethrow :: IO ()
    rethrow = throwMostRelevantAMQPException chan

-- | Sends an assembly and waits for the response.
--
-- A fresh empty 'MVar' is appended to the channel's 'outstandingResponses'
-- queue, the assembly is written with 'writeAssembly'', and the call then
-- blocks until that 'MVar' is filled (presumably by the code that processes
-- incoming frames for this channel -- not visible in this block).
--
-- If the channel is already marked as closed, nothing is sent and an
-- 'CE.IOException' (@userError "closed"@) is raised internally. That
-- exception, like any 'AMQPException', 'CE.ErrorCall' or 'CE.IOException'
-- raised while sending or waiting, is replaced by the result of
-- 'throwMostRelevantAMQPException'.
request :: Channel -> Assembly -> IO Assembly
request chan m = do
    res <- newEmptyMVar
    sendAndAwait res `CE.catches`
        [ CE.Handler $ \(_ :: AMQPException) -> rethrow
        , CE.Handler $ \(_ :: CE.ErrorCall) -> rethrow
        , CE.Handler $ \(_ :: CE.IOException) -> rethrow
        ]
  where
    -- Shared handler body: every caught exception type is translated the same way.
    rethrow :: IO a
    rethrow = throwMostRelevantAMQPException chan

    sendAndAwait :: MVar Assembly -> IO Assembly
    sendAndAwait res = do
        -- The closed-check, the enqueue and the write all happen while
        -- 'chanClosed' is held, so they are atomic with respect to any other
        -- code that takes that MVar.
        withMVar (chanClosed chan) $ \case
            Nothing -> do
                modifyMVar_ (outstandingResponses chan) $ \pending ->
                    return $! pending Seq.|> res
                writeAssembly' chan m
            Just _ -> CE.throwIO $ userError "closed"

        -- The response may be a thunk that throws when evaluated, so force it
        -- here, while the exception handlers are still in scope.
        response <- takeMVar res
        CE.evaluate response

-- | Throws an 'AMQPException' based on the status of the connection and the
-- channel. This action never returns normally.
--
-- * If the connection is marked as closed, a 'ConnectionClosedException'
--   carrying the recorded close type and reason is thrown. This takes
--   precedence even when the channel is closed as well.
-- * Otherwise, if the channel is marked as closed, a 'ChannelClosedException'
--   carrying the recorded close type and reason is thrown.
-- * If neither is marked as closed, a 'ConnectionClosedException' with
--   'Abnormal' and the reason @"unknown reason"@ is thrown.
throwMostRelevantAMQPException :: Channel -> IO a
throwMostRelevantAMQPException chan =
    readMVar (connClosed (connection chan)) >>= \case
        Just (closeType, reason) ->
            CE.throwIO $ ConnectionClosedException closeType reason
        Nothing ->
            -- The channel state is only consulted when the connection is still open.
            readMVar (chanClosed chan) >>= \case
                Just (closeType, reason) ->
                    CE.throwIO $ ChannelClosedException closeType reason
                Nothing ->
                    CE.throwIO $ ConnectionClosedException Abnormal "unknown reason"

-- | Blocks the enclosing STM transaction (via 'check') until the channel's
-- 'unconfirmedSet' is empty, then returns the current contents of
-- 'ackedSet' and 'nackedSet' -- in that order -- and resets both to
-- 'IntSet.empty'.
waitForAllConfirms :: Channel -> STM (IntSet.IntSet, IntSet.IntSet)
waitForAllConfirms chan = do
    pending <- readTVar (unconfirmedSet chan)
    -- Retry the transaction until nothing is left unconfirmed.
    check (IntSet.null pending)
    acked <- swapTVar (ackedSet chan) IntSet.empty
    nacked <- swapTVar (nackedSet chan) IntSet.empty
    return (acked, nacked)