{-# LANGUAGE BangPatterns, CPP, OverloadedStrings, ScopedTypeVariables, LambdaCase #-}
module Network.AMQP.Internal where
import Paths_amqp(version)
import Data.Version(showVersion)
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put as BPut
import Data.Default.Class
import Data.Int (Int64)
import Data.Maybe
import Data.Text (Text)
import Network.Socket (PortNumber, withSocketsDo)
import System.IO (hPutStrLn, stderr)
import qualified Control.Exception as CE
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map as M
import qualified Data.Foldable as F
import qualified Data.IntMap as IM
import qualified Data.IntSet as IntSet
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import qualified Data.Text.Encoding as E
import qualified Network.Connection as Conn
import Network.AMQP.Protocol
import Network.AMQP.Types
import Network.AMQP.Helpers
import Network.AMQP.Generated
import Network.AMQP.ChannelAllocator
data AckType = BasicAck | BasicNack deriving Int -> AckType -> ShowS
[AckType] -> ShowS
AckType -> [Char]
(Int -> AckType -> ShowS)
-> (AckType -> [Char]) -> ([AckType] -> ShowS) -> Show AckType
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> AckType -> ShowS
showsPrec :: Int -> AckType -> ShowS
$cshow :: AckType -> [Char]
show :: AckType -> [Char]
$cshowList :: [AckType] -> ShowS
showList :: [AckType] -> ShowS
Show
data DeliveryMode = Persistent
| NonPersistent
deriving (DeliveryMode -> DeliveryMode -> Bool
(DeliveryMode -> DeliveryMode -> Bool)
-> (DeliveryMode -> DeliveryMode -> Bool) -> Eq DeliveryMode
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: DeliveryMode -> DeliveryMode -> Bool
== :: DeliveryMode -> DeliveryMode -> Bool
$c/= :: DeliveryMode -> DeliveryMode -> Bool
/= :: DeliveryMode -> DeliveryMode -> Bool
Eq, Eq DeliveryMode
Eq DeliveryMode =>
(DeliveryMode -> DeliveryMode -> Ordering)
-> (DeliveryMode -> DeliveryMode -> Bool)
-> (DeliveryMode -> DeliveryMode -> Bool)
-> (DeliveryMode -> DeliveryMode -> Bool)
-> (DeliveryMode -> DeliveryMode -> Bool)
-> (DeliveryMode -> DeliveryMode -> DeliveryMode)
-> (DeliveryMode -> DeliveryMode -> DeliveryMode)
-> Ord DeliveryMode
DeliveryMode -> DeliveryMode -> Bool
DeliveryMode -> DeliveryMode -> Ordering
DeliveryMode -> DeliveryMode -> DeliveryMode
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: DeliveryMode -> DeliveryMode -> Ordering
compare :: DeliveryMode -> DeliveryMode -> Ordering
$c< :: DeliveryMode -> DeliveryMode -> Bool
< :: DeliveryMode -> DeliveryMode -> Bool
$c<= :: DeliveryMode -> DeliveryMode -> Bool
<= :: DeliveryMode -> DeliveryMode -> Bool
$c> :: DeliveryMode -> DeliveryMode -> Bool
> :: DeliveryMode -> DeliveryMode -> Bool
$c>= :: DeliveryMode -> DeliveryMode -> Bool
>= :: DeliveryMode -> DeliveryMode -> Bool
$cmax :: DeliveryMode -> DeliveryMode -> DeliveryMode
max :: DeliveryMode -> DeliveryMode -> DeliveryMode
$cmin :: DeliveryMode -> DeliveryMode -> DeliveryMode
min :: DeliveryMode -> DeliveryMode -> DeliveryMode
Ord, ReadPrec [DeliveryMode]
ReadPrec DeliveryMode
Int -> ReadS DeliveryMode
ReadS [DeliveryMode]
(Int -> ReadS DeliveryMode)
-> ReadS [DeliveryMode]
-> ReadPrec DeliveryMode
-> ReadPrec [DeliveryMode]
-> Read DeliveryMode
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
$creadsPrec :: Int -> ReadS DeliveryMode
readsPrec :: Int -> ReadS DeliveryMode
$creadList :: ReadS [DeliveryMode]
readList :: ReadS [DeliveryMode]
$creadPrec :: ReadPrec DeliveryMode
readPrec :: ReadPrec DeliveryMode
$creadListPrec :: ReadPrec [DeliveryMode]
readListPrec :: ReadPrec [DeliveryMode]
Read, Int -> DeliveryMode -> ShowS
[DeliveryMode] -> ShowS
DeliveryMode -> [Char]
(Int -> DeliveryMode -> ShowS)
-> (DeliveryMode -> [Char])
-> ([DeliveryMode] -> ShowS)
-> Show DeliveryMode
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> DeliveryMode -> ShowS
showsPrec :: Int -> DeliveryMode -> ShowS
$cshow :: DeliveryMode -> [Char]
show :: DeliveryMode -> [Char]
$cshowList :: [DeliveryMode] -> ShowS
showList :: [DeliveryMode] -> ShowS
Show)
deliveryModeToInt :: DeliveryMode -> Octet
deliveryModeToInt :: DeliveryMode -> Word8
deliveryModeToInt DeliveryMode
NonPersistent = Word8
1
deliveryModeToInt DeliveryMode
Persistent = Word8
2
intToDeliveryMode :: Octet -> DeliveryMode
intToDeliveryMode :: Word8 -> DeliveryMode
intToDeliveryMode Word8
1 = DeliveryMode
NonPersistent
intToDeliveryMode Word8
2 = DeliveryMode
Persistent
intToDeliveryMode Word8
n = [Char] -> DeliveryMode
forall a. HasCallStack => [Char] -> a
error ([Char]
"Unknown delivery mode int: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Word8 -> [Char]
forall a. Show a => a -> [Char]
show Word8
n)
data Message = Message {
Message -> ByteString
msgBody :: BL.ByteString,
Message -> Maybe DeliveryMode
msgDeliveryMode :: Maybe DeliveryMode,
Message -> Maybe LongLongInt
msgTimestamp :: Maybe Timestamp,
Message -> Maybe Text
msgID :: Maybe Text,
Message -> Maybe Text
msgType :: Maybe Text,
Message -> Maybe Text
msgUserID :: Maybe Text,
Message -> Maybe Text
msgApplicationID :: Maybe Text,
Message -> Maybe Text
msgClusterID :: Maybe Text,
Message -> Maybe Text
msgContentType :: Maybe Text,
Message -> Maybe Text
msgContentEncoding :: Maybe Text,
Message -> Maybe Text
msgReplyTo :: Maybe Text,
Message -> Maybe Word8
msgPriority :: Maybe Octet,
Message -> Maybe Text
msgCorrelationID :: Maybe Text,
Message -> Maybe Text
msgExpiration :: Maybe Text,
:: Maybe FieldTable
} deriving (Message -> Message -> Bool
(Message -> Message -> Bool)
-> (Message -> Message -> Bool) -> Eq Message
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Message -> Message -> Bool
== :: Message -> Message -> Bool
$c/= :: Message -> Message -> Bool
/= :: Message -> Message -> Bool
Eq, Eq Message
Eq Message =>
(Message -> Message -> Ordering)
-> (Message -> Message -> Bool)
-> (Message -> Message -> Bool)
-> (Message -> Message -> Bool)
-> (Message -> Message -> Bool)
-> (Message -> Message -> Message)
-> (Message -> Message -> Message)
-> Ord Message
Message -> Message -> Bool
Message -> Message -> Ordering
Message -> Message -> Message
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Message -> Message -> Ordering
compare :: Message -> Message -> Ordering
$c< :: Message -> Message -> Bool
< :: Message -> Message -> Bool
$c<= :: Message -> Message -> Bool
<= :: Message -> Message -> Bool
$c> :: Message -> Message -> Bool
> :: Message -> Message -> Bool
$c>= :: Message -> Message -> Bool
>= :: Message -> Message -> Bool
$cmax :: Message -> Message -> Message
max :: Message -> Message -> Message
$cmin :: Message -> Message -> Message
min :: Message -> Message -> Message
Ord, ReadPrec [Message]
ReadPrec Message
Int -> ReadS Message
ReadS [Message]
(Int -> ReadS Message)
-> ReadS [Message]
-> ReadPrec Message
-> ReadPrec [Message]
-> Read Message
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
$creadsPrec :: Int -> ReadS Message
readsPrec :: Int -> ReadS Message
$creadList :: ReadS [Message]
readList :: ReadS [Message]
$creadPrec :: ReadPrec Message
readPrec :: ReadPrec Message
$creadListPrec :: ReadPrec [Message]
readListPrec :: ReadPrec [Message]
Read, Int -> Message -> ShowS
[Message] -> ShowS
Message -> [Char]
(Int -> Message -> ShowS)
-> (Message -> [Char]) -> ([Message] -> ShowS) -> Show Message
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Message -> ShowS
showsPrec :: Int -> Message -> ShowS
$cshow :: Message -> [Char]
show :: Message -> [Char]
$cshowList :: [Message] -> ShowS
showList :: [Message] -> ShowS
Show)
data Envelope = Envelope {
Envelope -> LongLongInt
envDeliveryTag :: LongLongInt,
Envelope -> Bool
envRedelivered :: Bool,
Envelope -> Text
envExchangeName :: Text,
Envelope -> Text
envRoutingKey :: Text,
Envelope -> Channel
envChannel :: Channel
}
data PublishError = PublishError {
PublishError -> ReturnReplyCode
errReplyCode :: ReturnReplyCode,
PublishError -> Maybe Text
errExchange :: Maybe Text,
PublishError -> Text
errRoutingKey :: Text
} deriving (PublishError -> PublishError -> Bool
(PublishError -> PublishError -> Bool)
-> (PublishError -> PublishError -> Bool) -> Eq PublishError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: PublishError -> PublishError -> Bool
== :: PublishError -> PublishError -> Bool
$c/= :: PublishError -> PublishError -> Bool
/= :: PublishError -> PublishError -> Bool
Eq, ReadPrec [PublishError]
ReadPrec PublishError
Int -> ReadS PublishError
ReadS [PublishError]
(Int -> ReadS PublishError)
-> ReadS [PublishError]
-> ReadPrec PublishError
-> ReadPrec [PublishError]
-> Read PublishError
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
$creadsPrec :: Int -> ReadS PublishError
readsPrec :: Int -> ReadS PublishError
$creadList :: ReadS [PublishError]
readList :: ReadS [PublishError]
$creadPrec :: ReadPrec PublishError
readPrec :: ReadPrec PublishError
$creadListPrec :: ReadPrec [PublishError]
readListPrec :: ReadPrec [PublishError]
Read, Int -> PublishError -> ShowS
[PublishError] -> ShowS
PublishError -> [Char]
(Int -> PublishError -> ShowS)
-> (PublishError -> [Char])
-> ([PublishError] -> ShowS)
-> Show PublishError
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> PublishError -> ShowS
showsPrec :: Int -> PublishError -> ShowS
$cshow :: PublishError -> [Char]
show :: PublishError -> [Char]
$cshowList :: [PublishError] -> ShowS
showList :: [PublishError] -> ShowS
Show)
data ReturnReplyCode = Unroutable Text
| NoConsumers Text
| NotFound Text
deriving (ReturnReplyCode -> ReturnReplyCode -> Bool
(ReturnReplyCode -> ReturnReplyCode -> Bool)
-> (ReturnReplyCode -> ReturnReplyCode -> Bool)
-> Eq ReturnReplyCode
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ReturnReplyCode -> ReturnReplyCode -> Bool
== :: ReturnReplyCode -> ReturnReplyCode -> Bool
$c/= :: ReturnReplyCode -> ReturnReplyCode -> Bool
/= :: ReturnReplyCode -> ReturnReplyCode -> Bool
Eq, ReadPrec [ReturnReplyCode]
ReadPrec ReturnReplyCode
Int -> ReadS ReturnReplyCode
ReadS [ReturnReplyCode]
(Int -> ReadS ReturnReplyCode)
-> ReadS [ReturnReplyCode]
-> ReadPrec ReturnReplyCode
-> ReadPrec [ReturnReplyCode]
-> Read ReturnReplyCode
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
$creadsPrec :: Int -> ReadS ReturnReplyCode
readsPrec :: Int -> ReadS ReturnReplyCode
$creadList :: ReadS [ReturnReplyCode]
readList :: ReadS [ReturnReplyCode]
$creadPrec :: ReadPrec ReturnReplyCode
readPrec :: ReadPrec ReturnReplyCode
$creadListPrec :: ReadPrec [ReturnReplyCode]
readListPrec :: ReadPrec [ReturnReplyCode]
Read, Int -> ReturnReplyCode -> ShowS
[ReturnReplyCode] -> ShowS
ReturnReplyCode -> [Char]
(Int -> ReturnReplyCode -> ShowS)
-> (ReturnReplyCode -> [Char])
-> ([ReturnReplyCode] -> ShowS)
-> Show ReturnReplyCode
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ReturnReplyCode -> ShowS
showsPrec :: Int -> ReturnReplyCode -> ShowS
$cshow :: ReturnReplyCode -> [Char]
show :: ReturnReplyCode -> [Char]
$cshowList :: [ReturnReplyCode] -> ShowS
showList :: [ReturnReplyCode] -> ShowS
Show)
data Assembly = SimpleMethod MethodPayload
| ContentMethod MethodPayload ContentHeaderProperties BL.ByteString
deriving Int -> Assembly -> ShowS
[Assembly] -> ShowS
Assembly -> [Char]
(Int -> Assembly -> ShowS)
-> (Assembly -> [Char]) -> ([Assembly] -> ShowS) -> Show Assembly
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Assembly -> ShowS
showsPrec :: Int -> Assembly -> ShowS
$cshow :: Assembly -> [Char]
show :: Assembly -> [Char]
$cshowList :: [Assembly] -> ShowS
showList :: [Assembly] -> ShowS
Show
readAssembly :: Chan FramePayload -> IO Assembly
readAssembly :: Chan FramePayload -> IO Assembly
readAssembly Chan FramePayload
chan = do
FramePayload
m <- Chan FramePayload -> IO FramePayload
forall a. Chan a -> IO a
readChan Chan FramePayload
chan
case FramePayload
m of
MethodPayload MethodPayload
p ->
if FramePayload -> Bool
hasContent FramePayload
m
then do
(ContentHeaderProperties
props, ByteString
msg) <- Chan FramePayload -> IO (ContentHeaderProperties, ByteString)
collectContent Chan FramePayload
chan
Assembly -> IO Assembly
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Assembly -> IO Assembly) -> Assembly -> IO Assembly
forall a b. (a -> b) -> a -> b
$ MethodPayload -> ContentHeaderProperties -> ByteString -> Assembly
ContentMethod MethodPayload
p ContentHeaderProperties
props ByteString
msg
else do
Assembly -> IO Assembly
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Assembly -> IO Assembly) -> Assembly -> IO Assembly
forall a b. (a -> b) -> a -> b
$ MethodPayload -> Assembly
SimpleMethod MethodPayload
p
FramePayload
x -> [Char] -> IO Assembly
forall a. HasCallStack => [Char] -> a
error ([Char] -> IO Assembly) -> [Char] -> IO Assembly
forall a b. (a -> b) -> a -> b
$ [Char]
"didn't expect frame: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ FramePayload -> [Char]
forall a. Show a => a -> [Char]
show FramePayload
x
collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString)
collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, ByteString)
collectContent Chan FramePayload
chan = do
(ContentHeaderPayload Word16
_ Word16
_ LongLongInt
bodySize ContentHeaderProperties
props) <- Chan FramePayload -> IO FramePayload
forall a. Chan a -> IO a
readChan Chan FramePayload
chan
[ByteString]
content <- ByteOffset -> IO [ByteString]
collect (ByteOffset -> IO [ByteString]) -> ByteOffset -> IO [ByteString]
forall a b. (a -> b) -> a -> b
$ LongLongInt -> ByteOffset
forall a b. (Integral a, Num b) => a -> b
fromIntegral LongLongInt
bodySize
(ContentHeaderProperties, ByteString)
-> IO (ContentHeaderProperties, ByteString)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ContentHeaderProperties
props, [ByteString] -> ByteString
BL.concat [ByteString]
content)
where
collect :: ByteOffset -> IO [ByteString]
collect ByteOffset
x | ByteOffset
x ByteOffset -> ByteOffset -> Bool
forall a. Ord a => a -> a -> Bool
<= ByteOffset
0 = [ByteString] -> IO [ByteString]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return []
collect ByteOffset
x = do
(ContentBodyPayload ByteString
payload) <- Chan FramePayload -> IO FramePayload
forall a. Chan a -> IO a
readChan Chan FramePayload
chan
[ByteString]
r <- ByteOffset -> IO [ByteString]
collect (ByteOffset
x ByteOffset -> ByteOffset -> ByteOffset
forall a. Num a => a -> a -> a
- ByteString -> ByteOffset
BL.length ByteString
payload)
[ByteString] -> IO [ByteString]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ByteString] -> IO [ByteString])
-> [ByteString] -> IO [ByteString]
forall a b. (a -> b) -> a -> b
$ ByteString
payload ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: [ByteString]
r
data Connection = Connection {
Connection -> Connection
connHandle :: Conn.Connection,
Connection -> ChannelAllocator
connChanAllocator :: ChannelAllocator,
Connection -> MVar (IntMap (Channel, ThreadId))
connChannels :: MVar (IM.IntMap (Channel, ThreadId)),
Connection -> Int
connMaxFrameSize :: Int,
Connection -> MVar (Maybe (CloseType, [Char]))
connClosed :: MVar (Maybe (CloseType, String)),
Connection -> MVar ()
connClosedLock :: MVar (),
Connection -> MVar ()
connWriteLock :: MVar (),
Connection -> MVar [IO ()]
connClosedHandlers :: MVar [IO ()],
Connection -> MVar [(Text -> IO (), IO ())]
connBlockedHandlers :: MVar [(Text -> IO (), IO ())],
Connection -> MVar ByteOffset
connLastReceived :: MVar Int64,
Connection -> MVar ByteOffset
connLastSent :: MVar Int64,
Connection -> FieldTable
connServerProperties :: FieldTable,
Connection -> MVar (Maybe ThreadId)
connThread :: MVar (Maybe ThreadId)
}
data ConnectionOpts = ConnectionOpts {
ConnectionOpts -> [([Char], PortNumber)]
coServers :: ![(String, PortNumber)],
ConnectionOpts -> Text
coVHost :: !Text,
ConnectionOpts -> [SASLMechanism]
coAuth :: ![SASLMechanism],
ConnectionOpts -> Maybe Word32
coMaxFrameSize :: !(Maybe Word32),
ConnectionOpts -> Maybe Word16
coHeartbeatDelay :: !(Maybe Word16),
ConnectionOpts -> Maybe Word16
coMaxChannel :: !(Maybe Word16),
ConnectionOpts -> Maybe TLSSettings
coTLSSettings :: Maybe TLSSettings,
ConnectionOpts -> Maybe Text
coName :: !(Maybe Text)
} deriving Int -> ConnectionOpts -> ShowS
[ConnectionOpts] -> ShowS
ConnectionOpts -> [Char]
(Int -> ConnectionOpts -> ShowS)
-> (ConnectionOpts -> [Char])
-> ([ConnectionOpts] -> ShowS)
-> Show ConnectionOpts
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ConnectionOpts -> ShowS
showsPrec :: Int -> ConnectionOpts -> ShowS
$cshow :: ConnectionOpts -> [Char]
show :: ConnectionOpts -> [Char]
$cshowList :: [ConnectionOpts] -> ShowS
showList :: [ConnectionOpts] -> ShowS
Show
data TLSSettings =
TLSTrusted
| TLSUntrusted
| TLSCustom Conn.TLSSettings
deriving Int -> TLSSettings -> ShowS
[TLSSettings] -> ShowS
TLSSettings -> [Char]
(Int -> TLSSettings -> ShowS)
-> (TLSSettings -> [Char])
-> ([TLSSettings] -> ShowS)
-> Show TLSSettings
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TLSSettings -> ShowS
showsPrec :: Int -> TLSSettings -> ShowS
$cshow :: TLSSettings -> [Char]
show :: TLSSettings -> [Char]
$cshowList :: [TLSSettings] -> ShowS
showList :: [TLSSettings] -> ShowS
Show
connectionTLSSettings :: TLSSettings -> Maybe Conn.TLSSettings
connectionTLSSettings :: TLSSettings -> Maybe TLSSettings
connectionTLSSettings TLSSettings
tlsSettings =
TLSSettings -> Maybe TLSSettings
forall a. a -> Maybe a
Just (TLSSettings -> Maybe TLSSettings)
-> TLSSettings -> Maybe TLSSettings
forall a b. (a -> b) -> a -> b
$ case TLSSettings
tlsSettings of
TLSSettings
TLSTrusted -> Bool -> Bool -> Bool -> Supported -> TLSSettings
Conn.TLSSettingsSimple Bool
False Bool
False Bool
False Supported
forall a. Default a => a
def
TLSSettings
TLSUntrusted -> Bool -> Bool -> Bool -> Supported -> TLSSettings
Conn.TLSSettingsSimple Bool
True Bool
False Bool
False Supported
forall a. Default a => a
def
TLSCustom TLSSettings
x -> TLSSettings
x
data SASLMechanism = SASLMechanism {
SASLMechanism -> Text
saslName :: !Text,
SASLMechanism -> ByteString
saslInitialResponse :: !BS.ByteString,
SASLMechanism -> Maybe (ByteString -> IO ByteString)
saslChallengeFunc :: !(Maybe (BS.ByteString -> IO BS.ByteString))
}
instance Show SASLMechanism where
show :: SASLMechanism -> [Char]
show SASLMechanism
x = [Char]
"SASL"
connectionReceiver :: Connection -> IO ()
connectionReceiver :: Connection -> IO ()
connectionReceiver Connection
conn = do
IO () -> (IOError -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (do
Frame Word16
chanID FramePayload
payload <- Connection -> IO Frame
readFrame (Connection -> Connection
connHandle Connection
conn)
Connection -> IO ()
updateLastReceived Connection
conn
Word16 -> FramePayload -> IO ()
forall {a}. (Integral a, Show a) => a -> FramePayload -> IO ()
forwardToChannel Word16
chanID FramePayload
payload
)
(\(IOError
e :: CE.IOException) -> IO ThreadId
myThreadId IO ThreadId -> (ThreadId -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
Abnormal (IOError -> SomeException
forall e. Exception e => e -> SomeException
CE.toException IOError
e))
Connection -> IO ()
connectionReceiver Connection
conn
where
closedByUserEx :: AMQPException
closedByUserEx = CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Normal [Char]
"closed by user"
forwardToChannel :: a -> FramePayload -> IO ()
forwardToChannel a
0 (MethodPayload MethodPayload
Connection_close_ok) =
IO ThreadId
myThreadId IO ThreadId -> (ThreadId -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
Normal (AMQPException -> SomeException
forall e. Exception e => e -> SomeException
CE.toException AMQPException
closedByUserEx)
forwardToChannel a
0 (MethodPayload (Connection_close Word16
_ (ShortString Text
errorMsg) Word16
_ Word16
_)) = do
Connection -> Frame -> IO ()
writeFrame (Connection -> Connection
connHandle Connection
conn) (Frame -> IO ()) -> Frame -> IO ()
forall a b. (a -> b) -> a -> b
$ Word16 -> FramePayload -> Frame
Frame Word16
0 (FramePayload -> Frame) -> FramePayload -> Frame
forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload MethodPayload
Connection_close_ok
IO ThreadId
myThreadId IO ThreadId -> (ThreadId -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
Abnormal (AMQPException -> SomeException
forall e. Exception e => e -> SomeException
CE.toException (AMQPException -> SomeException)
-> (Text -> AMQPException) -> Text -> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal ([Char] -> AMQPException)
-> (Text -> [Char]) -> Text -> AMQPException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> [Char]
T.unpack (Text -> SomeException) -> Text -> SomeException
forall a b. (a -> b) -> a -> b
$ Text
errorMsg)
forwardToChannel a
0 FramePayload
HeartbeatPayload = () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
forwardToChannel a
0 (MethodPayload (Connection_blocked ShortString
reason)) = ShortString -> IO ()
handleBlocked ShortString
reason
forwardToChannel a
0 (MethodPayload MethodPayload
Connection_unblocked) = IO ()
handleUnblocked
forwardToChannel a
0 FramePayload
payload = Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Got unexpected msg on channel zero: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ FramePayload -> [Char]
forall a. Show a => a -> [Char]
show FramePayload
payload
forwardToChannel a
chanID FramePayload
payload = do
MVar (IntMap (Channel, ThreadId))
-> (IntMap (Channel, ThreadId) -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels Connection
conn) ((IntMap (Channel, ThreadId) -> IO ()) -> IO ())
-> (IntMap (Channel, ThreadId) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
cs -> do
case Int -> IntMap (Channel, ThreadId) -> Maybe (Channel, ThreadId)
forall a. Int -> IntMap a -> Maybe a
IM.lookup (a -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral a
chanID) IntMap (Channel, ThreadId)
cs of
Just (Channel, ThreadId)
c -> Chan FramePayload -> FramePayload -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (Channel -> Chan FramePayload
inQueue (Channel -> Chan FramePayload) -> Channel -> Chan FramePayload
forall a b. (a -> b) -> a -> b
$ (Channel, ThreadId) -> Channel
forall a b. (a, b) -> a
fst (Channel, ThreadId)
c) FramePayload
payload
Maybe (Channel, ThreadId)
Nothing -> Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"ERROR: channel not open " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ a -> [Char]
forall a. Show a => a -> [Char]
show a
chanID
handleBlocked :: ShortString -> IO ()
handleBlocked (ShortString Text
reason) = do
MVar [(Text -> IO (), IO ())]
-> ([(Text -> IO (), IO ())] -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar [(Text -> IO (), IO ())]
connBlockedHandlers Connection
conn) (([(Text -> IO (), IO ())] -> IO ()) -> IO ())
-> ([(Text -> IO (), IO ())] -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \[(Text -> IO (), IO ())]
listeners ->
[(Text -> IO (), IO ())]
-> ((Text -> IO (), IO ()) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Text -> IO (), IO ())]
listeners (((Text -> IO (), IO ()) -> IO ()) -> IO ())
-> ((Text -> IO (), IO ()) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Text -> IO ()
l, IO ()
_) -> IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (Text -> IO ()
l Text
reason) ((SomeException -> IO ()) -> IO ())
-> (SomeException -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"connection blocked listener threw exception: "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ SomeException -> [Char]
forall a. Show a => a -> [Char]
show SomeException
ex
handleUnblocked :: IO ()
handleUnblocked = do
MVar [(Text -> IO (), IO ())]
-> ([(Text -> IO (), IO ())] -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar [(Text -> IO (), IO ())]
connBlockedHandlers Connection
conn) (([(Text -> IO (), IO ())] -> IO ()) -> IO ())
-> ([(Text -> IO (), IO ())] -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \[(Text -> IO (), IO ())]
listeners ->
[(Text -> IO (), IO ())]
-> ((Text -> IO (), IO ()) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Text -> IO (), IO ())]
listeners (((Text -> IO (), IO ()) -> IO ()) -> IO ())
-> ((Text -> IO (), IO ()) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Text -> IO ()
_, IO ()
l) -> IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch IO ()
l ((SomeException -> IO ()) -> IO ())
-> (SomeException -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"connection unblocked listener threw exception: "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ SomeException -> [Char]
forall a. Show a => a -> [Char]
show SomeException
ex
openConnection'' :: ConnectionOpts -> IO Connection
openConnection'' :: ConnectionOpts -> IO Connection
openConnection'' ConnectionOpts
connOpts = IO Connection -> IO Connection
forall a. IO a -> IO a
withSocketsDo (IO Connection -> IO Connection) -> IO Connection -> IO Connection
forall a b. (a -> b) -> a -> b
$ do
Connection
handle <- [SomeException] -> [([Char], PortNumber)] -> IO Connection
connect [] ([([Char], PortNumber)] -> IO Connection)
-> [([Char], PortNumber)] -> IO Connection
forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> [([Char], PortNumber)]
coServers ConnectionOpts
connOpts
(Word32
maxFrameSize, Word16
maxChannel, Maybe Word16
heartbeatTimeout, FieldTable
serverProps) <- (IOError -> IO (Word32, Word16, Maybe Word16, FieldTable))
-> IO (Word32, Word16, Maybe Word16, FieldTable)
-> IO (Word32, Word16, Maybe Word16, FieldTable)
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
CE.handle (\(IOError
_ :: CE.IOException) -> AMQPException -> IO (Word32, Word16, Maybe Word16, FieldTable)
forall e a. Exception e => e -> IO a
CE.throwIO (AMQPException -> IO (Word32, Word16, Maybe Word16, FieldTable))
-> AMQPException -> IO (Word32, Word16, Maybe Word16, FieldTable)
forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
"Handshake failed. Please check the RabbitMQ logs for more information") (IO (Word32, Word16, Maybe Word16, FieldTable)
-> IO (Word32, Word16, Maybe Word16, FieldTable))
-> IO (Word32, Word16, Maybe Word16, FieldTable)
-> IO (Word32, Word16, Maybe Word16, FieldTable)
forall a b. (a -> b) -> a -> b
$ do
Connection -> ByteString -> IO ()
Conn.connectionPut Connection
handle (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString -> ByteString
BS.append ([Char] -> ByteString
BC.pack [Char]
"AMQP")
([Word8] -> ByteString
BS.pack [
Word8
1
, Word8
1
, Word8
0
, Word8
9
])
Frame Word16
0 (MethodPayload (Connection_start Word8
_ Word8
_ FieldTable
serverProps (LongString ByteString
serverMechanisms) LongString
_)) <- Connection -> IO Frame
readFrame Connection
handle
SASLMechanism
selectedSASL <- Connection -> ByteString -> IO SASLMechanism
selectSASLMechanism Connection
handle ByteString
serverMechanisms
Connection -> Frame -> IO ()
writeFrame Connection
handle (Frame -> IO ()) -> Frame -> IO ()
forall a b. (a -> b) -> a -> b
$ SASLMechanism -> Frame
start_ok SASLMechanism
selectedSASL
Frame Word16
0 (MethodPayload (Connection_tune Word16
channel_max Word32
frame_max Word16
sendHeartbeat)) <- Connection -> SASLMechanism -> IO Frame
handleSecureUntilTune Connection
handle SASLMechanism
selectedSASL
let maxFrameSize :: Word32
maxFrameSize = Word32 -> Maybe Word32 -> Word32
forall a. Ord a => a -> Maybe a -> a
chooseMin Word32
frame_max (Maybe Word32 -> Word32) -> Maybe Word32 -> Word32
forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> Maybe Word32
coMaxFrameSize ConnectionOpts
connOpts
finalHeartbeatSec :: Word16
finalHeartbeatSec = Word16 -> Maybe Word16 -> Word16
forall a. a -> Maybe a -> a
fromMaybe Word16
sendHeartbeat (ConnectionOpts -> Maybe Word16
coHeartbeatDelay ConnectionOpts
connOpts)
heartbeatTimeout :: Maybe Word16
heartbeatTimeout = (Word16 -> Bool) -> Maybe Word16 -> Maybe Word16
forall (m :: * -> *) a. MonadPlus m => (a -> Bool) -> m a -> m a
mfilter (Word16 -> Word16 -> Bool
forall a. Eq a => a -> a -> Bool
/=Word16
0) (Word16 -> Maybe Word16
forall a. a -> Maybe a
Just Word16
finalHeartbeatSec)
fixChanNum :: a -> a
fixChanNum a
x = if a
x a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
0 then a
65535 else a
x
maxChannel :: Word16
maxChannel = Word16 -> Maybe Word16 -> Word16
forall a. Ord a => a -> Maybe a -> a
chooseMin (Word16 -> Word16
forall {a}. (Eq a, Num a) => a -> a
fixChanNum Word16
channel_max) (Maybe Word16 -> Word16) -> Maybe Word16 -> Word16
forall a b. (a -> b) -> a -> b
$ (Word16 -> Word16) -> Maybe Word16 -> Maybe Word16
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Word16 -> Word16
forall {a}. (Eq a, Num a) => a -> a
fixChanNum (Maybe Word16 -> Maybe Word16) -> Maybe Word16 -> Maybe Word16
forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> Maybe Word16
coMaxChannel ConnectionOpts
connOpts
Connection -> Frame -> IO ()
writeFrame Connection
handle (Word16 -> FramePayload -> Frame
Frame Word16
0 (MethodPayload -> FramePayload
MethodPayload
(Word16 -> Word32 -> Word16 -> MethodPayload
Connection_tune_ok Word16
maxChannel Word32
maxFrameSize Word16
finalHeartbeatSec)
))
Connection -> Frame -> IO ()
writeFrame Connection
handle Frame
open
Frame Word16
0 (MethodPayload (Connection_open_ok ShortString
_)) <- Connection -> IO Frame
readFrame Connection
handle
(Word32, Word16, Maybe Word16, FieldTable)
-> IO (Word32, Word16, Maybe Word16, FieldTable)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Word32
maxFrameSize, Word16
maxChannel, Maybe Word16
heartbeatTimeout, FieldTable
serverProps)
MVar (IntMap (Channel, ThreadId))
cChannels <- IntMap (Channel, ThreadId)
-> IO (MVar (IntMap (Channel, ThreadId)))
forall a. a -> IO (MVar a)
newMVar IntMap (Channel, ThreadId)
forall a. IntMap a
IM.empty
MVar (Maybe (CloseType, [Char]))
cClosed <- Maybe (CloseType, [Char]) -> IO (MVar (Maybe (CloseType, [Char])))
forall a. a -> IO (MVar a)
newMVar Maybe (CloseType, [Char])
forall a. Maybe a
Nothing
ChannelAllocator
cChanAllocator <- Int -> IO ChannelAllocator
newChannelAllocator (Int -> IO ChannelAllocator) -> Int -> IO ChannelAllocator
forall a b. (a -> b) -> a -> b
$ Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
maxChannel
Int
_ <- ChannelAllocator -> IO Int
allocateChannel ChannelAllocator
cChanAllocator
MVar ()
writeLock <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
MVar ()
ccl <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
MVar [IO ()]
cClosedHandlers <- [IO ()] -> IO (MVar [IO ()])
forall a. a -> IO (MVar a)
newMVar []
MVar [(Text -> IO (), IO ())]
cBlockedHandlers <- [(Text -> IO (), IO ())] -> IO (MVar [(Text -> IO (), IO ())])
forall a. a -> IO (MVar a)
newMVar []
MVar ByteOffset
cLastReceived <- IO ByteOffset
getTimestamp IO ByteOffset
-> (ByteOffset -> IO (MVar ByteOffset)) -> IO (MVar ByteOffset)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ByteOffset -> IO (MVar ByteOffset)
forall a. a -> IO (MVar a)
newMVar
MVar ByteOffset
cLastSent <- IO ByteOffset
getTimestamp IO ByteOffset
-> (ByteOffset -> IO (MVar ByteOffset)) -> IO (MVar ByteOffset)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ByteOffset -> IO (MVar ByteOffset)
forall a. a -> IO (MVar a)
newMVar
MVar (Maybe ThreadId)
cThread <- Maybe ThreadId -> IO (MVar (Maybe ThreadId))
forall a. a -> IO (MVar a)
newMVar Maybe ThreadId
forall a. Maybe a
Nothing
let conn :: Connection
conn = Connection
-> ChannelAllocator
-> MVar (IntMap (Channel, ThreadId))
-> Int
-> MVar (Maybe (CloseType, [Char]))
-> MVar ()
-> MVar ()
-> MVar [IO ()]
-> MVar [(Text -> IO (), IO ())]
-> MVar ByteOffset
-> MVar ByteOffset
-> FieldTable
-> MVar (Maybe ThreadId)
-> Connection
Connection Connection
handle ChannelAllocator
cChanAllocator MVar (IntMap (Channel, ThreadId))
cChannels (Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
maxFrameSize) MVar (Maybe (CloseType, [Char]))
cClosed MVar ()
ccl MVar ()
writeLock MVar [IO ()]
cClosedHandlers MVar [(Text -> IO (), IO ())]
cBlockedHandlers MVar ByteOffset
cLastReceived MVar ByteOffset
cLastSent FieldTable
serverProps MVar (Maybe ThreadId)
cThread
ThreadId
connThreadId <- IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally' (Connection -> IO ()
connectionReceiver Connection
conn) ((Either SomeException () -> IO ()) -> IO ThreadId)
-> (Either SomeException () -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \Either SomeException ()
res -> do
IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (Connection -> IO ()
Conn.connectionClose Connection
handle) (\(SomeException
_ :: CE.SomeException) -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
MVar (Maybe (CloseType, [Char]))
-> (Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (Maybe (CloseType, [Char]))
cClosed ((Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO ())
-> (Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char]))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> (Maybe (CloseType, [Char]) -> Maybe (CloseType, [Char]))
-> Maybe (CloseType, [Char])
-> IO (Maybe (CloseType, [Char]))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CloseType, [Char]) -> Maybe (CloseType, [Char])
forall a. a -> Maybe a
Just ((CloseType, [Char]) -> Maybe (CloseType, [Char]))
-> (Maybe (CloseType, [Char]) -> (CloseType, [Char]))
-> Maybe (CloseType, [Char])
-> Maybe (CloseType, [Char])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (CloseType, [Char])
-> Maybe (CloseType, [Char]) -> (CloseType, [Char])
forall a. a -> Maybe a -> a
fromMaybe (CloseType
Abnormal, [Char]
"unknown reason")
let finaliser :: ChanThreadKilledException
finaliser = SomeException -> ChanThreadKilledException
ChanThreadKilledException (SomeException -> ChanThreadKilledException)
-> SomeException -> ChanThreadKilledException
forall a b. (a -> b) -> a -> b
$ case Either SomeException ()
res of
Left SomeException
ex -> SomeException
ex
Right ()
_ -> AsyncException -> SomeException
forall e. Exception e => e -> SomeException
CE.toException AsyncException
CE.ThreadKilled
MVar (IntMap (Channel, ThreadId))
-> (IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId)))
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (IntMap (Channel, ThreadId))
cChannels ((IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId)))
-> IO ())
-> (IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId)))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
x -> do
((Channel, ThreadId) -> IO ()) -> [(Channel, ThreadId)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((ThreadId -> ChanThreadKilledException -> IO ())
-> ChanThreadKilledException -> ThreadId -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ThreadId -> ChanThreadKilledException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
CE.throwTo ChanThreadKilledException
finaliser (ThreadId -> IO ())
-> ((Channel, ThreadId) -> ThreadId)
-> (Channel, ThreadId)
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Channel, ThreadId) -> ThreadId
forall a b. (a, b) -> b
snd) ([(Channel, ThreadId)] -> IO ()) -> [(Channel, ThreadId)] -> IO ()
forall a b. (a -> b) -> a -> b
$ IntMap (Channel, ThreadId) -> [(Channel, ThreadId)]
forall a. IntMap a -> [a]
IM.elems IntMap (Channel, ThreadId)
x
IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return IntMap (Channel, ThreadId)
forall a. IntMap a
IM.empty
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ()
ccl ()
MVar [IO ()] -> ([IO ()] -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar [IO ()]
cClosedHandlers [IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
case Maybe Word16
heartbeatTimeout of
Maybe Word16
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Word16
timeout -> do
ThreadId
heartbeatThread <- Connection -> Int -> ThreadId -> IO ThreadId
watchHeartbeats Connection
conn (Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
timeout) ThreadId
connThreadId
Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler Connection
conn Bool
True (ThreadId -> IO ()
killThread ThreadId
heartbeatThread)
MVar (Maybe ThreadId)
-> (Maybe ThreadId -> IO (Maybe ThreadId)) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (Maybe ThreadId)
cThread ((Maybe ThreadId -> IO (Maybe ThreadId)) -> IO ())
-> (Maybe ThreadId -> IO (Maybe ThreadId)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Maybe ThreadId
_ -> Maybe ThreadId -> IO (Maybe ThreadId)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ThreadId -> IO (Maybe ThreadId))
-> Maybe ThreadId -> IO (Maybe ThreadId)
forall a b. (a -> b) -> a -> b
$ ThreadId -> Maybe ThreadId
forall a. a -> Maybe a
Just ThreadId
connThreadId
Connection -> IO Connection
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn
where
connect :: [SomeException] -> [([Char], PortNumber)] -> IO Connection
connect [SomeException]
excs (([Char]
host, PortNumber
port) : [([Char], PortNumber)]
rest) = do
ConnectionContext
ctx <- IO ConnectionContext
Conn.initConnectionContext
Either SomeException Connection
result <- IO Connection -> IO (Either SomeException Connection)
forall e a. Exception e => IO a -> IO (Either e a)
CE.try (IO Connection -> IO (Either SomeException Connection))
-> IO Connection -> IO (Either SomeException Connection)
forall a b. (a -> b) -> a -> b
$ ConnectionContext -> ConnectionParams -> IO Connection
Conn.connectTo ConnectionContext
ctx (ConnectionParams -> IO Connection)
-> ConnectionParams -> IO Connection
forall a b. (a -> b) -> a -> b
$ Conn.ConnectionParams
{ connectionHostname :: [Char]
Conn.connectionHostname = [Char]
host
, connectionPort :: PortNumber
Conn.connectionPort = PortNumber
port
, connectionUseSecure :: Maybe TLSSettings
Conn.connectionUseSecure = Maybe TLSSettings
tlsSettings
, connectionUseSocks :: Maybe ProxySettings
Conn.connectionUseSocks = Maybe ProxySettings
forall a. Maybe a
Nothing
}
(SomeException -> IO Connection)
-> (Connection -> IO Connection)
-> Either SomeException Connection
-> IO Connection
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\(SomeException
ex :: CE.SomeException) -> [SomeException] -> [([Char], PortNumber)] -> IO Connection
connect (SomeException
exSomeException -> [SomeException] -> [SomeException]
forall a. a -> [a] -> [a]
:[SomeException]
excs) [([Char], PortNumber)]
rest)
Connection -> IO Connection
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return
Either SomeException Connection
result
connect [SomeException]
excs [] = AMQPException -> IO Connection
forall e a. Exception e => e -> IO a
CE.throwIO (AMQPException -> IO Connection) -> AMQPException -> IO Connection
forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal ([Char] -> AMQPException) -> [Char] -> AMQPException
forall a b. (a -> b) -> a -> b
$ [Char]
"Could not connect to any of the provided brokers: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [(([Char], PortNumber), SomeException)] -> [Char]
forall a. Show a => a -> [Char]
show ([([Char], PortNumber)]
-> [SomeException] -> [(([Char], PortNumber), SomeException)]
forall a b. [a] -> [b] -> [(a, b)]
zip (ConnectionOpts -> [([Char], PortNumber)]
coServers ConnectionOpts
connOpts) ([SomeException] -> [SomeException]
forall a. [a] -> [a]
reverse [SomeException]
excs))
tlsSettings :: Maybe TLSSettings
tlsSettings = Maybe TLSSettings
-> (TLSSettings -> Maybe TLSSettings)
-> Maybe TLSSettings
-> Maybe TLSSettings
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Maybe TLSSettings
forall a. Maybe a
Nothing TLSSettings -> Maybe TLSSettings
connectionTLSSettings (ConnectionOpts -> Maybe TLSSettings
coTLSSettings ConnectionOpts
connOpts)
selectSASLMechanism :: Connection -> ByteString -> IO SASLMechanism
selectSASLMechanism Connection
handle ByteString
serverMechanisms =
let serverSaslList :: [Text]
serverSaslList = (Char -> Bool) -> Text -> [Text]
T.split (Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
== Char
' ') (Text -> [Text]) -> Text -> [Text]
forall a b. (a -> b) -> a -> b
$ ByteString -> Text
E.decodeUtf8 ByteString
serverMechanisms
clientMechanisms :: [SASLMechanism]
clientMechanisms = ConnectionOpts -> [SASLMechanism]
coAuth ConnectionOpts
connOpts
clientSaslList :: [Text]
clientSaslList = (SASLMechanism -> Text) -> [SASLMechanism] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map SASLMechanism -> Text
saslName [SASLMechanism]
clientMechanisms
maybeSasl :: Maybe SASLMechanism
maybeSasl = (SASLMechanism -> Bool) -> [SASLMechanism] -> Maybe SASLMechanism
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
F.find (\(SASLMechanism Text
name ByteString
_ Maybe (ByteString -> IO ByteString)
_) -> Text -> [Text] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem Text
name [Text]
serverSaslList) [SASLMechanism]
clientMechanisms
in Maybe SASLMechanism -> Connection -> [Char] -> IO SASLMechanism
forall {b}. Maybe b -> Connection -> [Char] -> IO b
abortIfNothing Maybe SASLMechanism
maybeSasl Connection
handle
([Char]
"None of the provided SASL mechanisms "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Text] -> [Char]
forall a. Show a => a -> [Char]
show [Text]
clientSaslList[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
" is supported by the server "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Text] -> [Char]
forall a. Show a => a -> [Char]
show [Text]
serverSaslList[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
".")
start_ok :: SASLMechanism -> Frame
start_ok SASLMechanism
sasl = Word16 -> FramePayload -> Frame
Frame Word16
0 (FramePayload -> Frame) -> FramePayload -> Frame
forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload (MethodPayload -> FramePayload) -> MethodPayload -> FramePayload
forall a b. (a -> b) -> a -> b
$ FieldTable
-> ShortString -> LongString -> ShortString -> MethodPayload
Connection_start_ok
FieldTable
clientProperties
(Text -> ShortString
ShortString (Text -> ShortString) -> Text -> ShortString
forall a b. (a -> b) -> a -> b
$ SASLMechanism -> Text
saslName SASLMechanism
sasl)
(ByteString -> LongString
LongString (ByteString -> LongString) -> ByteString -> LongString
forall a b. (a -> b) -> a -> b
$ SASLMechanism -> ByteString
saslInitialResponse SASLMechanism
sasl)
(Text -> ShortString
ShortString Text
"en_US")
where
clientProperties :: FieldTable
clientProperties = Map Text FieldValue -> FieldTable
FieldTable (Map Text FieldValue -> FieldTable)
-> Map Text FieldValue -> FieldTable
forall a b. (a -> b) -> a -> b
$ [(Text, FieldValue)] -> Map Text FieldValue
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList ([(Text, FieldValue)] -> Map Text FieldValue)
-> [(Text, FieldValue)] -> Map Text FieldValue
forall a b. (a -> b) -> a -> b
$ [
(Text
"platform", ByteString -> FieldValue
FVString ByteString
"Haskell"),
(Text
"version" , ByteString -> FieldValue
FVString (ByteString -> FieldValue)
-> ([Char] -> ByteString) -> [Char] -> FieldValue
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
E.encodeUtf8 (Text -> ByteString) -> ([Char] -> Text) -> [Char] -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> Text
T.pack ([Char] -> FieldValue) -> [Char] -> FieldValue
forall a b. (a -> b) -> a -> b
$ Version -> [Char]
showVersion Version
version),
(Text
"capabilities", FieldTable -> FieldValue
FVFieldTable FieldTable
clientCapabilities)
] [(Text, FieldValue)]
-> [(Text, FieldValue)] -> [(Text, FieldValue)]
forall a. [a] -> [a] -> [a]
++ [(Text, FieldValue)]
-> (Text -> [(Text, FieldValue)])
-> Maybe Text
-> [(Text, FieldValue)]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] (\Text
x -> [(Text
"connection_name", ByteString -> FieldValue
FVString (ByteString -> FieldValue) -> ByteString -> FieldValue
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
E.encodeUtf8 Text
x)]) (ConnectionOpts -> Maybe Text
coName ConnectionOpts
connOpts)
clientCapabilities :: FieldTable
clientCapabilities = Map Text FieldValue -> FieldTable
FieldTable (Map Text FieldValue -> FieldTable)
-> Map Text FieldValue -> FieldTable
forall a b. (a -> b) -> a -> b
$ [(Text, FieldValue)] -> Map Text FieldValue
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [
(Text
"consumer_cancel_notify", Bool -> FieldValue
FVBool Bool
True),
(Text
"connection.blocked", Bool -> FieldValue
FVBool Bool
True)
]
handleSecureUntilTune :: Connection -> SASLMechanism -> IO Frame
handleSecureUntilTune Connection
handle SASLMechanism
sasl = do
Frame
tuneOrSecure <- Connection -> IO Frame
readFrame Connection
handle
case Frame
tuneOrSecure of
Frame Word16
0 (MethodPayload (Connection_secure (LongString ByteString
challenge))) -> do
ByteString -> IO ByteString
processChallenge <- Maybe (ByteString -> IO ByteString)
-> Connection -> [Char] -> IO (ByteString -> IO ByteString)
forall {b}. Maybe b -> Connection -> [Char] -> IO b
abortIfNothing (SASLMechanism -> Maybe (ByteString -> IO ByteString)
saslChallengeFunc SASLMechanism
sasl)
Connection
handle ([Char] -> IO (ByteString -> IO ByteString))
-> [Char] -> IO (ByteString -> IO ByteString)
forall a b. (a -> b) -> a -> b
$ [Char]
"The server provided a challenge, but the selected SASL mechanism "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++Text -> [Char]
forall a. Show a => a -> [Char]
show (SASLMechanism -> Text
saslName SASLMechanism
sasl)[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
" is not equipped with a challenge processing function."
ByteString
challengeResponse <- ByteString -> IO ByteString
processChallenge ByteString
challenge
Connection -> Frame -> IO ()
writeFrame Connection
handle (Word16 -> FramePayload -> Frame
Frame Word16
0 (MethodPayload -> FramePayload
MethodPayload (LongString -> MethodPayload
Connection_secure_ok (ByteString -> LongString
LongString ByteString
challengeResponse))))
Connection -> SASLMechanism -> IO Frame
handleSecureUntilTune Connection
handle SASLMechanism
sasl
tune :: Frame
tune@(Frame Word16
0 (MethodPayload Connection_tune{})) -> Frame -> IO Frame
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Frame
tune
Frame
x -> [Char] -> IO Frame
forall a. HasCallStack => [Char] -> a
error ([Char] -> IO Frame) -> [Char] -> IO Frame
forall a b. (a -> b) -> a -> b
$ [Char]
"handleSecureUntilTune fail. received message: "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++Frame -> [Char]
forall a. Show a => a -> [Char]
show Frame
x
open :: Frame
open = Word16 -> FramePayload -> Frame
Frame Word16
0 (FramePayload -> Frame) -> FramePayload -> Frame
forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload (MethodPayload -> FramePayload) -> MethodPayload -> FramePayload
forall a b. (a -> b) -> a -> b
$ ShortString -> ShortString -> Bool -> MethodPayload
Connection_open
(Text -> ShortString
ShortString (Text -> ShortString) -> Text -> ShortString
forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> Text
coVHost ConnectionOpts
connOpts)
(Text -> ShortString
ShortString (Text -> ShortString) -> Text -> ShortString
forall a b. (a -> b) -> a -> b
$ [Char] -> Text
T.pack [Char]
"")
Bool
True
abortHandshake :: Connection -> [Char] -> IO b
abortHandshake Connection
handle [Char]
msg = do
Connection -> IO ()
Conn.connectionClose Connection
handle
AMQPException -> IO b
forall e a. Exception e => e -> IO a
CE.throwIO (AMQPException -> IO b) -> AMQPException -> IO b
forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
msg
abortIfNothing :: Maybe b -> Connection -> [Char] -> IO b
abortIfNothing Maybe b
m Connection
handle [Char]
msg = case Maybe b
m of
Maybe b
Nothing -> Connection -> [Char] -> IO b
forall {b}. Connection -> [Char] -> IO b
abortHandshake Connection
handle [Char]
msg
Just b
a -> b -> IO b
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return b
a
watchHeartbeats :: Connection -> Int -> ThreadId -> IO ThreadId
watchHeartbeats :: Connection -> Int -> ThreadId -> IO ThreadId
watchHeartbeats Connection
conn Int
timeout ThreadId
connThread = Int -> IO () -> IO ThreadId
scheduleAtFixedRate Int
rate (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
IO ()
checkSendTimeout
IO ()
checkReceiveTimeout
where
rate :: Int
rate = Int
timeout Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
250
receiveTimeout :: ByteOffset
receiveTimeout = Int -> ByteOffset
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
rate ByteOffset -> ByteOffset -> ByteOffset
forall a. Num a => a -> a -> a
* ByteOffset
4 ByteOffset -> ByteOffset -> ByteOffset
forall a. Num a => a -> a -> a
* ByteOffset
2
sendTimeout :: ByteOffset
sendTimeout = Int -> ByteOffset
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
rate ByteOffset -> ByteOffset -> ByteOffset
forall a. Num a => a -> a -> a
* ByteOffset
2
skippedBeatEx :: AMQPException
skippedBeatEx = CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
"killed connection after missing 2 consecutive heartbeats"
checkReceiveTimeout :: IO ()
checkReceiveTimeout = MVar ByteOffset -> ByteOffset -> IO () -> IO ()
doCheck (Connection -> MVar ByteOffset
connLastReceived Connection
conn) ByteOffset
receiveTimeout (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
Abnormal (AMQPException -> SomeException
forall e. Exception e => e -> SomeException
CE.toException AMQPException
skippedBeatEx) ThreadId
connThread
checkSendTimeout :: IO ()
checkSendTimeout = MVar ByteOffset -> ByteOffset -> IO () -> IO ()
doCheck (Connection -> MVar ByteOffset
connLastSent Connection
conn) ByteOffset
sendTimeout (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
Connection -> Frame -> IO ()
writeFrame (Connection -> Connection
connHandle Connection
conn) (Word16 -> FramePayload -> Frame
Frame Word16
0 FramePayload
HeartbeatPayload)
doCheck :: MVar ByteOffset -> ByteOffset -> IO () -> IO ()
doCheck MVar ByteOffset
var ByteOffset
timeout_µs IO ()
action = MVar ByteOffset -> (ByteOffset -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar ByteOffset
var ((ByteOffset -> IO ()) -> IO ()) -> (ByteOffset -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ByteOffset
lastFrameTime -> do
ByteOffset
time <- IO ByteOffset
getTimestamp
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteOffset
time ByteOffset -> ByteOffset -> Bool
forall a. Ord a => a -> a -> Bool
>= ByteOffset
lastFrameTime ByteOffset -> ByteOffset -> ByteOffset
forall a. Num a => a -> a -> a
+ ByteOffset
timeout_µs) IO ()
action
updateLastSent :: Connection -> IO ()
updateLastSent :: Connection -> IO ()
updateLastSent Connection
conn = MVar ByteOffset -> (ByteOffset -> IO ByteOffset) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar ByteOffset
connLastSent Connection
conn) (IO ByteOffset -> ByteOffset -> IO ByteOffset
forall a b. a -> b -> a
const IO ByteOffset
getTimestamp)
updateLastReceived :: Connection -> IO ()
updateLastReceived :: Connection -> IO ()
updateLastReceived Connection
conn = MVar ByteOffset -> (ByteOffset -> IO ByteOffset) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar ByteOffset
connLastReceived Connection
conn) (IO ByteOffset -> ByteOffset -> IO ByteOffset
forall a b. a -> b -> a
const IO ByteOffset
getTimestamp)
killConnection :: Connection -> CloseType -> CE.SomeException -> ThreadId -> IO ()
killConnection :: Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
closeType SomeException
ex ThreadId
connThreadId = do
MVar (Maybe (CloseType, [Char]))
-> (Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar (Maybe (CloseType, [Char]))
connClosed Connection
conn) ((Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO ())
-> (Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO ()
forall a b. (a -> b) -> a -> b
$ IO (Maybe (CloseType, [Char]))
-> Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char]))
forall a b. a -> b -> a
const (IO (Maybe (CloseType, [Char]))
-> Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO (Maybe (CloseType, [Char]))
-> Maybe (CloseType, [Char])
-> IO (Maybe (CloseType, [Char]))
forall a b. (a -> b) -> a -> b
$ Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char]))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char]))
forall a b. (a -> b) -> a -> b
$ (CloseType, [Char]) -> Maybe (CloseType, [Char])
forall a. a -> Maybe a
Just (CloseType
closeType, SomeException -> [Char]
forall a. Show a => a -> [Char]
show SomeException
ex)
ThreadId -> SomeException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
connThreadId SomeException
ex
closeConnection :: Connection -> IO ()
closeConnection :: Connection -> IO ()
closeConnection Connection
c = do
IO () -> (IOError -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (
MVar () -> (() -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar ()
connWriteLock Connection
c) ((() -> IO ()) -> IO ()) -> (() -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \()
_ -> Connection -> Frame -> IO ()
writeFrame (Connection -> Connection
connHandle Connection
c) (Frame -> IO ()) -> Frame -> IO ()
forall a b. (a -> b) -> a -> b
$ Word16 -> FramePayload -> Frame
Frame Word16
0 (FramePayload -> Frame) -> FramePayload -> Frame
forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload (MethodPayload -> FramePayload) -> MethodPayload -> FramePayload
forall a b. (a -> b) -> a -> b
$ Word16 -> ShortString -> Word16 -> Word16 -> MethodPayload
Connection_close
Word16
0
(Text -> ShortString
ShortString Text
"")
Word16
0
Word16
0
)
(\ (IOError
e :: CE.IOException) -> do
Just ThreadId
thrID <- MVar (Maybe ThreadId) -> IO (Maybe ThreadId)
forall a. MVar a -> IO a
readMVar (Connection -> MVar (Maybe ThreadId)
connThread Connection
c)
Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
c CloseType
Abnormal (IOError -> SomeException
forall e. Exception e => e -> SomeException
CE.toException IOError
e) ThreadId
thrID
)
MVar () -> IO ()
forall a. MVar a -> IO a
readMVar (MVar () -> IO ()) -> MVar () -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> MVar ()
connClosedLock Connection
c
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
getServerProperties :: Connection -> IO FieldTable
getServerProperties :: Connection -> IO FieldTable
getServerProperties = FieldTable -> IO FieldTable
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (FieldTable -> IO FieldTable)
-> (Connection -> FieldTable) -> Connection -> IO FieldTable
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> FieldTable
connServerProperties
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler Connection
conn Bool
ifClosed IO ()
handler = do
MVar (Maybe (CloseType, [Char]))
-> (Maybe (CloseType, [Char]) -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar (Maybe (CloseType, [Char]))
connClosed Connection
conn) ((Maybe (CloseType, [Char]) -> IO ()) -> IO ())
-> (Maybe (CloseType, [Char]) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \case
Just (CloseType, [Char])
_ | Bool
ifClosed -> IO ()
handler
Maybe (CloseType, [Char])
_ -> MVar [IO ()] -> ([IO ()] -> IO [IO ()]) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar [IO ()]
connClosedHandlers Connection
conn) (([IO ()] -> IO [IO ()]) -> IO ())
-> ([IO ()] -> IO [IO ()]) -> IO ()
forall a b. (a -> b) -> a -> b
$ \[IO ()]
old -> [IO ()] -> IO [IO ()]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([IO ()] -> IO [IO ()]) -> [IO ()] -> IO [IO ()]
forall a b. (a -> b) -> a -> b
$ IO ()
handlerIO () -> [IO ()] -> [IO ()]
forall a. a -> [a] -> [a]
:[IO ()]
old
addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO ()
addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO ()
addConnectionBlockedHandler Connection
conn Text -> IO ()
blockedHandler IO ()
unblockedHandler =
MVar [(Text -> IO (), IO ())]
-> ([(Text -> IO (), IO ())] -> IO [(Text -> IO (), IO ())])
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar [(Text -> IO (), IO ())]
connBlockedHandlers Connection
conn) (([(Text -> IO (), IO ())] -> IO [(Text -> IO (), IO ())])
-> IO ())
-> ([(Text -> IO (), IO ())] -> IO [(Text -> IO (), IO ())])
-> IO ()
forall a b. (a -> b) -> a -> b
$ \[(Text -> IO (), IO ())]
old -> [(Text -> IO (), IO ())] -> IO [(Text -> IO (), IO ())]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Text -> IO (), IO ())] -> IO [(Text -> IO (), IO ())])
-> [(Text -> IO (), IO ())] -> IO [(Text -> IO (), IO ())]
forall a b. (a -> b) -> a -> b
$ (Text -> IO ()
blockedHandler, IO ()
unblockedHandler)(Text -> IO (), IO ())
-> [(Text -> IO (), IO ())] -> [(Text -> IO (), IO ())]
forall a. a -> [a] -> [a]
:[(Text -> IO (), IO ())]
old
readFrame :: Conn.Connection -> IO Frame
readFrame :: Connection -> IO Frame
readFrame Connection
handle = do
ByteString
strictDat <- Connection -> Int -> IO ByteString
connectionGetExact Connection
handle Int
7
let dat :: ByteString
dat = ByteString -> ByteString
toLazy ByteString
strictDat
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString -> Bool
BL.null ByteString
dat) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IOError -> IO ()
forall e a. Exception e => e -> IO a
CE.throwIO (IOError -> IO ()) -> IOError -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"connection not open"
let len :: Int
len = Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word32 -> Int) -> Word32 -> Int
forall a b. (a -> b) -> a -> b
$ ByteString -> Word32
peekFrameSize ByteString
dat
ByteString
strictDat' <- Connection -> Int -> IO ByteString
connectionGetExact Connection
handle (Int
lenInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1)
let dat' :: ByteString
dat' = ByteString -> ByteString
toLazy ByteString
strictDat'
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString -> Bool
BL.null ByteString
dat') (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IOError -> IO ()
forall e a. Exception e => e -> IO a
CE.throwIO (IOError -> IO ()) -> IOError -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"connection not open"
#if MIN_VERSION_binary(0, 7, 0)
let ret :: Either
(ByteString, ByteOffset, [Char]) (ByteString, ByteOffset, Frame)
ret = Get Frame
-> ByteString
-> Either
(ByteString, ByteOffset, [Char]) (ByteString, ByteOffset, Frame)
forall a.
Get a
-> ByteString
-> Either
(ByteString, ByteOffset, [Char]) (ByteString, ByteOffset, a)
runGetOrFail Get Frame
forall t. Binary t => Get t
get (ByteString -> ByteString -> ByteString
BL.append ByteString
dat ByteString
dat')
case Either
(ByteString, ByteOffset, [Char]) (ByteString, ByteOffset, Frame)
ret of
Left (ByteString
_, ByteOffset
_, [Char]
errMsg) -> [Char] -> IO Frame
forall a. HasCallStack => [Char] -> a
error ([Char] -> IO Frame) -> [Char] -> IO Frame
forall a b. (a -> b) -> a -> b
$ [Char]
"readFrame fail: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
errMsg
Right (ByteString
_, ByteOffset
consumedBytes, Frame
_) | ByteOffset
consumedBytes ByteOffset -> ByteOffset -> Bool
forall a. Eq a => a -> a -> Bool
/= Int -> ByteOffset
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int
lenInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
8) ->
[Char] -> IO Frame
forall a. HasCallStack => [Char] -> a
error ([Char] -> IO Frame) -> [Char] -> IO Frame
forall a b. (a -> b) -> a -> b
$ [Char]
"readFrame: parser should read " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show (Int
lenInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
8) [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
" bytes; but read " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ ByteOffset -> [Char]
forall a. Show a => a -> [Char]
show ByteOffset
consumedBytes
Right (ByteString
_, ByteOffset
_, Frame
frame) -> Frame -> IO Frame
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Frame
frame
#else
let (frame, _, consumedBytes) = runGetState get (BL.append dat dat') 0
if consumedBytes /= fromIntegral (len+8)
then error $ "readFrameSock: parser should read "++show (len+8)++" bytes; but read "++show consumedBytes
else return ()
return frame
#endif
connectionGetExact :: Conn.Connection -> Int -> IO BS.ByteString
connectionGetExact :: Connection -> Int -> IO ByteString
connectionGetExact Connection
conn Int
x = ByteString -> Int -> IO ByteString
loop ByteString
BS.empty Int
0
where loop :: ByteString -> Int -> IO ByteString
loop ByteString
bs Int
y
| Int
y Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
x = ByteString -> IO ByteString
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
bs
| Bool
otherwise = do
ByteString
next <- Connection -> Int -> IO ByteString
Conn.connectionGet Connection
conn (Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
y)
ByteString -> Int -> IO ByteString
loop (ByteString -> ByteString -> ByteString
BS.append ByteString
bs ByteString
next) (Int
y Int -> Int -> Int
forall a. Num a => a -> a -> a
+ ByteString -> Int
BS.length ByteString
next)
writeFrame :: Conn.Connection -> Frame -> IO ()
writeFrame :: Connection -> Frame -> IO ()
writeFrame Connection
handle Frame
f = do
Connection -> ByteString -> IO ()
Conn.connectionPut Connection
handle (ByteString -> IO ()) -> (Frame -> ByteString) -> Frame -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
toStrict (ByteString -> ByteString)
-> (Frame -> ByteString) -> Frame -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> ByteString
runPut (Put -> ByteString) -> (Frame -> Put) -> Frame -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Frame -> Put
forall t. Binary t => t -> Put
put (Frame -> IO ()) -> Frame -> IO ()
forall a b. (a -> b) -> a -> b
$ Frame
f
data Channel = Channel {
Channel -> Connection
connection :: Connection,
Channel -> Chan FramePayload
inQueue :: Chan FramePayload,
Channel -> MVar (Seq (MVar Assembly))
outstandingResponses :: MVar (Seq.Seq (MVar Assembly)),
Channel -> Word16
channelID :: Word16,
Channel -> MVar Int
lastConsumerTag :: MVar Int,
Channel -> MVar Int
nextPublishSeqNum :: MVar Int,
Channel -> TVar IntSet
unconfirmedSet :: TVar IntSet.IntSet,
Channel -> TVar IntSet
ackedSet :: TVar IntSet.IntSet,
Channel -> TVar IntSet
nackedSet :: TVar IntSet.IntSet,
Channel -> Lock
chanActive :: Lock,
Channel -> MVar (Maybe (CloseType, [Char]))
chanClosed :: MVar (Maybe (CloseType, String)),
Channel
-> MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
consumers :: MVar (M.Map Text ((Message, Envelope) -> IO (),
ConsumerTag -> IO ())),
Channel -> MVar [(Message, PublishError) -> IO ()]
returnListeners :: MVar [(Message, PublishError) -> IO ()],
Channel -> MVar [(LongLongInt, Bool, AckType) -> IO ()]
confirmListeners :: MVar [(Word64, Bool, AckType) -> IO ()],
Channel -> MVar [SomeException -> IO ()]
chanExceptionHandlers :: MVar [CE.SomeException -> IO ()]
}
data ChanThreadKilledException = ChanThreadKilledException { ChanThreadKilledException -> SomeException
cause :: CE.SomeException }
deriving (Int -> ChanThreadKilledException -> ShowS
[ChanThreadKilledException] -> ShowS
ChanThreadKilledException -> [Char]
(Int -> ChanThreadKilledException -> ShowS)
-> (ChanThreadKilledException -> [Char])
-> ([ChanThreadKilledException] -> ShowS)
-> Show ChanThreadKilledException
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ChanThreadKilledException -> ShowS
showsPrec :: Int -> ChanThreadKilledException -> ShowS
$cshow :: ChanThreadKilledException -> [Char]
show :: ChanThreadKilledException -> [Char]
$cshowList :: [ChanThreadKilledException] -> ShowS
showList :: [ChanThreadKilledException] -> ShowS
Show)
instance CE.Exception ChanThreadKilledException
unwrapChanThreadKilledException :: CE.SomeException -> CE.SomeException
unwrapChanThreadKilledException :: SomeException -> SomeException
unwrapChanThreadKilledException SomeException
e = SomeException
-> (ChanThreadKilledException -> SomeException)
-> Maybe ChanThreadKilledException
-> SomeException
forall b a. b -> (a -> b) -> Maybe a -> b
maybe SomeException
e ChanThreadKilledException -> SomeException
cause (Maybe ChanThreadKilledException -> SomeException)
-> Maybe ChanThreadKilledException -> SomeException
forall a b. (a -> b) -> a -> b
$ SomeException -> Maybe ChanThreadKilledException
forall e. Exception e => SomeException -> Maybe e
CE.fromException SomeException
e
msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message
(CHBasic Maybe ShortString
content_type Maybe ShortString
content_encoding Maybe FieldTable
headers Maybe Word8
delivery_mode Maybe Word8
priority Maybe ShortString
correlation_id Maybe ShortString
reply_to Maybe ShortString
expiration Maybe ShortString
message_id Maybe LongLongInt
timestamp Maybe ShortString
message_type Maybe ShortString
user_id Maybe ShortString
application_id Maybe ShortString
cluster_id) ByteString
body =
let msgId :: Maybe Text
msgId = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
message_id
contentType :: Maybe Text
contentType = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
content_type
contentEncoding :: Maybe Text
contentEncoding = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
content_encoding
replyTo :: Maybe Text
replyTo = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
reply_to
correlationID :: Maybe Text
correlationID = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
correlation_id
messageType :: Maybe Text
messageType = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
message_type
userId :: Maybe Text
userId = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
user_id
applicationId :: Maybe Text
applicationId = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
application_id
clusterId :: Maybe Text
clusterId = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
cluster_id
in ByteString
-> Maybe DeliveryMode
-> Maybe LongLongInt
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Word8
-> Maybe Text
-> Maybe Text
-> Maybe FieldTable
-> Message
Message ByteString
body ((Word8 -> DeliveryMode) -> Maybe Word8 -> Maybe DeliveryMode
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Word8 -> DeliveryMode
intToDeliveryMode Maybe Word8
delivery_mode) Maybe LongLongInt
timestamp Maybe Text
msgId Maybe Text
messageType Maybe Text
userId Maybe Text
applicationId Maybe Text
clusterId Maybe Text
contentType Maybe Text
contentEncoding Maybe Text
replyTo Maybe Word8
priority Maybe Text
correlationID (Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
expiration) Maybe FieldTable
headers
where
fromShortString :: Maybe ShortString -> Maybe Text
fromShortString (Just (ShortString Text
s)) = Text -> Maybe Text
forall a. a -> Maybe a
Just Text
s
fromShortString Maybe ShortString
_ = Maybe Text
forall a. Maybe a
Nothing
msgFromContentHeaderProperties ContentHeaderProperties
c ByteString
_ = [Char] -> Message
forall a. HasCallStack => [Char] -> a
error ([Char]
"Unknown content header properties: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ ContentHeaderProperties -> [Char]
forall a. Show a => a -> [Char]
show ContentHeaderProperties
c)
channelReceiver :: Channel -> IO ()
channelReceiver :: Channel -> IO ()
channelReceiver Channel
chan = do
Assembly
p <- Chan FramePayload -> IO Assembly
readAssembly (Chan FramePayload -> IO Assembly)
-> Chan FramePayload -> IO Assembly
forall a b. (a -> b) -> a -> b
$ Channel -> Chan FramePayload
inQueue Channel
chan
if Assembly -> Bool
isResponse Assembly
p
then do
IO ()
action <- MVar (Seq (MVar Assembly))
-> (Seq (MVar Assembly) -> IO (Seq (MVar Assembly), IO ()))
-> IO (IO ())
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (Channel -> MVar (Seq (MVar Assembly))
outstandingResponses Channel
chan) ((Seq (MVar Assembly) -> IO (Seq (MVar Assembly), IO ()))
-> IO (IO ()))
-> (Seq (MVar Assembly) -> IO (Seq (MVar Assembly), IO ()))
-> IO (IO ())
forall a b. (a -> b) -> a -> b
$ \Seq (MVar Assembly)
val -> do
case Seq (MVar Assembly) -> ViewL (MVar Assembly)
forall a. Seq a -> ViewL a
Seq.viewl Seq (MVar Assembly)
val of
MVar Assembly
x Seq.:< Seq (MVar Assembly)
rest -> do
(Seq (MVar Assembly), IO ()) -> IO (Seq (MVar Assembly), IO ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Seq (MVar Assembly)
rest, MVar Assembly -> Assembly -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Assembly
x Assembly
p)
ViewL (MVar Assembly)
Seq.EmptyL -> do
(Seq (MVar Assembly), IO ()) -> IO (Seq (MVar Assembly), IO ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Seq (MVar Assembly)
val, IOError -> IO ()
forall e a. Exception e => e -> IO a
CE.throwIO (IOError -> IO ()) -> IOError -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"got response, but have no corresponding request")
IO ()
action
else Assembly -> IO ()
handleAsync Assembly
p
Channel -> IO ()
channelReceiver Channel
chan
where
isResponse :: Assembly -> Bool
isResponse :: Assembly -> Bool
isResponse (ContentMethod Basic_deliver{} ContentHeaderProperties
_ ByteString
_) = Bool
False
isResponse (ContentMethod Basic_return{} ContentHeaderProperties
_ ByteString
_) = Bool
False
isResponse (SimpleMethod (Channel_flow Bool
_)) = Bool
False
isResponse (SimpleMethod Channel_close{}) = Bool
False
isResponse (SimpleMethod (Basic_ack LongLongInt
_ Bool
_)) = Bool
False
isResponse (SimpleMethod Basic_nack{}) = Bool
False
isResponse (SimpleMethod (Basic_cancel ShortString
_ Bool
_)) = Bool
False
isResponse Assembly
_ = Bool
True
handleAsync :: Assembly -> IO ()
handleAsync (ContentMethod (Basic_deliver (ShortString Text
consumerTag) LongLongInt
deliveryTag Bool
redelivered (ShortString Text
exchange)
(ShortString Text
routingKey))
ContentHeaderProperties
properties ByteString
body) =
MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
-> (Map Text ((Message, Envelope) -> IO (), Text -> IO ())
-> IO ())
-> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel
-> MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
consumers Channel
chan) (\Map Text ((Message, Envelope) -> IO (), Text -> IO ())
s -> do
case Text
-> Map Text ((Message, Envelope) -> IO (), Text -> IO ())
-> Maybe ((Message, Envelope) -> IO (), Text -> IO ())
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup Text
consumerTag Map Text ((Message, Envelope) -> IO (), Text -> IO ())
s of
Just ((Message, Envelope) -> IO ()
subscriber, Text -> IO ()
_) -> do
let msg :: Message
msg = ContentHeaderProperties -> ByteString -> Message
msgFromContentHeaderProperties ContentHeaderProperties
properties ByteString
body
let env :: Envelope
env = Envelope {envDeliveryTag :: LongLongInt
envDeliveryTag = LongLongInt
deliveryTag, envRedelivered :: Bool
envRedelivered = Bool
redelivered,
envExchangeName :: Text
envExchangeName = Text
exchange, envRoutingKey :: Text
envRoutingKey = Text
routingKey, envChannel :: Channel
envChannel = Channel
chan}
IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
CE.catches ((Message, Envelope) -> IO ()
subscriber (Message
msg, Envelope
env)) [
(ChanThreadKilledException -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\(ChanThreadKilledException
e::ChanThreadKilledException) -> SomeException -> IO ()
forall e a. Exception e => e -> IO a
CE.throwIO (SomeException -> IO ()) -> SomeException -> IO ()
forall a b. (a -> b) -> a -> b
$ ChanThreadKilledException -> SomeException
cause ChanThreadKilledException
e),
(SomeException -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\(SomeException
e::CE.SomeException) -> Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"AMQP callback threw exception: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ SomeException -> [Char]
forall a. Show a => a -> [Char]
show SomeException
e)
]
Maybe ((Message, Envelope) -> IO (), Text -> IO ())
Nothing ->
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
)
handleAsync (SimpleMethod (Channel_close Word16
_ (ShortString Text
errorMsg) Word16
_ Word16
_)) = do
IO () -> (IOError -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (Channel -> Assembly -> IO ()
writeAssembly' Channel
chan (MethodPayload -> Assembly
SimpleMethod MethodPayload
Channel_close_ok))
(\ (IOError
_ :: CE.IOException) ->
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
)
Channel -> CloseType -> Text -> IO ()
closeChannel' Channel
chan CloseType
Abnormal Text
errorMsg
IO ThreadId
myThreadId IO ThreadId -> (ThreadId -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (ThreadId -> AMQPException -> IO ())
-> AMQPException -> ThreadId -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ThreadId -> AMQPException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
CE.throwTo (CloseType -> [Char] -> AMQPException
ChannelClosedException CloseType
Abnormal ([Char] -> AMQPException)
-> (Text -> [Char]) -> Text -> AMQPException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> [Char]
T.unpack (Text -> AMQPException) -> Text -> AMQPException
forall a b. (a -> b) -> a -> b
$ Text
errorMsg)
handleAsync (SimpleMethod (Channel_flow Bool
active)) = do
if Bool
active
then Lock -> IO ()
openLock (Lock -> IO ()) -> Lock -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel -> Lock
chanActive Channel
chan
else Lock -> IO ()
closeLock (Lock -> IO ()) -> Lock -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel -> Lock
chanActive Channel
chan
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
handleAsync (ContentMethod basicReturn :: MethodPayload
basicReturn@Basic_return{} ContentHeaderProperties
props ByteString
body) = do
let msg :: Message
msg = ContentHeaderProperties -> ByteString -> Message
msgFromContentHeaderProperties ContentHeaderProperties
props ByteString
body
pubError :: PublishError
pubError = MethodPayload -> PublishError
basicReturnToPublishError MethodPayload
basicReturn
MVar [(Message, PublishError) -> IO ()]
-> ([(Message, PublishError) -> IO ()] -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel -> MVar [(Message, PublishError) -> IO ()]
returnListeners Channel
chan) (([(Message, PublishError) -> IO ()] -> IO ()) -> IO ())
-> ([(Message, PublishError) -> IO ()] -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \[(Message, PublishError) -> IO ()]
listeners ->
[(Message, PublishError) -> IO ()]
-> (((Message, PublishError) -> IO ()) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Message, PublishError) -> IO ()]
listeners ((((Message, PublishError) -> IO ()) -> IO ()) -> IO ())
-> (((Message, PublishError) -> IO ()) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Message, PublishError) -> IO ()
l -> IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch ((Message, PublishError) -> IO ()
l (Message
msg, PublishError
pubError)) ((SomeException -> IO ()) -> IO ())
-> (SomeException -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"return listener on channel ["[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++Word16 -> [Char]
forall a. Show a => a -> [Char]
show (Channel -> Word16
channelID Channel
chan)[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
"] handling error ["[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++PublishError -> [Char]
forall a. Show a => a -> [Char]
show PublishError
pubError[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
"] threw exception: "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++SomeException -> [Char]
forall a. Show a => a -> [Char]
show SomeException
ex
handleAsync (SimpleMethod (Basic_ack LongLongInt
deliveryTag Bool
multiple)) = LongLongInt -> Bool -> AckType -> IO ()
handleConfirm LongLongInt
deliveryTag Bool
multiple AckType
BasicAck
handleAsync (SimpleMethod (Basic_nack LongLongInt
deliveryTag Bool
multiple Bool
_)) = LongLongInt -> Bool -> AckType -> IO ()
handleConfirm LongLongInt
deliveryTag Bool
multiple AckType
BasicNack
handleAsync (SimpleMethod (Basic_cancel ShortString
consumerTag Bool
_)) = ShortString -> IO ()
handleCancel ShortString
consumerTag
handleAsync Assembly
m = [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error ([Char]
"Unknown method: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Assembly -> [Char]
forall a. Show a => a -> [Char]
show Assembly
m)
handleConfirm :: LongLongInt -> Bool -> AckType -> IO ()
handleConfirm LongLongInt
deliveryTag Bool
multiple AckType
k = do
MVar [(LongLongInt, Bool, AckType) -> IO ()]
-> ([(LongLongInt, Bool, AckType) -> IO ()] -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel -> MVar [(LongLongInt, Bool, AckType) -> IO ()]
confirmListeners Channel
chan) (([(LongLongInt, Bool, AckType) -> IO ()] -> IO ()) -> IO ())
-> ([(LongLongInt, Bool, AckType) -> IO ()] -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \[(LongLongInt, Bool, AckType) -> IO ()]
listeners ->
[(LongLongInt, Bool, AckType) -> IO ()]
-> (((LongLongInt, Bool, AckType) -> IO ()) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(LongLongInt, Bool, AckType) -> IO ()]
listeners ((((LongLongInt, Bool, AckType) -> IO ()) -> IO ()) -> IO ())
-> (((LongLongInt, Bool, AckType) -> IO ()) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(LongLongInt, Bool, AckType) -> IO ()
l -> IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch ((LongLongInt, Bool, AckType) -> IO ()
l (LongLongInt
deliveryTag, Bool
multiple, AckType
k)) ((SomeException -> IO ()) -> IO ())
-> (SomeException -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"confirm listener on channel ["[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++Word16 -> [Char]
forall a. Show a => a -> [Char]
show (Channel -> Word16
channelID Channel
chan)[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
"] handling method "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++AckType -> [Char]
forall a. Show a => a -> [Char]
show AckType
k[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
" threw exception: "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++SomeException -> [Char]
forall a. Show a => a -> [Char]
show SomeException
ex
let seqNum :: Int
seqNum = LongLongInt -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral LongLongInt
deliveryTag
let targetSet :: TVar IntSet
targetSet = case AckType
k of
AckType
BasicAck -> Channel -> TVar IntSet
ackedSet Channel
chan
AckType
BasicNack -> Channel -> TVar IntSet
nackedSet Channel
chan
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IntSet
unconfSet <- TVar IntSet -> STM IntSet
forall a. TVar a -> STM a
readTVar (Channel -> TVar IntSet
unconfirmedSet Channel
chan)
let (IntSet -> IntSet
merge, IntSet
pending) = if Bool
multiple
then (IntSet -> IntSet -> IntSet
IntSet.union IntSet
confs, IntSet
pending')
else (Int -> IntSet -> IntSet
IntSet.insert Int
seqNum, Int -> IntSet -> IntSet
IntSet.delete Int
seqNum IntSet
unconfSet)
where
confs :: IntSet
confs = (IntSet, IntSet) -> IntSet
forall a b. (a, b) -> a
fst (IntSet, IntSet)
parts
pending' :: IntSet
pending' = (IntSet, IntSet) -> IntSet
forall a b. (a, b) -> b
snd (IntSet, IntSet)
parts
parts :: (IntSet, IntSet)
parts = (Int -> Bool) -> IntSet -> (IntSet, IntSet)
IntSet.partition (\Int
n -> Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
seqNum) IntSet
unconfSet
TVar IntSet -> (IntSet -> IntSet) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar IntSet
targetSet (\IntSet
ts -> IntSet -> IntSet
merge IntSet
ts)
TVar IntSet -> IntSet -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (Channel -> TVar IntSet
unconfirmedSet Channel
chan) IntSet
pending
handleCancel :: ShortString -> IO ()
handleCancel (ShortString Text
consumerTag) =
MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
-> (Map Text ((Message, Envelope) -> IO (), Text -> IO ())
-> IO ())
-> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel
-> MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
consumers Channel
chan) (\Map Text ((Message, Envelope) -> IO (), Text -> IO ())
s -> do
case Text
-> Map Text ((Message, Envelope) -> IO (), Text -> IO ())
-> Maybe ((Message, Envelope) -> IO (), Text -> IO ())
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup Text
consumerTag Map Text ((Message, Envelope) -> IO (), Text -> IO ())
s of
Just ((Message, Envelope) -> IO ()
_, Text -> IO ()
cancelCB) ->
IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (Text -> IO ()
cancelCB Text
consumerTag) ((SomeException -> IO ()) -> IO ())
-> (SomeException -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"consumer cancellation listener "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++Text -> [Char]
forall a. Show a => a -> [Char]
show Text
consumerTag[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
" on channel ["[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++Word16 -> [Char]
forall a. Show a => a -> [Char]
show (Channel -> Word16
channelID Channel
chan)[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
"] threw exception: "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ SomeException -> [Char]
forall a. Show a => a -> [Char]
show SomeException
ex
Maybe ((Message, Envelope) -> IO (), Text -> IO ())
Nothing ->
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
)
basicReturnToPublishError :: MethodPayload -> PublishError
basicReturnToPublishError (Basic_return Word16
code (ShortString Text
errText) (ShortString Text
exchange) (ShortString Text
routingKey)) =
let replyError :: ReturnReplyCode
replyError = case Word16
code of
Word16
312 -> Text -> ReturnReplyCode
Unroutable Text
errText
Word16
313 -> Text -> ReturnReplyCode
NoConsumers Text
errText
Word16
404 -> Text -> ReturnReplyCode
NotFound Text
errText
Word16
num -> [Char] -> ReturnReplyCode
forall a. HasCallStack => [Char] -> a
error ([Char] -> ReturnReplyCode) -> [Char] -> ReturnReplyCode
forall a b. (a -> b) -> a -> b
$ [Char]
"unexpected return error code: " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++Word16 -> [Char]
forall a. Show a => a -> [Char]
show Word16
num
pubError :: PublishError
pubError = ReturnReplyCode -> Maybe Text -> Text -> PublishError
PublishError ReturnReplyCode
replyError (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
exchange) Text
routingKey
in PublishError
pubError
basicReturnToPublishError MethodPayload
x = [Char] -> PublishError
forall a. HasCallStack => [Char] -> a
error ([Char] -> PublishError) -> [Char] -> PublishError
forall a b. (a -> b) -> a -> b
$ [Char]
"basicReturnToPublishError fail: "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++MethodPayload -> [Char]
forall a. Show a => a -> [Char]
show MethodPayload
x
addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO ()
addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO ()
addReturnListener Channel
chan (Message, PublishError) -> IO ()
listener = do
MVar [(Message, PublishError) -> IO ()]
-> ([(Message, PublishError) -> IO ()]
-> IO [(Message, PublishError) -> IO ()])
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Channel -> MVar [(Message, PublishError) -> IO ()]
returnListeners Channel
chan) (([(Message, PublishError) -> IO ()]
-> IO [(Message, PublishError) -> IO ()])
-> IO ())
-> ([(Message, PublishError) -> IO ()]
-> IO [(Message, PublishError) -> IO ()])
-> IO ()
forall a b. (a -> b) -> a -> b
$ \[(Message, PublishError) -> IO ()]
listeners -> [(Message, PublishError) -> IO ()]
-> IO [(Message, PublishError) -> IO ()]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Message, PublishError) -> IO ()]
-> IO [(Message, PublishError) -> IO ()])
-> [(Message, PublishError) -> IO ()]
-> IO [(Message, PublishError) -> IO ()]
forall a b. (a -> b) -> a -> b
$ (Message, PublishError) -> IO ()
listener((Message, PublishError) -> IO ())
-> [(Message, PublishError) -> IO ()]
-> [(Message, PublishError) -> IO ()]
forall a. a -> [a] -> [a]
:[(Message, PublishError) -> IO ()]
listeners
addChannelExceptionHandler :: Channel -> (CE.SomeException -> IO ()) -> IO ()
addChannelExceptionHandler :: Channel -> (SomeException -> IO ()) -> IO ()
addChannelExceptionHandler Channel
chan SomeException -> IO ()
handler = do
MVar [SomeException -> IO ()]
-> ([SomeException -> IO ()] -> IO [SomeException -> IO ()])
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Channel -> MVar [SomeException -> IO ()]
chanExceptionHandlers Channel
chan) (([SomeException -> IO ()] -> IO [SomeException -> IO ()])
-> IO ())
-> ([SomeException -> IO ()] -> IO [SomeException -> IO ()])
-> IO ()
forall a b. (a -> b) -> a -> b
$ \[SomeException -> IO ()]
handlers -> [SomeException -> IO ()] -> IO [SomeException -> IO ()]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([SomeException -> IO ()] -> IO [SomeException -> IO ()])
-> [SomeException -> IO ()] -> IO [SomeException -> IO ()]
forall a b. (a -> b) -> a -> b
$ SomeException -> IO ()
handler(SomeException -> IO ())
-> [SomeException -> IO ()] -> [SomeException -> IO ()]
forall a. a -> [a] -> [a]
:[SomeException -> IO ()]
handlers
isNormalChannelClose :: CE.SomeException -> Bool
isNormalChannelClose :: SomeException -> Bool
isNormalChannelClose SomeException
e = case SomeException -> Maybe AMQPException
forall e. Exception e => SomeException -> Maybe e
CE.fromException SomeException
e :: Maybe AMQPException of
Just (ChannelClosedException CloseType
Normal [Char]
_) -> Bool
True
Just (ConnectionClosedException CloseType
Normal [Char]
_) -> Bool
True
Maybe AMQPException
_ -> Bool
False
closeChannel' :: Channel -> CloseType -> Text -> IO ()
closeChannel' :: Channel -> CloseType -> Text -> IO ()
closeChannel' Channel
c CloseType
closeType Text
reason = do
MVar (Maybe (CloseType, [Char]))
-> (Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Channel -> MVar (Maybe (CloseType, [Char]))
chanClosed Channel
c) ((Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO ())
-> (Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \Maybe (CloseType, [Char])
x -> do
if Maybe (CloseType, [Char]) -> Bool
forall a. Maybe a -> Bool
isNothing Maybe (CloseType, [Char])
x
then do
MVar (IntMap (Channel, ThreadId))
-> (IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId)))
-> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels (Connection -> MVar (IntMap (Channel, ThreadId)))
-> Connection -> MVar (IntMap (Channel, ThreadId))
forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
c) ((IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId)))
-> IO ())
-> (IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId)))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
old -> do
Bool
ret <- ChannelAllocator -> Int -> IO Bool
freeChannel (Connection -> ChannelAllocator
connChanAllocator (Connection -> ChannelAllocator) -> Connection -> ChannelAllocator
forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
c) (Int -> IO Bool) -> Int -> IO Bool
forall a b. (a -> b) -> a -> b
$ Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word16 -> Int) -> Word16 -> Int
forall a b. (a -> b) -> a -> b
$ Channel -> Word16
channelID Channel
c
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
ret (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr [Char]
"closeChannel error: channel already freed"
IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId)))
-> IntMap (Channel, ThreadId) -> IO (IntMap (Channel, ThreadId))
forall a b. (a -> b) -> a -> b
$ Int -> IntMap (Channel, ThreadId) -> IntMap (Channel, ThreadId)
forall a. Int -> IntMap a -> IntMap a
IM.delete (Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word16 -> Int) -> Word16 -> Int
forall a b. (a -> b) -> a -> b
$ Channel -> Word16
channelID Channel
c) IntMap (Channel, ThreadId)
old
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ Lock -> IO Bool
killLock (Lock -> IO Bool) -> Lock -> IO Bool
forall a b. (a -> b) -> a -> b
$ Channel -> Lock
chanActive Channel
c
MVar (Seq (MVar Assembly)) -> IO ()
forall a. MVar (Seq (MVar a)) -> IO ()
killOutstandingResponses (MVar (Seq (MVar Assembly)) -> IO ())
-> MVar (Seq (MVar Assembly)) -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel -> MVar (Seq (MVar Assembly))
outstandingResponses Channel
c
Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char]))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char])))
-> Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char]))
forall a b. (a -> b) -> a -> b
$ (CloseType, [Char]) -> Maybe (CloseType, [Char])
forall a. a -> Maybe a
Just (CloseType
closeType, Text -> [Char]
T.unpack Text
reason)
else Maybe (CloseType, [Char]) -> IO (Maybe (CloseType, [Char]))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (CloseType, [Char])
x
where
killOutstandingResponses :: MVar (Seq.Seq (MVar a)) -> IO ()
killOutstandingResponses :: forall a. MVar (Seq (MVar a)) -> IO ()
killOutstandingResponses MVar (Seq (MVar a))
outResps = do
MVar (Seq (MVar a)) -> (Seq (MVar a) -> IO (Seq (MVar a))) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (Seq (MVar a))
outResps ((Seq (MVar a) -> IO (Seq (MVar a))) -> IO ())
-> (Seq (MVar a) -> IO (Seq (MVar a))) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Seq (MVar a)
val -> do
(MVar a -> IO Bool) -> Seq (MVar a) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
F.mapM_ (\MVar a
x -> MVar a -> a -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar a
x (a -> IO Bool) -> a -> IO Bool
forall a b. (a -> b) -> a -> b
$ [Char] -> a
forall a. HasCallStack => [Char] -> a
error [Char]
"channel closed") Seq (MVar a)
val
Seq (MVar a) -> IO (Seq (MVar a))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Seq (MVar a)
forall a. HasCallStack => a
undefined
openChannel :: Connection -> IO Channel
openChannel :: Connection -> IO Channel
openChannel Connection
c = do
Chan FramePayload
newInQueue <- IO (Chan FramePayload)
forall a. IO (Chan a)
newChan
MVar (Seq (MVar Assembly))
outRes <- Seq (MVar Assembly) -> IO (MVar (Seq (MVar Assembly)))
forall a. a -> IO (MVar a)
newMVar Seq (MVar Assembly)
forall a. Seq a
Seq.empty
MVar Int
lastConsTag <- Int -> IO (MVar Int)
forall a. a -> IO (MVar a)
newMVar Int
0
Lock
ca <- IO Lock
newLock
MVar (Maybe (CloseType, [Char]))
closed <- Maybe (CloseType, [Char]) -> IO (MVar (Maybe (CloseType, [Char])))
forall a. a -> IO (MVar a)
newMVar Maybe (CloseType, [Char])
forall a. Maybe a
Nothing
MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
conss <- Map Text ((Message, Envelope) -> IO (), Text -> IO ())
-> IO
(MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ())))
forall a. a -> IO (MVar a)
newMVar Map Text ((Message, Envelope) -> IO (), Text -> IO ())
forall k a. Map k a
M.empty
MVar [(Message, PublishError) -> IO ()]
retListeners <- [(Message, PublishError) -> IO ()]
-> IO (MVar [(Message, PublishError) -> IO ()])
forall a. a -> IO (MVar a)
newMVar []
TVar IntSet
aSet <- IntSet -> IO (TVar IntSet)
forall a. a -> IO (TVar a)
newTVarIO IntSet
IntSet.empty
TVar IntSet
nSet <- IntSet -> IO (TVar IntSet)
forall a. a -> IO (TVar a)
newTVarIO IntSet
IntSet.empty
MVar Int
nxtSeq <- Int -> IO (MVar Int)
forall a. a -> IO (MVar a)
newMVar Int
0
TVar IntSet
unconfSet <- IntSet -> IO (TVar IntSet)
forall a. a -> IO (TVar a)
newTVarIO IntSet
IntSet.empty
MVar [(LongLongInt, Bool, AckType) -> IO ()]
cnfListeners <- [(LongLongInt, Bool, AckType) -> IO ()]
-> IO (MVar [(LongLongInt, Bool, AckType) -> IO ()])
forall a. a -> IO (MVar a)
newMVar []
MVar [SomeException -> IO ()]
handlers <- [SomeException -> IO ()] -> IO (MVar [SomeException -> IO ()])
forall a. a -> IO (MVar a)
newMVar []
Channel
newChannel <- MVar (IntMap (Channel, ThreadId))
-> (IntMap (Channel, ThreadId)
-> IO (IntMap (Channel, ThreadId), Channel))
-> IO Channel
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels Connection
c) ((IntMap (Channel, ThreadId)
-> IO (IntMap (Channel, ThreadId), Channel))
-> IO Channel)
-> (IntMap (Channel, ThreadId)
-> IO (IntMap (Channel, ThreadId), Channel))
-> IO Channel
forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
mp -> do
Int
newChannelID <- ChannelAllocator -> IO Int
allocateChannel (Connection -> ChannelAllocator
connChanAllocator Connection
c)
let newChannel :: Channel
newChannel = Connection
-> Chan FramePayload
-> MVar (Seq (MVar Assembly))
-> Word16
-> MVar Int
-> MVar Int
-> TVar IntSet
-> TVar IntSet
-> TVar IntSet
-> Lock
-> MVar (Maybe (CloseType, [Char]))
-> MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
-> MVar [(Message, PublishError) -> IO ()]
-> MVar [(LongLongInt, Bool, AckType) -> IO ()]
-> MVar [SomeException -> IO ()]
-> Channel
Channel Connection
c Chan FramePayload
newInQueue MVar (Seq (MVar Assembly))
outRes (Int -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
newChannelID) MVar Int
lastConsTag MVar Int
nxtSeq TVar IntSet
unconfSet TVar IntSet
aSet TVar IntSet
nSet Lock
ca MVar (Maybe (CloseType, [Char]))
closed MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
conss MVar [(Message, PublishError) -> IO ()]
retListeners MVar [(LongLongInt, Bool, AckType) -> IO ()]
cnfListeners MVar [SomeException -> IO ()]
handlers
ThreadId
thrID <- IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally' (Channel -> IO ()
channelReceiver Channel
newChannel) ((Either SomeException () -> IO ()) -> IO ThreadId)
-> (Either SomeException () -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \Either SomeException ()
res -> do
Channel -> CloseType -> Text -> IO ()
closeChannel' Channel
newChannel CloseType
Normal Text
"closed"
case Either SomeException ()
res of
Right ()
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Left SomeException
ex -> do
let unwrappedExc :: SomeException
unwrappedExc = SomeException -> SomeException
unwrapChanThreadKilledException SomeException
ex
[SomeException -> IO ()]
handlers' <- MVar [SomeException -> IO ()] -> IO [SomeException -> IO ()]
forall a. MVar a -> IO a
readMVar MVar [SomeException -> IO ()]
handlers
case ([SomeException -> IO ()] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [SomeException -> IO ()]
handlers', SomeException -> Maybe [Char]
fromAbnormalChannelClose SomeException
unwrappedExc) of
(Bool
True, Just [Char]
reason) -> Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"unhandled AMQP channel exception (chanId="[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++Int -> [Char]
forall a. Show a => a -> [Char]
show Int
newChannelID[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
"): "[Char] -> ShowS
forall a. [a] -> [a] -> [a]
++[Char]
reason
(Bool, Maybe [Char])
_ -> ((SomeException -> IO ()) -> IO ())
-> [SomeException -> IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((SomeException -> IO ()) -> SomeException -> IO ()
forall a b. (a -> b) -> a -> b
$ SomeException
unwrappedExc) [SomeException -> IO ()]
handlers'
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int -> IntMap (Channel, ThreadId) -> Bool
forall a. Int -> IntMap a -> Bool
IM.member Int
newChannelID IntMap (Channel, ThreadId)
mp) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IOError -> IO ()
forall e a. Exception e => e -> IO a
CE.throwIO (IOError -> IO ()) -> IOError -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"openChannel fail: channel already open"
(IntMap (Channel, ThreadId), Channel)
-> IO (IntMap (Channel, ThreadId), Channel)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
-> (Channel, ThreadId)
-> IntMap (Channel, ThreadId)
-> IntMap (Channel, ThreadId)
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
newChannelID (Channel
newChannel, ThreadId
thrID) IntMap (Channel, ThreadId)
mp, Channel
newChannel)
SimpleMethod (Channel_open_ok LongString
_) <- Channel -> Assembly -> IO Assembly
request Channel
newChannel (Assembly -> IO Assembly) -> Assembly -> IO Assembly
forall a b. (a -> b) -> a -> b
$ MethodPayload -> Assembly
SimpleMethod (MethodPayload -> Assembly) -> MethodPayload -> Assembly
forall a b. (a -> b) -> a -> b
$ ShortString -> MethodPayload
Channel_open (Text -> ShortString
ShortString Text
"")
Channel -> IO Channel
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Channel
newChannel
where
fromAbnormalChannelClose :: CE.SomeException -> Maybe String
fromAbnormalChannelClose :: SomeException -> Maybe [Char]
fromAbnormalChannelClose SomeException
exc =
case SomeException -> Maybe AMQPException
forall e. Exception e => SomeException -> Maybe e
CE.fromException SomeException
exc :: Maybe AMQPException of
Just (ConnectionClosedException CloseType
_ [Char]
_) -> Maybe [Char]
forall a. Maybe a
Nothing
Just (ChannelClosedException CloseType
Normal [Char]
_) -> Maybe [Char]
forall a. Maybe a
Nothing
Just (ChannelClosedException CloseType
Abnormal [Char]
reason) -> [Char] -> Maybe [Char]
forall a. a -> Maybe a
Just [Char]
reason
Just (AllChannelsAllocatedException Int
_) -> [Char] -> Maybe [Char]
forall a. a -> Maybe a
Just [Char]
"all channels allocated"
Maybe AMQPException
Nothing -> [Char] -> Maybe [Char]
forall a. a -> Maybe a
Just ([Char] -> Maybe [Char]) -> [Char] -> Maybe [Char]
forall a b. (a -> b) -> a -> b
$ SomeException -> [Char]
forall a. Show a => a -> [Char]
show SomeException
exc
closeChannel :: Channel -> IO ()
closeChannel :: Channel -> IO ()
closeChannel Channel
c = do
SimpleMethod MethodPayload
Channel_close_ok <- Channel -> Assembly -> IO Assembly
request Channel
c (Assembly -> IO Assembly) -> Assembly -> IO Assembly
forall a b. (a -> b) -> a -> b
$ MethodPayload -> Assembly
SimpleMethod (MethodPayload -> Assembly) -> MethodPayload -> Assembly
forall a b. (a -> b) -> a -> b
$ Word16 -> ShortString -> Word16 -> Word16 -> MethodPayload
Channel_close Word16
0 (Text -> ShortString
ShortString Text
"") Word16
0 Word16
0
MVar (IntMap (Channel, ThreadId))
-> (IntMap (Channel, ThreadId) -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels (Connection -> MVar (IntMap (Channel, ThreadId)))
-> Connection -> MVar (IntMap (Channel, ThreadId))
forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
c) ((IntMap (Channel, ThreadId) -> IO ()) -> IO ())
-> (IntMap (Channel, ThreadId) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
chans -> do
case Int -> IntMap (Channel, ThreadId) -> Maybe (Channel, ThreadId)
forall a. Int -> IntMap a -> Maybe a
IM.lookup (Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word16 -> Int) -> Word16 -> Int
forall a b. (a -> b) -> a -> b
$ Channel -> Word16
channelID Channel
c) IntMap (Channel, ThreadId)
chans of
Just (Channel
_, ThreadId
thrID) -> ThreadId -> AMQPException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
thrID (AMQPException -> IO ()) -> AMQPException -> IO ()
forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ChannelClosedException CloseType
Normal [Char]
"closeChannel was called"
Maybe (Channel, ThreadId)
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
writeFrames :: Channel -> [FramePayload] -> IO ()
writeFrames :: Channel -> [FramePayload] -> IO ()
writeFrames Channel
chan [FramePayload]
payloads = do
let conn :: Connection
conn = Channel -> Connection
connection Channel
chan
MVar (IntMap (Channel, ThreadId))
-> (IntMap (Channel, ThreadId) -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels Connection
conn) ((IntMap (Channel, ThreadId) -> IO ()) -> IO ())
-> (IntMap (Channel, ThreadId) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
chans ->
if Int -> IntMap (Channel, ThreadId) -> Bool
forall a. Int -> IntMap a -> Bool
IM.member (Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word16 -> Int) -> Word16 -> Int
forall a b. (a -> b) -> a -> b
$ Channel -> Word16
channelID Channel
chan) IntMap (Channel, ThreadId)
chans
then IO () -> (IOError -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch
(do
MVar () -> (() -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar ()
connWriteLock Connection
conn) ((() -> IO ()) -> IO ()) -> (() -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \()
_ ->
(FramePayload -> IO ()) -> [FramePayload] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\FramePayload
payload -> Connection -> Frame -> IO ()
writeFrame (Connection -> Connection
connHandle Connection
conn) (Word16 -> FramePayload -> Frame
Frame (Channel -> Word16
channelID Channel
chan) FramePayload
payload)) [FramePayload]
payloads
Connection -> IO ()
updateLastSent Connection
conn)
(\(IOError
_ :: CE.IOException) -> do
IOError -> IO ()
forall e a. Exception e => e -> IO a
CE.throwIO (IOError -> IO ()) -> IOError -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"connection not open"
)
else do
IOError -> IO ()
forall e a. Exception e => e -> IO a
CE.throwIO (IOError -> IO ()) -> IOError -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"channel not open"
writeAssembly' :: Channel -> Assembly -> IO ()
writeAssembly' :: Channel -> Assembly -> IO ()
writeAssembly' Channel
chan (ContentMethod MethodPayload
m ContentHeaderProperties
properties ByteString
msg) = do
Lock -> IO ()
waitLock (Lock -> IO ()) -> Lock -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel -> Lock
chanActive Channel
chan
let !toWrite :: [FramePayload]
toWrite = [
MethodPayload -> FramePayload
MethodPayload MethodPayload
m,
Word16
-> Word16 -> LongLongInt -> ContentHeaderProperties -> FramePayload
ContentHeaderPayload
(ContentHeaderProperties -> Word16
getClassIDOf ContentHeaderProperties
properties)
Word16
0
(ByteOffset -> LongLongInt
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteOffset -> LongLongInt) -> ByteOffset -> LongLongInt
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteOffset
BL.length ByteString
msg)
ContentHeaderProperties
properties] [FramePayload] -> [FramePayload] -> [FramePayload]
forall a. [a] -> [a] -> [a]
++
(if ByteString -> ByteOffset
BL.length ByteString
msg ByteOffset -> ByteOffset -> Bool
forall a. Ord a => a -> a -> Bool
> ByteOffset
0
then do
(ByteString -> FramePayload) -> [ByteString] -> [FramePayload]
forall a b. (a -> b) -> [a] -> [b]
map ByteString -> FramePayload
ContentBodyPayload
(ByteString -> ByteOffset -> [ByteString]
splitLen ByteString
msg (ByteOffset -> [ByteString]) -> ByteOffset -> [ByteString]
forall a b. (a -> b) -> a -> b
$ Int -> ByteOffset
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Connection -> Int
connMaxFrameSize (Connection -> Int) -> Connection -> Int
forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
chan) ByteOffset -> ByteOffset -> ByteOffset
forall a. Num a => a -> a -> a
- ByteOffset
8)
else []
)
Channel -> [FramePayload] -> IO ()
writeFrames Channel
chan [FramePayload]
toWrite
where
splitLen :: ByteString -> ByteOffset -> [ByteString]
splitLen ByteString
str ByteOffset
len | ByteString -> ByteOffset
BL.length ByteString
str ByteOffset -> ByteOffset -> Bool
forall a. Ord a => a -> a -> Bool
> ByteOffset
len = ByteOffset -> ByteString -> ByteString
BL.take ByteOffset
len ByteString
str ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: ByteString -> ByteOffset -> [ByteString]
splitLen (ByteOffset -> ByteString -> ByteString
BL.drop ByteOffset
len ByteString
str) ByteOffset
len
splitLen ByteString
str ByteOffset
_ = [ByteString
str]
writeAssembly' Channel
chan (SimpleMethod MethodPayload
m) = Channel -> [FramePayload] -> IO ()
writeFrames Channel
chan [MethodPayload -> FramePayload
MethodPayload MethodPayload
m]
writeAssembly :: Channel -> Assembly -> IO ()
writeAssembly :: Channel -> Assembly -> IO ()
writeAssembly Channel
chan Assembly
m =
IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
CE.catches
(Channel -> Assembly -> IO ()
writeAssembly' Channel
chan Assembly
m)
[(AMQPException -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (AMQPException
_ :: AMQPException) -> Channel -> IO ()
forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan),
(ErrorCall -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (ErrorCall
_ :: CE.ErrorCall) -> Channel -> IO ()
forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan),
(IOError -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (IOError
_ :: CE.IOException) -> Channel -> IO ()
forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan)]
request :: Channel -> Assembly -> IO Assembly
request :: Channel -> Assembly -> IO Assembly
request Channel
chan Assembly
m = do
MVar Assembly
res <- IO (MVar Assembly)
forall a. IO (MVar a)
newEmptyMVar
IO Assembly -> [Handler Assembly] -> IO Assembly
forall a. IO a -> [Handler a] -> IO a
CE.catches (do
MVar (Maybe (CloseType, [Char]))
-> (Maybe (CloseType, [Char]) -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel -> MVar (Maybe (CloseType, [Char]))
chanClosed Channel
chan) ((Maybe (CloseType, [Char]) -> IO ()) -> IO ())
-> (Maybe (CloseType, [Char]) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Maybe (CloseType, [Char])
cc -> do
if Maybe (CloseType, [Char]) -> Bool
forall a. Maybe a -> Bool
isNothing Maybe (CloseType, [Char])
cc
then do
MVar (Seq (MVar Assembly))
-> (Seq (MVar Assembly) -> IO (Seq (MVar Assembly))) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Channel -> MVar (Seq (MVar Assembly))
outstandingResponses Channel
chan) ((Seq (MVar Assembly) -> IO (Seq (MVar Assembly))) -> IO ())
-> (Seq (MVar Assembly) -> IO (Seq (MVar Assembly))) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Seq (MVar Assembly)
val -> Seq (MVar Assembly) -> IO (Seq (MVar Assembly))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Seq (MVar Assembly) -> IO (Seq (MVar Assembly)))
-> Seq (MVar Assembly) -> IO (Seq (MVar Assembly))
forall a b. (a -> b) -> a -> b
$! Seq (MVar Assembly)
val Seq (MVar Assembly) -> MVar Assembly -> Seq (MVar Assembly)
forall a. Seq a -> a -> Seq a
Seq.|> MVar Assembly
res
Channel -> Assembly -> IO ()
writeAssembly' Channel
chan Assembly
m
else IOError -> IO ()
forall e a. Exception e => e -> IO a
CE.throwIO (IOError -> IO ()) -> IOError -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"closed"
!Assembly
r <- MVar Assembly -> IO Assembly
forall a. MVar a -> IO a
takeMVar MVar Assembly
res
Assembly -> IO Assembly
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Assembly
r
)
[(AMQPException -> IO Assembly) -> Handler Assembly
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (AMQPException
_ :: AMQPException) -> Channel -> IO Assembly
forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan),
(ErrorCall -> IO Assembly) -> Handler Assembly
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (ErrorCall
_ :: CE.ErrorCall) -> Channel -> IO Assembly
forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan),
(IOError -> IO Assembly) -> Handler Assembly
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (IOError
_ :: CE.IOException) -> Channel -> IO Assembly
forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan)]
throwMostRelevantAMQPException :: Channel -> IO a
throwMostRelevantAMQPException :: forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan = do
Maybe (CloseType, [Char])
cc <- MVar (Maybe (CloseType, [Char])) -> IO (Maybe (CloseType, [Char]))
forall a. MVar a -> IO a
readMVar (MVar (Maybe (CloseType, [Char]))
-> IO (Maybe (CloseType, [Char])))
-> MVar (Maybe (CloseType, [Char]))
-> IO (Maybe (CloseType, [Char]))
forall a b. (a -> b) -> a -> b
$ Connection -> MVar (Maybe (CloseType, [Char]))
connClosed (Connection -> MVar (Maybe (CloseType, [Char])))
-> Connection -> MVar (Maybe (CloseType, [Char]))
forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
chan
case Maybe (CloseType, [Char])
cc of
Just (CloseType
closeType, [Char]
r) -> AMQPException -> IO a
forall e a. Exception e => e -> IO a
CE.throwIO (AMQPException -> IO a) -> AMQPException -> IO a
forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
closeType [Char]
r
Maybe (CloseType, [Char])
Nothing -> do
Maybe (CloseType, [Char])
chc <- MVar (Maybe (CloseType, [Char])) -> IO (Maybe (CloseType, [Char]))
forall a. MVar a -> IO a
readMVar (MVar (Maybe (CloseType, [Char]))
-> IO (Maybe (CloseType, [Char])))
-> MVar (Maybe (CloseType, [Char]))
-> IO (Maybe (CloseType, [Char]))
forall a b. (a -> b) -> a -> b
$ Channel -> MVar (Maybe (CloseType, [Char]))
chanClosed Channel
chan
case Maybe (CloseType, [Char])
chc of
Just (CloseType
ct, [Char]
r) -> AMQPException -> IO a
forall e a. Exception e => e -> IO a
CE.throwIO (AMQPException -> IO a) -> AMQPException -> IO a
forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ChannelClosedException CloseType
ct [Char]
r
Maybe (CloseType, [Char])
Nothing -> AMQPException -> IO a
forall e a. Exception e => e -> IO a
CE.throwIO (AMQPException -> IO a) -> AMQPException -> IO a
forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
"unknown reason"
waitForAllConfirms :: Channel -> STM (IntSet.IntSet, IntSet.IntSet)
waitForAllConfirms :: Channel -> STM (IntSet, IntSet)
waitForAllConfirms Channel
chan = do
IntSet
pending <- TVar IntSet -> STM IntSet
forall a. TVar a -> STM a
readTVar (TVar IntSet -> STM IntSet) -> TVar IntSet -> STM IntSet
forall a b. (a -> b) -> a -> b
$ Channel -> TVar IntSet
unconfirmedSet Channel
chan
Bool -> STM ()
check (IntSet -> Bool
IntSet.null IntSet
pending)
(,) (IntSet -> IntSet -> (IntSet, IntSet))
-> STM IntSet -> STM (IntSet -> (IntSet, IntSet))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar IntSet -> IntSet -> STM IntSet
forall a. TVar a -> a -> STM a
swapTVar (Channel -> TVar IntSet
ackedSet Channel
chan) IntSet
IntSet.empty
STM (IntSet -> (IntSet, IntSet))
-> STM IntSet -> STM (IntSet, IntSet)
forall a b. STM (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TVar IntSet -> IntSet -> STM IntSet
forall a. TVar a -> a -> STM a
swapTVar (Channel -> TVar IntSet
nackedSet Channel
chan) IntSet
IntSet.empty