module Network.AMQP (
Connection,
openConnection,
openConnection',
closeConnection,
Channel,
openChannel,
ExchangeOpts(..),
newExchange,
declareExchange,
deleteExchange,
QueueOpts(..),
newQueue,
declareQueue,
bindQueue,
purgeQueue,
deleteQueue,
Message(..),
DeliveryMode(..),
newMsg,
Envelope(..),
ConsumerTag,
Ack(..),
consumeMsgs,
cancelConsumer,
publishMsg,
getMsg,
rejectMsg,
recoverMsgs,
ackMsg,
ackEnv,
txSelect,
txCommit,
txRollback,
flow,
AMQPException(..)
) where
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put as BPut
import Data.Typeable
import qualified Data.Map as M
import qualified Data.IntMap as IM
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.IORef
import Data.Maybe
import Data.Int
import Control.Concurrent
import Control.Monad
import qualified Control.Exception as CE
import Network.BSD
import Network.Socket
import qualified Network.Socket.ByteString as NB
import Network.AMQP.Protocol
import Network.AMQP.Types
import Network.AMQP.Helpers
import Network.AMQP.Generated
-- | Options describing an exchange, consumed by 'declareExchange'.
--
-- Start from 'newExchange' and override fields with record-update syntax.
data ExchangeOpts = ExchangeOpts
    { exchangeName       :: String -- ^ name sent in the declare method
    , exchangeType       :: String -- ^ exchange type string sent in the declare method
    , exchangePassive    :: Bool   -- ^ forwarded unchanged to the declare method
    , exchangeDurable    :: Bool   -- ^ forwarded unchanged to the declare method
    , exchangeAutoDelete :: Bool   -- ^ forwarded unchanged to the declare method
    , exchangeInternal   :: Bool   -- ^ forwarded unchanged to the declare method
    }

-- | Default exchange options: empty name and type, durable, all other flags off.
newExchange :: ExchangeOpts
newExchange = ExchangeOpts
    { exchangeName       = ""
    , exchangeType       = ""
    , exchangePassive    = False
    , exchangeDurable    = True
    , exchangeAutoDelete = False
    , exchangeInternal   = False
    }
-- | Declare an exchange on the channel using the given options.
--
-- Sends @Exchange_declare@ through 'request' and blocks until the reply
-- arrives; any reply other than @Exchange_declare_ok@ fails the pattern match.
declareExchange :: Channel -> ExchangeOpts -> IO ()
declareExchange chan exchg = do
    SimpleMethod Exchange_declare_ok <- request chan (SimpleMethod declareMethod)
    return ()
  where
    -- NOTE(review): the leading 1 and the trailing False are presumably the
    -- access ticket and the nowait flag -- confirm against the generated method.
    declareMethod =
        Exchange_declare
            1
            (ShortString (exchangeName exchg))
            (ShortString (exchangeType exchg))
            (exchangePassive exchg)
            (exchangeDurable exchg)
            (exchangeAutoDelete exchg)
            (exchangeInternal exchg)
            False
            (FieldTable M.empty) -- no extra arguments
-- | Delete the named exchange.
--
-- Sends @Exchange_delete@ and waits for @Exchange_delete_ok@; any other reply
-- fails the pattern match.
deleteExchange :: Channel -> String -> IO ()
deleteExchange chan name = do
    SimpleMethod Exchange_delete_ok <- request chan (SimpleMethod deleteMethod)
    return ()
  where
    -- NOTE(review): the two False flags are presumably if-unused and nowait --
    -- confirm against the generated method definition.
    deleteMethod = Exchange_delete 1 (ShortString name) False False
-- | Options describing a queue, consumed by 'declareQueue'.
--
-- Start from 'newQueue' and override fields with record-update syntax.
data QueueOpts = QueueOpts
    { queueName       :: String -- ^ name sent in the declare method
      -- the remaining fields are optional
    , queuePassive    :: Bool   -- ^ forwarded unchanged to the declare method
    , queueDurable    :: Bool   -- ^ forwarded unchanged to the declare method
    , queueExclusive  :: Bool   -- ^ forwarded unchanged to the declare method
    , queueAutoDelete :: Bool   -- ^ forwarded unchanged to the declare method
    }

-- | Default queue options: empty name, durable, all other flags off.
newQueue :: QueueOpts
newQueue = QueueOpts
    { queueName       = ""
    , queuePassive    = False
    , queueDurable    = True
    , queueExclusive  = False
    , queueAutoDelete = False
    }
-- | Declare a queue on the channel using the given options.
--
-- Returns the queue name, message count and consumer count reported in the
-- @Queue_declare_ok@ reply (the counts are converted to 'Int'). Any other
-- reply fails the pattern match.
declareQueue :: Channel -> QueueOpts -> IO (String, Int, Int)
declareQueue chan queue = do
    SimpleMethod (Queue_declare_ok (ShortString declaredName) msgCount consCount) <-
        request chan (SimpleMethod declareMethod)
    return (declaredName, fromIntegral msgCount, fromIntegral consCount)
  where
    -- NOTE(review): the leading 1 and the trailing False are presumably the
    -- access ticket and the nowait flag -- confirm against the generated method.
    declareMethod =
        Queue_declare
            1
            (ShortString (queueName queue))
            (queuePassive queue)
            (queueDurable queue)
            (queueExclusive queue)
            (queueAutoDelete queue)
            False
            (FieldTable M.empty) -- no extra arguments
-- | Bind a queue to an exchange with the given routing key.
--
-- Arguments after the channel are, in order: queue name, exchange name,
-- routing key. Waits for @Queue_bind_ok@; any other reply fails the pattern
-- match.
bindQueue :: Channel -> String -> String -> String -> IO ()
bindQueue chan queue exchange key = do
    SimpleMethod Queue_bind_ok <- request chan (SimpleMethod bindMethod)
    return ()
  where
    -- NOTE(review): the False is presumably the nowait flag -- confirm.
    bindMethod =
        Queue_bind
            1
            (ShortString queue)
            (ShortString exchange)
            (ShortString key)
            False
            (FieldTable M.empty) -- no extra arguments
-- | Purge the named queue.
--
-- Returns the message count carried by the @Queue_purge_ok@ reply; any other
-- reply fails the pattern match.
purgeQueue :: Channel -> String -> IO Word32
purgeQueue chan queue = do
    SimpleMethod (Queue_purge_ok purged) <-
        request chan (SimpleMethod (Queue_purge 1 (ShortString queue) False))
    return purged
-- | Delete the named queue.
--
-- Returns the message count carried by the @Queue_delete_ok@ reply; any other
-- reply fails the pattern match.
deleteQueue :: Channel -> String -> IO Word32
deleteQueue chan queue = do
    SimpleMethod (Queue_delete_ok deleted) <- request chan (SimpleMethod deleteMethod)
    return deleted
  where
    -- NOTE(review): the three False flags are presumably if-unused, if-empty
    -- and nowait -- confirm against the generated method definition.
    deleteMethod = Queue_delete 1 (ShortString queue) False False False
-- | Identifier of a consumer, as returned by 'consumeMsgs'.
type ConsumerTag = String

-- | Whether delivered messages are to be acknowledged ('Ack') or not ('NoAck').
data Ack
    = Ack
    | NoAck

-- | Translate an 'Ack' choice into the Boolean passed to the consume and get
-- methods. Note the inversion: 'Ack' maps to 'False', 'NoAck' to 'True'.
ackToBool :: Ack -> Bool
ackToBool mode = case mode of
    Ack   -> False
    NoAck -> True
-- | Register a callback for messages arriving on the named queue.
--
-- A fresh consumer tag is produced by incrementing the channel's tag counter,
-- the callback is stored under that tag, and @Basic_consume@ is written with
-- 'writeAssembly', so no reply is awaited. The new tag is returned and can be
-- passed to 'cancelConsumer'.
consumeMsgs :: Channel -> String -> Ack -> ((Message,Envelope) -> IO ()) -> IO ConsumerTag
consumeMsgs chan queueName ack callback = do
    tagNumber <- modifyMVar (lastConsumerTag chan) $ \lastTag ->
        let nextTag = lastTag + 1 in return (nextTag, nextTag)
    let tag = show tagNumber
    -- store the callback before the broker can start delivering
    modifyMVar_ (consumers chan) (return . M.insert tag callback)
    -- NOTE(review): the flag after the tag and the last two flags are presumably
    -- no-local, exclusive and nowait -- confirm against the generated method.
    writeAssembly chan $ SimpleMethod $
        Basic_consume
            1
            (ShortString queueName)
            (ShortString tag)
            False
            (ackToBool ack)
            False
            True
    return tag
-- | Stop a consumer previously started with 'consumeMsgs'.
--
-- The callback is removed from the channel's consumer table first; then
-- @Basic_cancel@ is sent and @Basic_cancel_ok@ is awaited. Any other reply
-- fails the pattern match.
cancelConsumer :: Channel -> ConsumerTag -> IO ()
cancelConsumer chan consumerTag = do
    modifyMVar_ (consumers chan) (return . M.delete consumerTag)
    SimpleMethod (Basic_cancel_ok _) <-
        request chan (SimpleMethod (Basic_cancel (ShortString consumerTag) False))
    return ()
-- | Publish a message to an exchange with the given routing key.
--
-- Arguments after the channel are the exchange name, the routing key and the
-- message. Only the content type, delivery mode, message id and timestamp of
-- the 'Message' are copied into the content header; every other property is
-- left unset. Written with 'writeAssembly', so no reply is awaited.
publishMsg :: Channel -> String -> String -> Message -> IO ()
publishMsg chan exchange key msg =
    writeAssembly chan (ContentMethod publishMethod header (msgBody msg))
  where
    -- NOTE(review): the two False flags are presumably mandatory and
    -- immediate -- confirm against the generated method definition.
    publishMethod = Basic_publish 1 (ShortString exchange) (ShortString key) False False
    -- field order follows the pattern in msgFromContentHeaderProperties
    header =
        CHBasic
            (fmap ShortString (msgContentType msg))      -- content_type
            Nothing                                      -- content_encoding
            Nothing                                      -- headers
            (fmap deliveryModeToInt (msgDeliveryMode msg)) -- delivery_mode
            Nothing                                      -- priority
            Nothing                                      -- correlation_id
            Nothing                                      -- reply_to
            Nothing                                      -- expiration
            (fmap ShortString (msgID msg))               -- message_id
            (msgTimestamp msg)                           -- timestamp
            Nothing                                      -- typ
            Nothing                                      -- user_id
            Nothing                                      -- app_id
            Nothing                                      -- cluster_id
-- | Fetch a single message from the named queue.
--
-- Sends @Basic_get@ and returns the message with its envelope when the reply
-- is @Basic_get_ok@; every other reply yields 'Nothing'.
getMsg :: Channel -> Ack -> String -> IO (Maybe (Message, Envelope))
getMsg chan ack queueName = do
    reply <- request chan $ SimpleMethod $
        Basic_get 1 (ShortString queueName) (ackToBool ack)
    case reply of
        ContentMethod
            (Basic_get_ok tag redelivered (ShortString exchange) (ShortString key) _)
            properties
            body ->
                let envelope = Envelope
                        { envDeliveryTag   = tag
                        , envRedelivered   = redelivered
                        , envExchangeName  = exchange
                        , envRoutingKey    = key
                        , envChannel       = chan
                        }
                in return (Just (msgFromContentHeaderProperties properties body, envelope))
        _ -> return Nothing
-- | Acknowledge a delivery by writing @Basic_ack@ with the given delivery tag
-- and @multiple@ flag. No reply is awaited.
ackMsg :: Channel -> LongLongInt -> Bool -> IO ()
ackMsg chan deliveryTag multiple =
    writeAssembly chan (SimpleMethod (Basic_ack deliveryTag multiple))
-- | Acknowledge the single message an 'Envelope' belongs to, using the channel
-- and delivery tag stored in the envelope (@multiple@ is 'False').
ackEnv :: Envelope -> IO ()
ackEnv env = ackMsg channel tag False
  where
    channel = envChannel env
    tag     = envDeliveryTag env
-- | Reject a delivery by writing @Basic_reject@ with the given delivery tag
-- and @requeue@ flag. No reply is awaited.
rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()
rejectMsg chan deliveryTag requeue =
    writeAssembly chan (SimpleMethod (Basic_reject deliveryTag requeue))
-- | Write @Basic_recover@ with the given @requeue@ flag. No reply is awaited.
recoverMsgs :: Channel -> Bool -> IO ()
recoverMsgs chan requeue =
    writeAssembly chan (SimpleMethod (Basic_recover requeue))
-- | Send @Tx_select@ on the channel and wait for @Tx_select_ok@; any other
-- reply fails the pattern match.
txSelect :: Channel -> IO ()
txSelect chan = do
    SimpleMethod Tx_select_ok <- request chan (SimpleMethod Tx_select)
    return ()
-- | Send @Tx_commit@ on the channel and wait for @Tx_commit_ok@; any other
-- reply fails the pattern match.
txCommit :: Channel -> IO ()
txCommit chan = do
    SimpleMethod Tx_commit_ok <- request chan (SimpleMethod Tx_commit)
    return ()
-- | Send @Tx_rollback@ on the channel and wait for @Tx_rollback_ok@; any other
-- reply fails the pattern match.
txRollback :: Channel -> IO ()
txRollback chan = do
    SimpleMethod Tx_rollback_ok <- request chan (SimpleMethod Tx_rollback)
    return ()
-- | Send @Channel_flow@ with the given @active@ flag and wait for
-- @Channel_flow_ok@; any other reply fails the pattern match.
flow :: Channel -> Bool -> IO ()
flow chan active = do
    SimpleMethod (Channel_flow_ok _) <- request chan (SimpleMethod (Channel_flow active))
    return ()
-- | Delivery metadata handed out together with every received 'Message'.
data Envelope = Envelope
    { envDeliveryTag   :: LongLongInt -- ^ tag used by 'ackEnv' to acknowledge the message
    , envRedelivered   :: Bool        -- ^ redelivered flag taken from the delivering method
    , envExchangeName  :: String      -- ^ exchange name taken from the delivering method
    , envRoutingKey    :: String      -- ^ routing key taken from the delivering method
    , envChannel       :: Channel     -- ^ channel the message was received on
    }
-- | Delivery mode of a message.
data DeliveryMode
    = Persistent
    | NonPersistent
    deriving Show

-- | Numeric wire value of a delivery mode: 1 for 'NonPersistent', 2 for
-- 'Persistent'.
deliveryModeToInt :: Num a => DeliveryMode -> a
deliveryModeToInt NonPersistent = 1
deliveryModeToInt Persistent    = 2

-- | Inverse of 'deliveryModeToInt'.
--
-- Only 1 and 2 are valid; any other value raises an 'error' naming this
-- function instead of an anonymous pattern-match failure.
intToDeliveryMode :: (Eq a, Num a) => a -> DeliveryMode
intToDeliveryMode 1 = NonPersistent
intToDeliveryMode 2 = Persistent
intToDeliveryMode _ = error "intToDeliveryMode: unknown delivery mode (expected 1 or 2)"
-- | A message as published by 'publishMsg' or received from the broker.
--
-- Start from 'newMsg' and override fields with record-update syntax.
data Message = Message
    { msgBody         :: BL.ByteString      -- ^ payload
    , msgDeliveryMode :: Maybe DeliveryMode -- ^ sent as the delivery mode property when set
    , msgTimestamp    :: Maybe Timestamp    -- ^ sent as the timestamp property when set
    , msgID           :: Maybe String       -- ^ sent as the message id property when set
    , msgContentType  :: Maybe String       -- ^ sent as the content type property when set
    }
    deriving Show

-- | A message with an empty body and no properties set.
newMsg :: Message
newMsg = Message
    { msgBody         = BL.empty
    , msgDeliveryMode = Nothing
    , msgTimestamp    = Nothing
    , msgID           = Nothing
    , msgContentType  = Nothing
    }
-- | A complete logical unit exchanged on a channel: either a bare method, or a
-- method together with its content header properties and body.
data Assembly
    = SimpleMethod MethodPayload
    | ContentMethod MethodPayload ContentHeaderProperties BL.ByteString
    deriving Show
-- | Read the next complete 'Assembly' from a queue of frame payloads.
--
-- The first payload must be a method frame; when 'hasContent' says the method
-- carries content, the header and body frames are gathered with
-- 'collectContent'. Any other leading frame raises an 'error'.
readAssembly :: Chan FramePayload -> IO Assembly
readAssembly chan = readChan chan >>= assemble
  where
    assemble frame@(MethodPayload method)
        | hasContent frame = do
            (props, body) <- collectContent chan
            return (ContentMethod method props body)
        | otherwise = return (SimpleMethod method)
    assemble other = error ("didn't expect frame: " ++ show other)
-- | Read a content header followed by as many body frames as are needed to
-- cover the body size announced in the header.
--
-- Returns the header properties and the concatenated body. A frame of an
-- unexpected kind fails the corresponding pattern match.
collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString)
collectContent chan = do
    ContentHeaderPayload _ _ bodySize props <- readChan chan
    chunks <- collect (fromIntegral bodySize)
    return (props, BL.concat chunks)
  where
    -- Keep reading body frames until the announced number of bytes has been
    -- consumed. The counter must be decremented by each payload's length;
    -- the previous code applied the counter as a function instead of
    -- subtracting from it (and shadowed Prelude's 'rem').
    collect :: Int64 -> IO [BL.ByteString]
    collect remaining
        | remaining <= 0 = return []
        | otherwise = do
            ContentBodyPayload payload <- readChan chan
            rest <- collect (remaining - BL.length payload)
            return (payload : rest)
-- | State of one broker connection.
data Connection = Connection
    { connSocket       :: Socket
      -- ^ socket all frames are read from and written to
    , connChannels     :: (MVar (IM.IntMap (Channel, ThreadId)))
      -- ^ open channels by channel id, each with its receiver thread
    , connMaxFrameSize :: Int
      -- ^ frame size settled on while opening the connection
    , connClosed       :: MVar (Maybe String)
      -- ^ 'Nothing' while open; the close reason once closed
    , connWriteLock    :: MVar ()
      -- ^ held while frames are written to the socket
    , lastChannelID    :: MVar Int
      -- ^ counter incremented to obtain the id of each new channel
    }
-- | Receive loop of a connection: reads frames from the socket forever and
-- hands each payload to the channel it is addressed to.
--
-- Channel 0 carries connection-level methods: both close methods record a
-- reason in 'connClosed' and then kill the current thread; anything else on
-- channel 0 is only printed.
connectionReceiver :: Connection -> IO ()
connectionReceiver conn = forever $ do
    Frame chanID payload <- readFrameSock (connSocket conn) (connMaxFrameSize conn)
    forwardToChannel chanID payload
  where
    -- record why the connection closed, then stop this thread
    closeWith reason = do
        modifyMVar_ (connClosed conn) $ \_ -> return (Just reason)
        killThread =<< myThreadId

    forwardToChannel 0 (MethodPayload Connection_close_ok) =
        closeWith "closed by user"
    forwardToChannel 0 (MethodPayload (Connection_close _ (ShortString errorMsg) _ _)) =
        closeWith errorMsg
    forwardToChannel 0 payload =
        print ("Got unexpected msg on channel zero: " ++ show payload)
    forwardToChannel chanID payload =
        withMVar (connChannels conn) $ \channels ->
            case IM.lookup (fromIntegral chanID) channels of
                Just entry -> writeChan (inQueue (fst entry)) payload
                Nothing    -> print ("ERROR: channel not open " ++ show chanID)
-- | Open a connection on port 5672; see 'openConnection''.
--
-- Arguments are, in order: host, virtual host, login name, login password.
openConnection :: String -> String -> String -> String -> IO Connection
openConnection host vhost loginName loginPassword =
    openConnection' host defaultPort vhost loginName loginPassword
  where
    defaultPort = 5672
-- | Open a connection to a broker and run the opening handshake.
--
-- Arguments are, in order: host (handed to 'inet_addr'), port, virtual host,
-- login name, login password.
--
-- The handshake sends the protocol header, answers @Connection_start@ with an
-- AMQPLAIN @Connection_start_ok@, answers @Connection_tune@ and opens the
-- virtual host. A reply of an unexpected shape fails the corresponding
-- pattern match. On success a receiver thread running 'connectionReceiver' is
-- forked; when that thread ends the socket is closed, a close reason is
-- recorded, all channel threads are killed and the channel table is emptied.
--
-- If connecting or the handshake throws, the socket is closed before the
-- exception propagates.
openConnection' :: String -> PortNumber -> String -> String -> String -> IO Connection
openConnection' host port vhost loginName loginPassword = do
    proto <- getProtocolNumber "tcp"
    -- bracketOnError: do not leak the socket when the handshake fails
    CE.bracketOnError (socket AF_INET Stream proto) sClose $ \sock -> do
        addr <- inet_addr host
        connect sock (SockAddrInet port addr)
        -- sendAll, because send may transmit only part of the buffer
        NB.sendAll sock protocolHeader
        Frame 0 (MethodPayload (Connection_start _ _ _ _ _)) <- readFrameSock sock 4096
        writeFrameSock sock start_ok
        Frame 0 (MethodPayload (Connection_tune _ frame_max _)) <- readFrameSock sock 4096
        -- A frame_max of zero means the server imposes no limit; fall back to
        -- our own cap instead of negotiating a frame size of zero.
        let maxFrameSize
                | frame_max == 0 = 131072
                | otherwise      = min 131072 frame_max
        writeFrameSock sock (Frame 0 (MethodPayload (Connection_tune_ok 0 maxFrameSize 0)))
        writeFrameSock sock open
        Frame 0 (MethodPayload (Connection_open_ok _)) <- readFrameSock sock (fromIntegral maxFrameSize)
        channels <- newMVar IM.empty
        lastChanID <- newMVar 0
        cClosed <- newMVar Nothing
        writeLock <- newMVar ()
        let conn = Connection sock channels (fromIntegral maxFrameSize) cClosed writeLock lastChanID
        _ <- forkIO $ CE.finally (connectionReceiver conn) (shutdown sock channels cClosed)
        return conn
  where
    -- Runs when the receiver thread ends, for whatever reason.
    shutdown sock channels cClosed = do
        sClose sock
        -- keep an already recorded reason, otherwise note a generic one
        modifyMVar_ cClosed $ \reason -> return (Just (fromMaybe "closed" reason))
        -- kill every channel thread and really empty the table (the previous
        -- code used withMVar here, which left the table untouched)
        modifyMVar_ channels $ \cc -> do
            mapM_ (killThread . snd) (IM.elems cc)
            return IM.empty

    protocolHeader = toStrict $ BPut.runPut $ do
        BPut.putByteString $ BS.pack "AMQP"
        BPut.putWord8 1
        BPut.putWord8 1 --TCP/IP
        BPut.putWord8 9
        BPut.putWord8 1

    start_ok = (Frame 0 (MethodPayload (Connection_start_ok (FieldTable (M.fromList []))
        (ShortString "AMQPLAIN")
        -- the serialized table minus its first 4 bytes
        -- NOTE(review): presumably the length prefix -- confirm
        (LongString (drop 4 $ BL.unpack $ runPut $ put $ FieldTable (M.fromList [(ShortString "LOGIN",FVLongString $ LongString loginName), (ShortString "PASSWORD", FVLongString $ LongString loginPassword)])))
        (ShortString "en_US")) ))

    open = (Frame 0 (MethodPayload (Connection_open
        (ShortString vhost)
        (ShortString "")
        True)))
-- | Ask the broker to close the connection.
--
-- Writes @Connection_close@ on channel 0 while holding the connection's write
-- lock. An 'CE.IOException' raised while writing is deliberately ignored.
closeConnection :: Connection -> IO ()
closeConnection c = CE.catch sendClose ignoreIOError
  where
    closeFrame = Frame 0 (MethodPayload (Connection_close 0 (ShortString "") 0 0))

    sendClose = withMVar (connWriteLock c) $ \_ ->
        writeFrameSock (connSocket c) closeFrame

    ignoreIOError :: CE.IOException -> IO ()
    ignoreIOError _ = return ()
-- | Read exactly one frame from the socket.
--
-- First the 7-byte frame header is read; 'peekFrameSize' yields the payload
-- length from it, and then the payload plus one trailing byte is read. The
-- second argument caps the size of each individual @recv@ call.
--
-- Throws 'ConnectionClosedException' when @recv@ returns no data.
readFrameSock :: Socket -> Int -> IO Frame
readFrameSock sock maxFrameSize = do
    header <- recvExact 7
    let payloadLen = fromIntegral (peekFrameSize header)
    rest <- recvExact (payloadLen + 1)
    return $ runGet get (BL.append header rest)
  where
    recvExact :: Int -> IO BL.ByteString
    recvExact total = go total []

    -- Track only the bytes still missing. The previous loop compared and
    -- subtracted the length of the whole buffer against the remaining count,
    -- which returns too few bytes once three or more recv calls are needed.
    go :: Int -> [BS.ByteString] -> IO BL.ByteString
    go remaining acc
        | remaining <= 0 = return (BL.fromChunks (reverse acc))
        | otherwise = do
            chunk <- NB.recv sock (min remaining maxFrameSize)
            if BS.null chunk
                then CE.throwIO $ ConnectionClosedException ""
                else go (remaining - BS.length chunk) (chunk : acc)
-- | Serialize a frame and write all of it to the socket.
--
-- Uses @sendAll@ rather than @send@: @send@ may transmit only part of the
-- buffer, and dropping the rest of a frame would corrupt the byte stream.
writeFrameSock :: Socket -> Frame -> IO ()
writeFrameSock sock frame =
    NB.sendAll sock (toStrict (runPut (put frame)))
-- | State of one channel on a 'Connection'.
data Channel = Channel
    { connection           :: Connection
      -- ^ connection the channel lives on
    , inQueue              :: Chan FramePayload
      -- ^ frame payloads forwarded to this channel by the connection receiver
    , outstandingResponses :: Chan (MVar Assembly)
      -- ^ one 'MVar' per request still waiting for its reply, in send order
    , channelID            :: Word16
      -- ^ id used in every frame written for this channel
    , lastConsumerTag      :: MVar Int
      -- ^ counter incremented to obtain each new consumer tag
    , chanActive           :: Lock
      -- ^ waited on before content is written; toggled by @Channel_flow@
    , chanClosed           :: MVar (Maybe String)
      -- ^ 'Nothing' while open; the close reason once closed
    , consumers            :: MVar (M.Map String ((Message, Envelope) -> IO ()))
      -- ^ delivery callbacks by consumer tag
    }
-- | Build a 'Message' from basic-class header properties and a body.
--
-- Only the content type, delivery mode, message id and timestamp are carried
-- over; all other properties are ignored. Only the @CHBasic@ constructor is
-- matched.
msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message
msgFromContentHeaderProperties
    (CHBasic content_type _content_encoding _headers delivery_mode _priority
             _correlation_id _reply_to _expiration message_id timestamp _typ
             _user_id _app_id _cluster_id)
    body =
    Message
        { msgBody         = body
        , msgDeliveryMode = fmap intToDeliveryMode delivery_mode
        , msgTimestamp    = timestamp
        , msgID           = shortToString message_id
        , msgContentType  = shortToString content_type
        }
  where
    -- unwrap an optional short string into an optional String
    shortToString (Just (ShortString s)) = Just s
    shortToString _                      = Nothing
-- | Receive loop of a channel: reads assemblies from the channel's inbound
-- queue forever.
--
-- A response is handed to the oldest waiting request (throwing when none is
-- waiting); deliveries, returns, flow and close methods are handled
-- asynchronously.
channelReceiver :: Channel -> IO ()
channelReceiver chan = do
    p <- readAssembly $ inQueue chan
    if isResponse p
        then do
            emp <- isEmptyChan $ outstandingResponses chan
            if emp
                then CE.throwIO $ userError "got response, but have no corresponding request"
                else do
                    x <- readChan (outstandingResponses chan)
                    putMVar x p
        else handleAsync p
    channelReceiver chan
  where
    -- everything except these four methods answers a pending request
    isResponse :: Assembly -> Bool
    isResponse (ContentMethod (Basic_deliver _ _ _ _ _) _ _) = False
    isResponse (ContentMethod (Basic_return _ _ _ _) _ _) = False
    isResponse (SimpleMethod (Channel_flow _)) = False
    isResponse (SimpleMethod (Channel_close _ _ _ _)) = False
    isResponse _ = True

    handleAsync (ContentMethod (Basic_deliver (ShortString consumerTag) deliveryTag redelivered (ShortString exchangeName)
                                    (ShortString routingKey))
                                properties msgBody) =
        withMVar (consumers chan) $ \s ->
            case M.lookup consumerTag s of
                Just subscriber -> do
                    let msg = msgFromContentHeaderProperties properties msgBody
                    let env = Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered,
                                envExchangeName = exchangeName, envRoutingKey = routingKey, envChannel = chan}
                    subscriber (msg, env)
                -- cancelConsumer removes the callback before the broker has
                -- confirmed the cancel, so a delivery may still arrive for a
                -- tag that is no longer registered. Drop it rather than
                -- crashing the channel thread on fromJust.
                Nothing -> return ()
    handleAsync (SimpleMethod (Channel_close _ (ShortString errorMsg) _ _)) = do
        modifyMVar_ (chanClosed chan) $ \_ -> return $ Just errorMsg
        closeChannel' chan
        killThread =<< myThreadId
    handleAsync (SimpleMethod (Channel_flow active)) = do
        if active
            then openLock $ chanActive chan
            else closeLock $ chanActive chan
        return ()
    --Basic.return
    handleAsync (ContentMethod (Basic_return _ _ _ _) _ _) =
        print "BASIC.RETURN not implemented"
-- | Tear down a channel's local state.
--
-- Removes the channel from the connection's channel table, then (while
-- holding 'chanClosed') kills the flow-control lock, fails every request that
-- is still waiting, and records a close reason, keeping one that is already
-- set.
closeChannel' :: Channel -> IO ()
closeChannel' c = do
    modifyMVar_ (connChannels (connection c)) $
        return . IM.delete (fromIntegral (channelID c))
    modifyMVar_ (chanClosed c) $ \reason -> do
        killLock (chanActive c)
        failPending
        return (Just (fromMaybe "closed" reason))
  where
    pending = outstandingResponses c

    -- Drain the queue of waiting requests; each receives a value that raises
    -- an error when the requester forces it.
    failPending = do
        emp <- isEmptyChan pending
        unless emp $ do
            waiter <- readChan pending
            _ <- tryPutMVar waiter (error "channel closed")
            failPending
-- | Open a new channel on the connection.
--
-- Allocates the channel state, takes the next channel id from the connection's
-- counter, forks the channel's receiver thread (which runs 'closeChannel''
-- when it ends), registers the channel in the connection's channel table, and
-- finally sends @Channel_open@ and waits for @Channel_open_ok@.
openChannel :: Connection -> IO Channel
openChannel c = do
    inbound <- newChan
    pendingReplies <- newChan
    tagCounter <- newMVar 0
    activeLock <- newLock
    closedReason <- newMVar Nothing
    callbacks <- newMVar M.empty
    newChannelID <- modifyMVar (lastChannelID c) $ \lastID ->
        let nextID = lastID + 1 in return (nextID, nextID)
    let newChannel = Channel
            { connection           = c
            , inQueue              = inbound
            , outstandingResponses = pendingReplies
            , channelID            = fromIntegral newChannelID
            , lastConsumerTag      = tagCounter
            , chanActive           = activeLock
            , chanClosed           = closedReason
            , consumers            = callbacks
            }
    thrID <- forkIO (channelReceiver newChannel `CE.finally` closeChannel' newChannel)
    modifyMVar_ (connChannels c) $
        return . IM.insert newChannelID (newChannel, thrID)
    SimpleMethod Channel_open_ok <-
        request newChannel (SimpleMethod (Channel_open (ShortString "")))
    return newChannel
-- | Write a list of frame payloads for a channel to the connection's socket.
--
-- The channel must still be present in the connection's channel table,
-- otherwise a user error \"channel not open\" is thrown. All payloads are
-- written while holding the connection's write lock; an 'CE.IOException'
-- raised while writing is replaced by a user error \"connection not open\".
writeFrames :: Channel -> [FramePayload] -> IO ()
writeFrames chan payloads =
    withMVar (connChannels conn) $ \chans ->
        if IM.member chanKey chans
            then CE.catch sendFrames connectionGone
            else CE.throwIO $ userError "channel not open"
  where
    conn = connection chan
    chanKey = fromIntegral (channelID chan)

    sendFrames = withMVar (connWriteLock conn) $ \_ ->
        mapM_ (writeFrameSock (connSocket conn) . Frame (channelID chan)) payloads

    connectionGone :: CE.IOException -> IO ()
    connectionGone _ = CE.throwIO $ userError "connection not open"
-- | Turn an 'Assembly' into frame payloads and write them with 'writeFrames'.
--
-- A content method first waits on the channel's flow-control lock, then emits
-- the method frame, a content header carrying the body length, and (for a
-- non-empty body) the body split into chunks. A simple method is a single
-- method frame.
writeAssembly' :: Channel -> Assembly -> IO ()
writeAssembly' chan (ContentMethod m properties msg) = do
    waitLock $ chanActive chan
    let !toWrite =
            [ MethodPayload m
            , ContentHeaderPayload
                (getClassIDOf properties)      --classID
                0
                (fromIntegral $ BL.length msg) --bodySize
                properties
            ] ++ bodyFrames
    writeFrames chan toWrite
  where
    -- Every frame carries 8 bytes of framing (7-byte header plus one end
    -- byte), so a body chunk must be that much smaller than the maximum frame
    -- size; previously chunks used the full frame size and the resulting
    -- frames exceeded the limit. The lower bound of 1 keeps splitLen
    -- terminating for degenerate frame sizes.
    maxPayload = max 1 (fromIntegral (connMaxFrameSize (connection chan)) - 8)

    bodyFrames
        | BL.length msg > 0 = map ContentBodyPayload (splitLen msg maxPayload)
        | otherwise         = []

    splitLen str len
        | BL.length str > len = BL.take len str : splitLen (BL.drop len str) len
        | otherwise           = [str]
writeAssembly' chan (SimpleMethod m) =
    writeFrames chan [MethodPayload m]
-- | Write an assembly without waiting for a reply.
--
-- Any 'AMQPException', 'CE.ErrorCall' or 'CE.IOException' raised while
-- writing is replaced by the exception chosen by
-- 'throwMostRelevantAMQPException'.
writeAssembly :: Channel -> Assembly -> IO ()
writeAssembly chan m = writeAssembly' chan m `CE.catches` handlers
  where
    rethrow = throwMostRelevantAMQPException chan
    handlers =
        [ CE.Handler $ \(_ :: AMQPException) -> rethrow
        , CE.Handler $ \(_ :: CE.ErrorCall) -> rethrow
        , CE.Handler $ \(_ :: CE.IOException) -> rethrow
        ]
-- | Send an assembly and block until the channel receiver delivers the reply.
--
-- While holding 'chanClosed', and only if the channel is not closed, a fresh
-- 'MVar' is queued in 'outstandingResponses' and the assembly is written. The
-- reply is then taken from that 'MVar' and forced, so that an error value
-- placed there when the channel closes is raised here. Any 'AMQPException',
-- 'CE.ErrorCall' or 'CE.IOException' is replaced by the exception chosen by
-- 'throwMostRelevantAMQPException'.
request :: Channel -> Assembly -> IO Assembly
request chan m = do
    res <- newEmptyMVar
    CE.catches (sendAndWait res) handlers
  where
    sendAndWait res = do
        withMVar (chanClosed chan) $ \closedReason ->
            if isNothing closedReason
                then do
                    -- queue the reply slot before writing the request
                    writeChan (outstandingResponses chan) res
                    writeAssembly' chan m
                else CE.throwIO $ userError "closed"
        !reply <- takeMVar res
        return reply

    rethrow = throwMostRelevantAMQPException chan
    handlers =
        [ CE.Handler $ \(_ :: AMQPException) -> rethrow
        , CE.Handler $ \(_ :: CE.ErrorCall) -> rethrow
        , CE.Handler $ \(_ :: CE.IOException) -> rethrow
        ]
-- | Throw the 'AMQPException' that best explains a failure on the channel.
--
-- If the connection has a recorded close reason, a 'ConnectionClosedException'
-- with that reason is thrown. Otherwise, if the channel has one, a
-- 'ChannelClosedException' is thrown. If neither is recorded, a
-- 'ConnectionClosedException' with the text \"unknown reason\" is thrown.
throwMostRelevantAMQPException :: Channel -> IO a
throwMostRelevantAMQPException chan = do
    connReason <- readMVar (connClosed (connection chan))
    case connReason of
        Just reason -> CE.throwIO (ConnectionClosedException reason)
        Nothing -> do
            -- only consult the channel when the connection is still open
            chanReason <- readMVar (chanClosed chan)
            CE.throwIO $
                maybe (ConnectionClosedException "unknown reason")
                      ChannelClosedException
                      chanReason
-- | Exceptions thrown by this module; each carries the recorded close reason.
data AMQPException
    = ChannelClosedException String    -- ^ the channel was closed
    | ConnectionClosedException String -- ^ the connection was closed
    deriving (Typeable, Show, Ord, Eq)

instance CE.Exception AMQPException