{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE TypeOperators #-} -- | -- Conduit bindings for AMQP (see amqp package) https://hackage.haskell.org/package/amqp -- -- Create a AMQP connection, a channel, declare a queue and an exchange -- and run the given action. -- -- /Example/: -- -- Connect to a server, declare a queue and an exchange and setup a callback for messages coming in on the queue. Then publish a single message to our new exchange -- -- >{-# LANGUAGE OverloadedStrings #-} -- > -- >import Control.Concurrent (threadDelay) -- >import Control.Monad.IO.Class (MonadIO, liftIO) -- >import Control.Monad.Trans.Resource (runResourceT) -- >import qualified Data.ByteString.Lazy.Char8 as BL -- >import Data.Conduit -- >import Network.AMQP -- >import Network.AMQP.Conduit -- >import Test.Hspec -- > -- >main :: IO () -- >main = hspec $ do -- > describe "produce and consume test" $ do -- > it "send a message and recieve the message" $ do -- > runResourceT $ withAMQPChannel config $ \conn -> do -- > sendMsg str $$ amqpSendSink conn "myKey" -- > amqp <- createAMQPChannels config 10 -- > amqp' <- createConsumers amqp $ \(msg,env) -> do -- > amqpReceiveSource (msg,env) $$ printMsg -- > -- | NOTE: RabbitMQ 1.7 doesn't implement this command. -- > -- amqp'' <- pauseConsumers amqp' -- > -- amqp''' <- resumeConsumers amqp'' -- > threadDelay $ 15 * 1000000 -- > _ <- deleteConsumers amqp' -- > return () -- > -- >str :: String -- >str = "This is a test message" -- > -- >config :: AmqpConf -- >config = AmqpConf "amqp://guest:guest@192.168.59.103:5672/" queue exchange "myKey" -- > where -- > exchange = newExchange {exchangeName = "myExchange", exchangeType = "direct"} -- > queue = newQueue {queueName = "myQueue"} -- > -- >sendMsg :: (Monad m, MonadIO m) => String -> Source m Message -- >sendMsg msg = do -- > yield (newMsg {msgBody = (BL.pack msg),msgDeliveryMode = Just Persistent} ) -- > -- >printMsg :: (Monad m, MonadIO m) => Sink (Message, Envelope) m () -- >printMsg = do -- > m <- await -- > case m of -- > Nothing -> printMsg -- > Just (msg,env) -> do -- > liftIO $ ackEnv env -- > liftIO $ (BL.unpack $ msgBody msg) `shouldBe` str -- > liftIO $ putStrLn $ "received message: " ++ (BL.unpack $ msgBody msg) -- > - -- > module Network.AMQP.Conduit ( -- * Conduit amqpReceiveSource , amqpSendSink -- * Data type , AmqpConf (..) , AmqpConn (..) , ExchangeKey , AmqpURI -- * Connection and Channel , withAMQPChannel , withAMQPChannels , createAMQPChannels , closeChannels , destoryConnection -- * Consumer utils , createConsumers , createConsumers' , createConsumer , deleteConsumers , deleteConsumer , pauseConsumers , pauseConsumer , resumeConsumers , resumeConsumer , module Network.AMQP ) where import Control.Exception.Lifted (bracket) import Control.Monad.Except -- import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Trans.Control (MonadBaseControl) import Data.Conduit (Sink, Source, await, yield) import Data.Maybe (isJust, isNothing) import Data.Text (Text) import Network.AMQP (AMQPException (ConnectionClosedException), Ack (Ack, NoAck), Channel, Connection, ConsumerTag, Envelope, ExchangeOpts, Message, QueueOpts, addConnectionClosedHandler, bindQueue, cancelConsumer, closeChannel, closeConnection, declareExchange, declareQueue, exchangeName, flow, fromURI, getMsg, openChannel, openConnection'', publishMsg, queueName) import Network.AMQP.Lifted (consumeMsgs) -- | Amqp Connection and Channel data AmqpConn = AmqpConn { amqpConn :: Connection , amqpChan :: [(Channel, Maybe ConsumerTag)] , amqpConf :: AmqpConf } -- | Amqp connection configuration. queue name, exchange name, exchange key name, and amqp URI. data AmqpConf = AmqpConf { -- | Connection string to the database. amqpUri :: AmqpURI , amqpQueue :: QueueOpts , amqpExchange :: ExchangeOpts , amqpExchanKey :: ExchangeKey } type ExchangeKey = Text type AmqpURI = String -- |Create a AMQP connection and Channel(s) and run the given action. The connetion and channnels are properly released after the action finishes using it. Note that you should not use the given Connection, channels outside the action since it may be already been released. withAMQPChannels :: (MonadIO m, MonadBaseControl IO m) => AmqpConf -- ^ Connection config to the AMQP server. -> Int -- ^ number of channels to be kept open in the connection. -> (AmqpConn -> m a) -- ^ Action to be executed that uses the connection. -> m a withAMQPChannels conf num f = bracket (do -- liftIO $ putStrLn "connecting.." liftIO $ connect conf (numCheck num)) (\conn -> do -- liftIO $ putStrLn "disconnecting.." liftIO $liftIO $ disconnect conn) (\conn -> do -- liftIO $ putStrLn "calling function" f conn) -- | Same as runAMQP, but only one channel is opened. withAMQPChannel:: (MonadIO m, MonadBaseControl IO m) => AmqpConf -- ^ Connection config to the AMQP server. -> (AmqpConn -> m a) -- ^ Action to be executed that uses the connection. -> m a withAMQPChannel conf = withAMQPChannels conf 1 -- | Create a connection and channels. Note that it's your responsability to properly close the connection and the channels when unneeded. Use withAMQPChannels for an automatic resource control. createAMQPChannels :: (MonadIO m, MonadBaseControl IO m) => AmqpConf -- ^ Connection config to the AMQP server. -> Int -- ^ number of channels to be kept open in the connection -> m AmqpConn createAMQPChannels conf num = liftIO $ connect conf (numCheck num) -- Consumer utils createConsumer :: (MonadIO m, MonadBaseControl IO m) => (Channel, Maybe ConsumerTag) -> Text -> Ack -> ((Message, Envelope) -> m ()) -> m (Channel, Maybe ConsumerTag) createConsumer (chan, tag) queue ack f = case tag of Nothing -> do tag' <- consumeMsgs chan queue ack f return (chan, Just tag') Just _ -> return (chan, tag) deleteConsumer :: MonadIO m => (Channel, Maybe ConsumerTag) -> m (Channel, Maybe ConsumerTag) deleteConsumer (chan, tag) = case tag of Nothing -> return (chan, tag) Just s -> do liftIO $ putStrLn "cancel cunsumer channel." liftIO $ cancelConsumer chan s return (chan, Nothing) createConsumersHelper :: (MonadIO m, MonadBaseControl IO m) => AmqpConn -> ((Message, Envelope) -> m ()) -> Ack -> m AmqpConn createConsumersHelper conn f ack = do chan <- mapM (\chan -> createConsumer chan qName ack f) chanList return $ conn {amqpChan = chan} where qName = queueName (amqpQueue (amqpConf conn)) chanList = filter (\(_, tag) -> isNothing tag) (amqpChan conn) -- | after processing for any message tha you get, automatically acknowledged (see "NoAck" in the "Network.AMQP" module) createConsumers' :: (MonadIO m, MonadBaseControl IO m) => AmqpConn -> ((Message, Envelope) -> m ()) -> m AmqpConn createConsumers' conn f = createConsumersHelper conn f NoAck -- | You have to call ackMsg or ackEnv after processing for any message that you get, otherwise it might be delivered again (see "ackMsg" and "ackEnv" in the "Network.AMQP" module) createConsumers :: (MonadIO m, MonadBaseControl IO m) => AmqpConn -> ((Message, Envelope) -> m ()) -> m AmqpConn createConsumers conn f = createConsumersHelper conn f Ack -- helper functions deleteConsumers :: AmqpConn -> IO AmqpConn deleteConsumers conn = do chan <- mapM deleteConsumer chanList return $ conn {amqpChan = chan} where chanList = filter (\(_, tag) -> isJust tag) (amqpChan conn) pauseConsumers :: AmqpConn -> IO AmqpConn pauseConsumers conn = do chan <- mapM pauseConsumer chanList return $ conn {amqpChan = chan} where chanList = filter (\(_, tag) -> isJust tag) (amqpChan conn) pauseConsumer :: (Channel, Maybe ConsumerTag ) -> IO (Channel, Maybe ConsumerTag) pauseConsumer chan = flowConsumer chan False resumeConsumers :: AmqpConn -> IO AmqpConn resumeConsumers conn = do chan <- mapM resumeConsumer chanList return $ conn {amqpChan = chan} where chanList = filter (\(_, tag) -> isJust tag) (amqpChan conn) resumeConsumer :: (Channel, Maybe ConsumerTag) -> IO (Channel, Maybe ConsumerTag) resumeConsumer chan = flowConsumer chan True flowConsumer :: (Channel, Maybe ConsumerTag) -> Bool -> IO (Channel, Maybe ConsumerTag) flowConsumer (chan, tag) flag= case tag of Nothing -> return (chan, tag) Just _ -> do flow chan flag return (chan, tag) closeChannels :: AmqpConn -> IO AmqpConn closeChannels conn = do mapM_ (\(chan, _) -> closeChannel chan ) (amqpChan conn) return $ conn {amqpChan =[]} destoryConnection :: AmqpConn -> IO () destoryConnection conn = do addConnectionClosedHandler (amqpConn conn) False (return ()) closeConnection (amqpConn conn) -- internal connect :: AmqpConf -> Int -> IO AmqpConn connect conf num = do -- make a connection and a channel of the connection. conn <- openConnection'' uri chan <- replicateM num (initChan conf conn) -- set a excetion when closeing the connection -- addConnectionClosedHandler conn True (throwIO (ConnectionClosedException "Connection Closed.")) return $ AmqpConn conn chan conf where uri = fromURI (amqpUri conf) initChan :: AmqpConf -> Connection -> IO (Channel, Maybe ConsumerTag) initChan conf conn = do chan <- openChannel conn -- debug -- liftIO $ putStrLn ("URI = " ++ (show (amqpUri conf))) -- liftIO $ putStrLn ("exchange name = " ++ (show eName)) -- liftIO $ putStrLn ("ecchange key = " ++ (show (key))) -- liftIO $ putStrLn ("queue name= = " ++ (show (qName))) -- declare exchange, a queue and binding declareExchange chan (amqpExchange conf) _ <- declareQueue chan (amqpQueue conf) bindQueue chan qName eName key return (chan, Nothing) where qName = queueName (amqpQueue conf) key = amqpExchanKey conf eName = exchangeName (amqpExchange conf) disconnect :: AmqpConn -> IO () disconnect conn = do -- mapM_ (\(chan, _) -> closeChannel chan) (amqpChan conn) closeConnection (amqpConn conn) send :: AmqpConn -> ExchangeKey -> Message -> IO () send conn key msg = do -- liftIO $ putStrLn ("key = " ++ show key ++ ", msg = " ++ show msg) publishMsg channel (exchangeName (amqpExchange (amqpConf conn))) key msg where channel = fst $ head $ (amqpChan conn) numCheck :: Int -> Int numCheck int | int <= 0 = 1 | int > 0 = int | otherwise = 1 -- | this is for consumer, if you usea it with createConsumers, You have to call ackMsg or ackEnv after processing for any message that you get, otherwise it might be delivered again (see "ackMsg" and "ackEnv" in the "Network.AMQP" module) amqpReceiveSource :: (Monad m, MonadIO m) => (Message, Envelope) -> Source m (Message, Envelope) amqpReceiveSource (msg, env) = loop where loop = do yield (msg, env) loop -- | a sink as sending msgs. amqpSendSink :: (Monad m, MonadIO m) => AmqpConn -> ExchangeKey -> Sink Message m () amqpSendSink conn key = loop where loop = await >>= maybe (return ()) (\v -> (liftIO $ send conn key v) >> loop)