module Network.AMQP (
-- * Connection
Connection,
openConnection,
openConnection',
closeConnection,
addConnectionClosedHandler,
-- * Channel
Channel,
openChannel,
qos,
-- * Exchanges
ExchangeOpts(..),
newExchange,
declareExchange,
deleteExchange,
-- * Queues
QueueOpts(..),
newQueue,
declareQueue,
bindQueue,
bindQueue',
purgeQueue,
deleteQueue,
-- * Messaging
Message(..),
DeliveryMode(..),
newMsg,
Envelope(..),
ConsumerTag,
Ack(..),
consumeMsgs,
cancelConsumer,
publishMsg,
getMsg,
rejectMsg,
recoverMsgs,
ackMsg,
ackEnv,
-- * Transactions
txSelect,
txCommit,
txRollback,
-- * Flow control
flow,
-- * Exceptions
AMQPException(..)
) where
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put as BPut
import Data.Typeable
import qualified Data.Map as M
import qualified Data.IntMap as IM
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.Sequence as Seq
import qualified Data.Foldable as F
import qualified Data.Text as T
import Data.Text (Text)
import Data.IORef
import Data.Maybe
import Data.Int
import Control.Concurrent
import Control.Monad
import qualified Control.Exception as CE
import Network.BSD
import Network.Socket
import qualified Network.Socket.ByteString as NB
import Network.AMQP.Protocol
import Network.AMQP.Types
import Network.AMQP.Helpers
import Network.AMQP.Generated
-- | Options describing an exchange.
--
-- 'declareExchange' forwards every field, in declaration order, to the
-- @Exchange_declare@ method.
data ExchangeOpts = ExchangeOpts
    { exchangeName       :: Text
    , exchangeType       :: Text
    , exchangePassive    :: Bool
    , exchangeDurable    :: Bool
    , exchangeAutoDelete :: Bool
    , exchangeInternal   :: Bool
    }
-- | Baseline 'ExchangeOpts': empty name and type, durable, and every other
-- flag switched off. Intended to be customised with record-update syntax.
newExchange :: ExchangeOpts
newExchange = ExchangeOpts
    { exchangeName       = ""
    , exchangeType       = ""
    , exchangePassive    = False
    , exchangeDurable    = True
    , exchangeAutoDelete = False
    , exchangeInternal   = False
    }
-- | Send an @Exchange_declare@ built from the given options and block until
-- the reply arrives.
--
-- The reply must be @Exchange_declare_ok@; any other reply makes the
-- pattern bind fail inside 'IO'.
declareExchange :: Channel -> ExchangeOpts -> IO ()
declareExchange chan exchg = do
    SimpleMethod Exchange_declare_ok <- request chan (SimpleMethod declareMethod)
    return ()
  where
    declareMethod =
        Exchange_declare
            1
            (ShortString (exchangeName exchg))
            (ShortString (exchangeType exchg))
            (exchangePassive exchg)
            (exchangeDurable exchg)
            (exchangeAutoDelete exchg)
            (exchangeInternal exchg)
            False
            (FieldTable M.empty)
-- | Send an @Exchange_delete@ for the named exchange (both boolean flags
-- set to 'False') and block until @Exchange_delete_ok@ is received.
deleteExchange :: Channel -> Text -> IO ()
deleteExchange chan name = do
    SimpleMethod Exchange_delete_ok <- request chan (SimpleMethod deleteMethod)
    return ()
  where
    deleteMethod = Exchange_delete 1 (ShortString name) False False
-- | Options describing a queue.
--
-- 'declareQueue' forwards every field, in declaration order, to the
-- @Queue_declare@ method.
data QueueOpts = QueueOpts
    { queueName       :: Text
      -- optional
    , queuePassive    :: Bool
    , queueDurable    :: Bool
    , queueExclusive  :: Bool
    , queueAutoDelete :: Bool
    , queueHeaders    :: FieldTable
    }
-- | Baseline 'QueueOpts': empty name, durable, every other flag switched
-- off, and an empty header table. Intended to be customised with
-- record-update syntax.
newQueue :: QueueOpts
newQueue = QueueOpts
    { queueName       = ""
    , queuePassive    = False
    , queueDurable    = True
    , queueExclusive  = False
    , queueAutoDelete = False
    , queueHeaders    = FieldTable M.empty
    }
-- | Send a @Queue_declare@ built from the given options and block until
-- @Queue_declare_ok@ arrives.
--
-- Returns the three values carried by the reply: the queue name, the
-- message count and the consumer count (both converted to 'Int').
declareQueue :: Channel -> QueueOpts -> IO (Text, Int, Int)
declareQueue chan queue = do
    SimpleMethod (Queue_declare_ok (ShortString declaredName) nMessages nConsumers) <-
        request chan (SimpleMethod declareMethod)
    return (declaredName, fromIntegral nMessages, fromIntegral nConsumers)
  where
    declareMethod =
        Queue_declare
            1
            (ShortString (queueName queue))
            (queuePassive queue)
            (queueDurable queue)
            (queueExclusive queue)
            (queueAutoDelete queue)
            False
            (queueHeaders queue)
-- | Bind a queue to an exchange under a routing key.
--
-- Shorthand for 'bindQueue'' with an empty argument table.
bindQueue :: Channel -> Text -> Text -> Text -> IO ()
bindQueue chan queue exchange key =
    bindQueue' chan queue exchange key (FieldTable M.empty)
-- | Send a @Queue_bind@ for the given queue, exchange, routing key and
-- argument table, then block until @Queue_bind_ok@ is received.
bindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
bindQueue' chan queue exchange key args = do
    SimpleMethod Queue_bind_ok <- request chan (SimpleMethod bindMethod)
    return ()
  where
    bindMethod =
        Queue_bind
            1
            (ShortString queue)
            (ShortString exchange)
            (ShortString key)
            False
            args
-- | Send a @Queue_purge@ for the named queue and return the message count
-- reported in the @Queue_purge_ok@ reply.
purgeQueue :: Channel -> Text -> IO Word32
purgeQueue chan queue = do
    SimpleMethod (Queue_purge_ok purged) <- request chan (SimpleMethod purgeMethod)
    return purged
  where
    purgeMethod = Queue_purge 1 (ShortString queue) False
-- | Send a @Queue_delete@ for the named queue (all three boolean flags set
-- to 'False') and return the message count reported in the
-- @Queue_delete_ok@ reply.
deleteQueue :: Channel -> Text -> IO Word32
deleteQueue chan queue = do
    SimpleMethod (Queue_delete_ok removed) <- request chan (SimpleMethod deleteMethod)
    return removed
  where
    deleteMethod = Queue_delete 1 (ShortString queue) False False False
-- | Identifier of a registered consumer: returned by 'consumeMsgs' and
-- accepted by 'cancelConsumer'.
type ConsumerTag = Text
-- | Acknowledgement mode chosen when consuming ('consumeMsgs') or fetching
-- ('getMsg') messages.
data Ack = Ack | NoAck

-- | Translate an 'Ack' into the boolean flag passed to @Basic_consume@ and
-- @Basic_get@: 'False' for 'Ack', 'True' for 'NoAck'.
ackToBool :: Ack -> Bool
ackToBool mode = case mode of
    Ack   -> False
    NoAck -> True
-- | Register a callback for messages arriving from the named queue.
--
-- A fresh tag is derived from the channel's consumer counter, the callback
-- is stored under that tag, and a @Basic_consume@ is written to the channel
-- without waiting for a reply. The new tag is returned.
consumeMsgs :: Channel -> Text -> Ack -> ((Message,Envelope) -> IO ()) -> IO ConsumerTag
consumeMsgs chan queueName ack callback = do
    tagNumber <- modifyMVar (lastConsumerTag chan) $ \previous ->
        let next = previous + 1 in return (next, next)
    let tag = T.pack (show tagNumber)
    -- register the callback before the consume method is sent
    modifyMVar_ (consumers chan) (return . M.insert tag callback)
    writeAssembly chan $ SimpleMethod $
        Basic_consume
            1
            (ShortString queueName)
            (ShortString tag)
            False
            (ackToBool ack)
            False
            True
    return tag
-- | Send a @Basic_cancel@ for the given tag, wait for @Basic_cancel_ok@,
-- and then remove the tag's callback from the channel's consumer map.
cancelConsumer :: Channel -> ConsumerTag -> IO ()
cancelConsumer chan consumerTag = do
    SimpleMethod (Basic_cancel_ok _) <- request chan cancelRequest
    modifyMVar_ (consumers chan) (return . M.delete consumerTag)
  where
    cancelRequest = SimpleMethod (Basic_cancel (ShortString consumerTag) False)
-- | Write a @Basic_publish@ for the given exchange and routing key,
-- followed by a content header built from the message's fields and the
-- message body. No reply is awaited.
publishMsg :: Channel -> Text -> Text -> Message -> IO ()
publishMsg chan exchange key msg =
    writeAssembly chan (ContentMethod publishMethod header (msgBody msg))
  where
    publishMethod =
        Basic_publish
            1
            (ShortString exchange)
            (ShortString key)
            False
            False

    shortText = fmap ShortString

    -- Only the properties that 'Message' models are filled in; every other
    -- CHBasic slot is left as Nothing.
    header =
        CHBasic
            (shortText (msgContentType msg))
            Nothing
            (msgHeaders msg)
            (fmap deliveryModeToInt (msgDeliveryMode msg))
            Nothing
            (shortText (msgCorrelationID msg))
            (shortText (msgReplyTo msg))
            Nothing
            (shortText (msgID msg))
            (msgTimestamp msg)
            Nothing
            Nothing
            Nothing
            Nothing
-- | Send a @Basic_get@ for the named queue and wait for the reply.
--
-- A @Basic_get_ok@ reply with content yields the decoded message and its
-- envelope; any other reply yields 'Nothing'.
getMsg :: Channel -> Ack -> Text -> IO (Maybe (Message, Envelope))
getMsg chan ack queueName = do
    reply <- request chan (SimpleMethod (Basic_get 1 (ShortString queueName) (ackToBool ack)))
    case reply of
        ContentMethod (Basic_get_ok tag redelivered (ShortString exchange) (ShortString key) _) props body ->
            let envelope = Envelope
                    { envDeliveryTag   = tag
                    , envRedelivered   = redelivered
                    , envExchangeName  = exchange
                    , envRoutingKey    = key
                    , envChannel       = chan
                    }
            in return (Just (msgFromContentHeaderProperties props body, envelope))
        _ -> return Nothing
-- | Write a @Basic_ack@ carrying the given delivery tag and @multiple@
-- flag. No reply is awaited.
ackMsg :: Channel -> LongLongInt -> Bool -> IO ()
ackMsg chan deliveryTag multiple =
    writeAssembly chan (SimpleMethod (Basic_ack deliveryTag multiple))
-- | Acknowledge the single message an envelope belongs to, using the
-- channel and delivery tag stored in the envelope ('ackMsg' with
-- @multiple = False@).
ackEnv :: Envelope -> IO ()
ackEnv env = ackMsg channel tag False
  where
    channel = envChannel env
    tag     = envDeliveryTag env
-- | Write a @Basic_reject@ carrying the given delivery tag and @requeue@
-- flag. No reply is awaited.
rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()
rejectMsg chan deliveryTag requeue =
    writeAssembly chan (SimpleMethod (Basic_reject deliveryTag requeue))
-- | Write a @Basic_recover@ carrying the given @requeue@ flag. No reply is
-- awaited.
recoverMsgs :: Channel -> Bool -> IO ()
recoverMsgs chan requeue =
    writeAssembly chan (SimpleMethod (Basic_recover requeue))
-- | Send @Tx_select@ on the channel and block until @Tx_select_ok@ is
-- received.
txSelect :: Channel -> IO ()
txSelect chan = do
    SimpleMethod Tx_select_ok <- request chan (SimpleMethod Tx_select)
    return ()
-- | Send @Tx_commit@ on the channel and block until @Tx_commit_ok@ is
-- received.
txCommit :: Channel -> IO ()
txCommit chan = do
    SimpleMethod Tx_commit_ok <- request chan (SimpleMethod Tx_commit)
    return ()
-- | Send @Tx_rollback@ on the channel and block until @Tx_rollback_ok@ is
-- received.
txRollback :: Channel -> IO ()
txRollback chan = do
    SimpleMethod Tx_rollback_ok <- request chan (SimpleMethod Tx_rollback)
    return ()
-- | Send @Channel_flow@ with the given flag and block until a
-- @Channel_flow_ok@ reply arrives; the flag in the reply is ignored.
flow :: Channel -> Bool -> IO ()
flow chan active = do
    SimpleMethod (Channel_flow_ok _) <- request chan (SimpleMethod (Channel_flow active))
    return ()
-- | Delivery metadata handed out alongside a received 'Message'.
--
-- The fields are filled from a @Basic_get_ok@ or @Basic_deliver@ method,
-- plus the channel the message arrived on.
data Envelope = Envelope
    { envDeliveryTag   :: LongLongInt
    , envRedelivered   :: Bool
    , envExchangeName  :: Text
    , envRoutingKey    :: Text
    , envChannel       :: Channel
    }
-- | Delivery mode of a message; see 'deliveryModeToInt' for the numeric
-- encoding used in content headers.
data DeliveryMode = Persistent
                  | NonPersistent
    deriving Show

-- | Numeric encoding of a 'DeliveryMode': 1 for 'NonPersistent', 2 for
-- 'Persistent'.
deliveryModeToInt :: Num a => DeliveryMode -> a
deliveryModeToInt NonPersistent = 1
deliveryModeToInt Persistent    = 2

-- | Inverse of 'deliveryModeToInt'.
--
-- Only 1 and 2 are valid encodings. Any other value raises an 'error' that
-- names the offending number, instead of an anonymous pattern-match
-- failure.
intToDeliveryMode :: (Eq a, Num a, Show a) => a -> DeliveryMode
intToDeliveryMode 1 = NonPersistent
intToDeliveryMode 2 = Persistent
intToDeliveryMode n = error ("intToDeliveryMode: unknown delivery mode " ++ show n)
-- | An AMQP message as seen by users of this module.
--
-- 'publishMsg' copies every field except 'msgBody' into the content
-- header; 'msgBody' is sent as the content itself.
data Message = Message
    { msgBody          :: BL.ByteString
    , msgDeliveryMode  :: Maybe DeliveryMode
    , msgTimestamp     :: Maybe Timestamp
    , msgID            :: Maybe Text
    , msgContentType   :: Maybe Text
    , msgReplyTo       :: Maybe Text
    , msgCorrelationID :: Maybe Text
    , msgHeaders       :: Maybe FieldTable
    }
    deriving Show
-- | Baseline 'Message': empty body and every optional property unset.
-- Intended to be customised with record-update syntax.
newMsg :: Message
newMsg = Message
    { msgBody          = BL.empty
    , msgDeliveryMode  = Nothing
    , msgTimestamp     = Nothing
    , msgID            = Nothing
    , msgContentType   = Nothing
    , msgReplyTo       = Nothing
    , msgCorrelationID = Nothing
    , msgHeaders       = Nothing
    }
-- | A complete logical unit exchanged on a channel: either a lone method,
-- or a method together with its content header properties and body.
data Assembly
    = SimpleMethod MethodPayload
    | ContentMethod MethodPayload ContentHeaderProperties BL.ByteString
    deriving Show
-- | Read the next 'Assembly' from a channel's frame queue.
--
-- The first frame must be a method frame. When 'hasContent' says the
-- method carries content, the header and body frames are collected as
-- well. Any other leading frame is reported through 'error'.
readAssembly :: Chan FramePayload -> IO Assembly
readAssembly chan = do
    frame <- readChan chan
    case frame of
        MethodPayload method
            | hasContent frame -> do
                (props, body) <- collectContent chan
                return (ContentMethod method props body)
            | otherwise ->
                return (SimpleMethod method)
        other -> error $ "didn't expect frame: " ++ show other
-- | Read a content header frame followed by as many body frames as are
-- needed to cover the body size announced in the header.
--
-- Returns the header's properties together with the concatenated body.
-- A frame of an unexpected kind makes the corresponding pattern bind fail
-- inside 'IO'.
collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString)
collectContent chan = do
    ContentHeaderPayload _ _ bodySize props <- readChan chan
    chunks <- collect (fromIntegral bodySize)
    return (props, BL.concat chunks)
  where
    -- Keep reading body frames until the announced number of bytes has been
    -- received. The counter must be decremented by each payload's length.
    collect :: Int64 -> IO [BL.ByteString]
    collect remaining
        | remaining <= 0 = return []
        | otherwise = do
            ContentBodyPayload payload <- readChan chan
            rest <- collect (remaining - BL.length payload)
            return (payload : rest)
-- | State shared by everything that uses one broker connection.
data Connection = Connection
    { connSocket         :: Socket
      -- ^ socket the frames are read from and written to
    , connChannels       :: MVar (IM.IntMap (Channel, ThreadId))
      -- ^ open channels keyed by channel id, each paired with its receiver thread
    , connMaxFrameSize   :: Int
      -- ^ frame size settled on while opening the connection
    , connClosed         :: MVar (Maybe String)
      -- ^ 'Nothing' while open; @Just reason@ once the connection is closed
    , connClosedLock     :: MVar ()
      -- ^ starts empty; filled by the receiver thread's cleanup
    , connWriteLock      :: MVar ()
      -- ^ held while frames are written to the socket
    , connClosedHandlers :: MVar [IO ()]
      -- ^ actions run by the receiver thread's cleanup
    , lastChannelID      :: MVar Int
      -- ^ counter from which new channel ids are drawn
    }
-- | Receive loop of a connection: read one frame from the socket, dispatch
-- it, repeat.
--
-- Frames on channel 0 are handled here. Both close methods record a reason
-- in 'connClosed' and then kill the current thread. Frames on any other
-- channel are pushed onto that channel's input queue.
connectionReceiver :: Connection -> IO ()
connectionReceiver conn = do
    Frame chanID payload <- readFrameSock (connSocket conn) (connMaxFrameSize conn)
    dispatch chanID payload
    connectionReceiver conn
  where
    markClosed reason = modifyMVar_ (connClosed conn) $ \_ -> return (Just reason)
    stopThisThread = myThreadId >>= killThread

    dispatch 0 (MethodPayload Connection_close_ok) = do
        markClosed "closed by user"
        stopThisThread
    dispatch 0 (MethodPayload (Connection_close _ (ShortString errorMsg) _ _)) = do
        markClosed (T.unpack errorMsg)
        stopThisThread
    dispatch 0 payload =
        print $ "Got unexpected msg on channel zero: " ++ show payload
    dispatch chanID payload =
        withMVar (connChannels conn) $ \channels ->
            case IM.lookup (fromIntegral chanID) channels of
                Just (channel, _) -> writeChan (inQueue channel) payload
                Nothing           -> print $ "ERROR: channel not open " ++ show chanID
-- | Open a connection to the given host on port 5672.
--
-- The remaining arguments (virtual host, login name, login password) are
-- passed straight on to 'openConnection''.
openConnection :: String -> Text -> Text -> Text -> IO Connection
openConnection host = openConnection' host 5672
-- | Open a connection to the given host and port and run the opening
-- handshake (start, tune, open) for the given virtual host and credentials.
--
-- A background thread running 'connectionReceiver' is forked. When that
-- thread ends, its finalizer closes the socket, records a close reason,
-- kills every channel thread, empties the channel map, fills
-- 'connClosedLock' and runs the registered closed-handlers.
--
-- An unexpected reply during the handshake makes the corresponding pattern
-- bind fail inside 'IO'.
openConnection' :: String -> PortNumber -> Text -> Text -> Text -> IO Connection
openConnection' host port vhost loginName loginPassword = withSocketsDo $ do
    (addrInfo:_) <- getAddrInfo Nothing (Just host) (Just $ show $ fromEnum port)
    proto <- getProtocolNumber "tcp"
    sock <- socket (addrFamily addrInfo) Stream proto
    connect sock (addrAddress addrInfo)

    -- Protocol header. sendAll is used because a plain send is allowed to
    -- transmit only part of the buffer.
    NB.sendAll sock $ toStrict $ BPut.runPut $ do
        BPut.putByteString $ BS.pack "AMQP"
        BPut.putWord8 1
        BPut.putWord8 1 --TCP/IP
        BPut.putWord8 9
        BPut.putWord8 1

    -- start / start-ok
    Frame 0 (MethodPayload (Connection_start _ _ _ _ _)) <- readFrameSock sock 4096
    writeFrameSock sock start_ok

    -- tune / tune-ok: never use frames larger than 131072 bytes
    Frame 0 (MethodPayload (Connection_tune _ frame_max _)) <- readFrameSock sock 4096
    let maxFrameSize = min 131072 frame_max
    writeFrameSock sock (Frame 0 (MethodPayload (Connection_tune_ok 0 maxFrameSize 0)))

    -- open / open-ok
    writeFrameSock sock open
    Frame 0 (MethodPayload (Connection_open_ok _)) <- readFrameSock sock $ fromIntegral maxFrameSize

    channels       <- newMVar IM.empty
    lastChanID     <- newMVar 0
    cClosed        <- newMVar Nothing
    writeLock      <- newMVar ()
    ccl            <- newEmptyMVar
    closedHandlers <- newMVar []
    let conn = Connection sock channels (fromIntegral maxFrameSize) cClosed ccl writeLock closedHandlers lastChanID

    _ <- forkIO $ CE.finally (connectionReceiver conn)
        (do
            -- closing the socket is best-effort; failures are ignored
            CE.catch (sClose sock) (\(_ :: CE.SomeException) -> return ())
            -- keep an already recorded reason, otherwise fall back to "closed"
            modifyMVar_ cClosed $ \x -> return $ Just $ fromMaybe "closed" x
            withMVar channels $ \cc -> mapM_ (killThread . snd) (IM.elems cc)
            -- modifyMVar_ (not withMVar) so the map really is emptied
            modifyMVar_ channels $ \_ -> return IM.empty
            _ <- tryPutMVar ccl ()
            withMVar closedHandlers sequence_
        )
    return conn
  where
    start_ok = Frame 0 (MethodPayload (Connection_start_ok (FieldTable (M.fromList []))
        (ShortString "AMQPLAIN")
        (LongString (T.pack $ drop 4 $ BL.unpack $ runPut $ put $ FieldTable (M.fromList [("LOGIN",FVString loginName), ("PASSWORD", FVString loginPassword)])))
        (ShortString "en_US")))
    open = Frame 0 (MethodPayload (Connection_open
        (ShortString vhost)
        (ShortString $ T.pack "")
        True))
-- | Ask the broker to close the connection and wait until it is closed.
--
-- A @Connection_close@ frame is written under the write lock; an
-- 'CE.IOException' raised while writing is ignored. The call then blocks on
-- 'connClosedLock' until it has been filled.
closeConnection :: Connection -> IO ()
closeConnection c = do
    CE.catch sendClose ignoreIOError
    readMVar (connClosedLock c)
  where
    closeFrame = Frame 0 (MethodPayload (Connection_close 0 (ShortString "") 0 0))

    sendClose =
        withMVar (connWriteLock c) $ \_ ->
            writeFrameSock (connSocket c) closeFrame

    ignoreIOError :: CE.IOException -> IO ()
    ignoreIOError _ = return ()
-- | Register an action to run when the connection is closed.
--
-- If the connection is already closed and @ifClosed@ is 'True', the handler
-- runs immediately (while 'connClosed' is held). In every other case it is
-- prepended to the connection's handler list.
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler conn ifClosed handler =
    withMVar (connClosed conn) $ \closedReason ->
        if isJust closedReason && ifClosed
            then handler
            else modifyMVar_ (connClosedHandlers conn) (return . (handler :))
-- | Read exactly one frame from the socket.
--
-- The first 7 bytes are read and handed to 'peekFrameSize'; the returned
-- size plus one further byte is then read, and the whole buffer is decoded.
-- The @maxFrameSize@ argument is accepted but not consulted by this
-- implementation.
--
-- Throws 'ConnectionClosedException' when the socket yields zero bytes, and
-- calls 'error' if the decoder consumes a different number of bytes than
-- were read.
readFrameSock :: Socket -> Int -> IO Frame
readFrameSock sock maxFrameSize = do
    dat <- recvExact 7
    let len = fromIntegral $ peekFrameSize dat
    dat' <- recvExact (len+1)
    let (frame, _, consumedBytes) = runGetState get (BL.append dat dat') 0
    when (consumedBytes /= fromIntegral (len+8)) $
        error $ "readFrameSock: parser should read "++show (len+8)++" bytes; but read "++show consumedBytes
    return frame
  where
    -- Receive exactly the requested number of bytes, or fail.
    recvExact bytes = do
        b <- recvExact' bytes BL.empty
        if BL.length b /= fromIntegral bytes
            then error $ "recvExact wanted "++show bytes++" bytes; got "++show (BL.length b)++" bytes"
            else return b

    -- recv may return fewer bytes than requested, so keep asking for
    -- whatever is still missing and append it to the buffer.
    recvExact' bytes buf = do
        dat <- NB.recv sock bytes
        let len = BS.length dat
        if len == 0
            then CE.throwIO $ ConnectionClosedException "recv returned 0 bytes"
            else do
                let buf' = BL.append buf (toLazy dat)
                if len >= bytes
                    then return buf'
                    else recvExact' (bytes - len) buf'
-- | Serialise a frame and write all of its bytes to the socket.
--
-- @send@ reports how many bytes it actually wrote, so the remainder is
-- re-sent until nothing is left.
writeFrameSock :: Socket -> Frame -> IO ()
writeFrameSock sock frame = sendRemaining (toStrict (runPut (put frame)))
  where
    sendRemaining bytes
        | BS.null bytes = return ()
        | otherwise = do
            sent <- NB.send sock bytes
            sendRemaining (BS.drop sent bytes)
-- | State of one channel multiplexed over a 'Connection'.
data Channel = Channel
    { connection           :: Connection
      -- ^ connection this channel belongs to
    , inQueue              :: Chan FramePayload
      -- ^ frames forwarded to this channel by the connection's receiver
    , outstandingResponses :: MVar (Seq.Seq (MVar Assembly))
      -- ^ one slot per request still waiting for its reply, oldest first
    , channelID            :: Word16
    , lastConsumerTag      :: MVar Int
      -- ^ counter from which consumer tags are derived
    , chanActive           :: Lock
      -- ^ waited on before content is written; toggled by @Channel_flow@
    , chanClosed           :: MVar (Maybe String)
      -- ^ 'Nothing' while open; @Just reason@ once the channel is closed
    , consumers            :: MVar (M.Map Text ((Message, Envelope) -> IO ()))
      -- ^ consumer callbacks keyed by consumer tag
    }
-- | Build a 'Message' from @CHBasic@ header properties and a body.
--
-- Only the properties that 'Message' models are kept; the remaining header
-- slots are ignored.
msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message
msgFromContentHeaderProperties
    (CHBasic content_type _ headers delivery_mode _ correlation_id reply_to _
             message_id timestamp _ _ _ _)
    body =
    Message
        { msgBody          = body
        , msgDeliveryMode  = fmap intToDeliveryMode delivery_mode
        , msgTimestamp     = timestamp
        , msgID            = unwrap message_id
        , msgContentType   = unwrap content_type
        , msgReplyTo       = unwrap reply_to
        , msgCorrelationID = unwrap correlation_id
        , msgHeaders       = headers
        }
  where
    -- strip the ShortString wrapper from an optional property
    unwrap (Just (ShortString s)) = Just s
    unwrap _                      = Nothing
-- | Receive loop of a channel: read one assembly from the input queue,
-- process it, repeat.
--
-- Replies are handed to the oldest waiting request. Asynchronous methods
-- (deliver, return, flow, close) are handled in place.
channelReceiver :: Channel -> IO ()
channelReceiver chan = do
    assembly <- readAssembly (inQueue chan)
    if isResponse assembly
        then deliverResponse assembly
        else handleAsync assembly
    channelReceiver chan
  where
    -- Pop the oldest waiting slot and fill it. The action is run after the
    -- MVar holding the queue has been released.
    deliverResponse assembly = do
        action <- modifyMVar (outstandingResponses chan) $ \pending ->
            case Seq.viewl pending of
                waiter Seq.:< rest ->
                    return (rest, putMVar waiter assembly)
                Seq.EmptyL ->
                    return (pending, CE.throwIO $ userError "got response, but have no corresponding request")
        action

    -- Everything except the four asynchronous methods counts as a reply.
    isResponse :: Assembly -> Bool
    isResponse (ContentMethod (Basic_deliver _ _ _ _ _) _ _) = False
    isResponse (ContentMethod (Basic_return _ _ _ _) _ _)    = False
    isResponse (SimpleMethod (Channel_flow _))               = False
    isResponse (SimpleMethod (Channel_close _ _ _ _))        = False
    isResponse _                                             = True

    -- Delivery: run the matching consumer callback. An exception thrown by
    -- the callback is printed and otherwise ignored.
    handleAsync (ContentMethod (Basic_deliver (ShortString tag) deliveryTag redelivered
                                              (ShortString exchange) (ShortString key))
                               props body) =
        withMVar (consumers chan) $ \registered ->
            case M.lookup tag registered of
                Nothing -> return ()
                Just subscriber -> do
                    let msg = msgFromContentHeaderProperties props body
                        env = Envelope
                            { envDeliveryTag  = deliveryTag
                            , envRedelivered  = redelivered
                            , envExchangeName = exchange
                            , envRoutingKey   = key
                            , envChannel      = chan
                            }
                    CE.catch (subscriber (msg, env))
                        (\(e :: CE.SomeException) -> putStrLn $ "AMQP callback threw exception: " ++ show e)
    -- Close: mark the channel closed with the given reason, then stop this thread.
    handleAsync (SimpleMethod (Channel_close _ (ShortString errorMsg) _ _)) = do
        closeChannel' chan errorMsg
        myThreadId >>= killThread
    -- Flow: open or close the lock that content writes wait on.
    handleAsync (SimpleMethod (Channel_flow active)) = do
        if active
            then openLock (chanActive chan)
            else closeLock (chanActive chan)
        return ()
    --Basic.return
    handleAsync (ContentMethod (Basic_return _ _ _ _) _ _) =
        print "BASIC.RETURN not implemented"
-- | Mark a channel as closed with the given reason.
--
-- The channel is removed from its connection's channel map. If it was not
-- already closed, the flow lock is killed, every request still waiting for
-- a reply is woken up, and the reason is recorded in 'chanClosed'. A reason
-- recorded earlier is never overwritten.
closeChannel' :: Channel -> Text -> IO ()
closeChannel' c reason = do
    modifyMVar_ (connChannels $ connection c) $ \old ->
        return $ IM.delete (fromIntegral $ channelID c) old
    modifyMVar_ (chanClosed c) $ \x ->
        case x of
            Nothing -> do
                killLock $ chanActive c
                killOutstandingResponses $ outstandingResponses c
                return $ Just $ T.unpack reason
            Just _ -> return x
  where
    -- Fill every waiting slot with a value that raises when forced (the
    -- waiter forces it and turns that into an AMQP exception), then leave an
    -- empty queue behind rather than an undefined one.
    killOutstandingResponses :: MVar (Seq.Seq (MVar a)) -> IO ()
    killOutstandingResponses outResps =
        modifyMVar_ outResps $ \val -> do
            F.mapM_ (\x -> tryPutMVar x $ error "channel closed") val
            return Seq.empty
-- | Open a new channel on the connection.
--
-- Allocates the channel's state, draws the next channel id, forks a
-- 'channelReceiver' thread (whose finalizer closes the channel with reason
-- @"closed"@), registers the channel with the connection, and finally waits
-- for @Channel_open_ok@.
openChannel :: Connection -> IO Channel
openChannel c = do
    frames       <- newChan
    pending      <- newMVar Seq.empty
    tagCounter   <- newMVar 0
    activeLock   <- newLock
    closedState  <- newMVar Nothing
    consumerMap  <- newMVar M.empty
    newChannelID <- modifyMVar (lastChannelID c) $ \previous ->
        return (previous + 1, previous + 1)
    let newChannel = Channel
            { connection           = c
            , inQueue              = frames
            , outstandingResponses = pending
            , channelID            = fromIntegral newChannelID
            , lastConsumerTag      = tagCounter
            , chanActive           = activeLock
            , chanClosed           = closedState
            , consumers            = consumerMap
            }
    thrID <- forkIO $
        channelReceiver newChannel `CE.finally` closeChannel' newChannel "closed"
    modifyMVar_ (connChannels c) (return . IM.insert newChannelID (newChannel, thrID))
    SimpleMethod Channel_open_ok <- request newChannel (SimpleMethod (Channel_open (ShortString "")))
    return newChannel
-- | Write a list of frame payloads to the connection on behalf of a channel.
--
-- The channel must still be registered with its connection, otherwise a
-- @"channel not open"@ user error is thrown. All payloads are written under
-- the connection's write lock; an 'CE.IOException' raised while writing is
-- replaced by a @"connection not open"@ user error.
writeFrames :: Channel -> [FramePayload] -> IO ()
writeFrames chan payloads =
    withMVar (connChannels conn) $ \chans ->
        if IM.member (fromIntegral (channelID chan)) chans
            then CE.catch writeAll onIOError
            else CE.throwIO $ userError "channel not open"
  where
    conn = connection chan

    writeOne payload =
        writeFrameSock (connSocket conn) (Frame (channelID chan) payload)

    writeAll = withMVar (connWriteLock conn) $ \_ -> mapM_ writeOne payloads

    onIOError :: CE.IOException -> IO ()
    onIOError _ = CE.throwIO $ userError "connection not open"
-- | Turn an assembly into frames and write them to the channel.
--
-- A content assembly first waits on the channel's flow lock, then emits the
-- method frame, a content header carrying the body size, and the body
-- split into chunks of at most (connection max frame size - 8) bytes. An
-- empty body produces no body frames. A simple method is a single frame.
writeAssembly' :: Channel -> Assembly -> IO ()
writeAssembly' chan (ContentMethod m properties msg) = do
    waitLock $ chanActive chan
    let !toWrite =
            [ MethodPayload m
            , ContentHeaderPayload
                (getClassIDOf properties) --classID
                0
                (fromIntegral $ BL.length msg) --bodySize
                properties
            ] ++ bodyFrames
    writeFrames chan toWrite
  where
    -- largest body chunk: the negotiated frame size minus 8 bytes
    maxChunkLen = fromIntegral (connMaxFrameSize $ connection chan) - 8

    bodyFrames
        | BL.length msg > 0 = map ContentBodyPayload (splitLen msg maxChunkLen)
        | otherwise         = []

    -- Cut a lazy ByteString into pieces of at most len bytes; always
    -- returns at least one piece.
    splitLen str len
        | BL.length str > len = BL.take len str : splitLen (BL.drop len str) len
        | otherwise           = [str]
writeAssembly' chan (SimpleMethod m) =
    writeFrames chan [MethodPayload m]
-- | Like 'writeAssembly'', but any 'AMQPException', 'CE.ErrorCall' or
-- 'CE.IOException' raised while writing is replaced by the exception chosen
-- by 'throwMostRelevantAMQPException'.
writeAssembly :: Channel -> Assembly -> IO ()
writeAssembly chan m = CE.catches (writeAssembly' chan m) handlers
  where
    rethrow = throwMostRelevantAMQPException chan
    handlers =
        [ CE.Handler (\(_ :: AMQPException)  -> rethrow)
        , CE.Handler (\(_ :: CE.ErrorCall)   -> rethrow)
        , CE.Handler (\(_ :: CE.IOException) -> rethrow)
        ]
-- | Send an assembly and block until the channel's receiver thread hands
-- back the matching reply.
--
-- While 'chanClosed' is held, a fresh reply slot is appended to the
-- channel's queue of outstanding responses and the assembly is written. A
-- closed channel raises a @"closed"@ user error instead. Any
-- 'AMQPException', 'CE.ErrorCall' or 'CE.IOException' raised along the way
-- is replaced by the exception chosen by 'throwMostRelevantAMQPException'.
request :: Channel -> Assembly -> IO Assembly
request chan m = do
    slot <- newEmptyMVar
    CE.catches (sendAndAwait slot) handlers
  where
    sendAndAwait slot = do
        withMVar (chanClosed chan) $ \closedReason ->
            if isNothing closedReason
                then do
                    modifyMVar_ (outstandingResponses chan) $ \waiting ->
                        return $! waiting Seq.|> slot
                    writeAssembly' chan m
                else CE.throwIO $ userError "closed"
        -- force the reply so that an error value placed in the slot
        -- surfaces here, inside the handlers
        !reply <- takeMVar slot
        return reply

    rethrow = throwMostRelevantAMQPException chan
    handlers =
        [ CE.Handler (\(_ :: AMQPException)  -> rethrow)
        , CE.Handler (\(_ :: CE.ErrorCall)   -> rethrow)
        , CE.Handler (\(_ :: CE.IOException) -> rethrow)
        ]
-- | Throw the 'AMQPException' that best explains a failure on the channel.
--
-- A recorded connection close reason takes precedence. Otherwise the
-- channel's close reason is used. If neither is recorded, a
-- 'ConnectionClosedException' with reason @"unknown reason"@ is thrown.
throwMostRelevantAMQPException :: Channel -> IO a
throwMostRelevantAMQPException chan = do
    connReason <- readMVar (connClosed (connection chan))
    case connReason of
        Just reason -> CE.throwIO (ConnectionClosedException reason)
        Nothing -> do
            chanReason <- readMVar (chanClosed chan)
            CE.throwIO $
                maybe (ConnectionClosedException "unknown reason")
                      ChannelClosedException
                      chanReason
-- | Send a @Basic_qos@ with the given prefetch size and prefetch count
-- (final flag set to 'False') and block until @Basic_qos_ok@ is received.
qos :: Channel -> Word32 -> Word16 -> IO ()
qos chan prefetchSize prefetchCount = do
    SimpleMethod Basic_qos_ok <- request chan (SimpleMethod qosMethod)
    return ()
  where
    qosMethod = Basic_qos prefetchSize prefetchCount False
-- | Exceptions thrown by this module; each constructor carries the reason
-- the channel or connection was closed.
data AMQPException
    = ChannelClosedException String
    | ConnectionClosedException String
    deriving (Typeable, Show, Ord, Eq)

instance CE.Exception AMQPException