Safe Haskell | None |
---|---|
Language | Haskell98 |
Conduit bindings for AMQP (see amqp package) https://hackage.haskell.org/package/amqp
Create a AMQP connection, a channel, declare a queue and an exchange and run the given action.
Example:
Connect to a server, declare a queue and an exchange and setup a callback for messages coming in on the queue. Then publish a single message to our new exchange
{-# LANGUAGE OverloadedStrings #-} import Control.Concurrent (threadDelay) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Trans.Resource (runResourceT) import qualified Data.ByteString.Lazy.Char8 as BL import Data.Conduit import Network.AMQP import Network.AMQP.Conduit import Test.Hspec main :: IO () main = hspec $ do describe "produce and consume test" $ do it "send a message and recieve the message" $ do runResourceT $ withAMQPChannel config $ \conn -> do sendMsg str $$ amqpSendSink conn "myKey" amqp <- createAMQPChannels config 10 amqp' <- createConsumers amqp $ \(msg,env) -> do amqpReceiveSource (msg,env) $$ printMsg -- | NOTE: RabbitMQ 1.7 doesn't implement this command. -- amqp'' <- pauseConsumers amqp' -- amqp''' <- resumeConsumers amqp'' threadDelay $ 15 * 1000000 _ <- deleteConsumers amqp' return () str :: String str = "This is a test message" config :: AmqpConf config = AmqpConf "amqp://guest:guest@192.168.59.103:5672/" queue exchange "myKey" where exchange = newExchange {exchangeName = "myExchange", exchangeType = "direct"} queue = newQueue {queueName = "myQueue"} sendMsg :: (Monad m, MonadIO m) => String -> Source m Message sendMsg msg = do yield (newMsg {msgBody = (BL.pack msg),msgDeliveryMode = Just Persistent} ) printMsg :: (Monad m, MonadIO m) => Sink (Message, Envelope) m () printMsg = do m <- await case m of Nothing -> printMsg Just (msg,env) -> do liftIO $ ackEnv env liftIO $ (BL.unpack $ msgBody msg) `shouldBe` str liftIO $ putStrLn $ "received message: " ++ (BL.unpack $ msgBody msg) -
- amqpReceiveSource :: (Monad m, MonadIO m) => (Message, Envelope) -> Source m (Message, Envelope)
- amqpSendSink :: (Monad m, MonadIO m) => AmqpConn -> ExchangeKey -> Sink Message m ()
- data AmqpConf = AmqpConf {
- amqpUri :: AmqpURI
- amqpQueue :: QueueOpts
- amqpExchange :: ExchangeOpts
- amqpExchanKey :: ExchangeKey
- data AmqpConn = AmqpConn {}
- type ExchangeKey = Text
- type AmqpURI = String
- withAMQPChannel :: (MonadIO m, MonadBaseControl IO m) => AmqpConf -> (AmqpConn -> m a) -> m a
- withAMQPChannels :: (MonadIO m, MonadBaseControl IO m) => AmqpConf -> Int -> (AmqpConn -> m a) -> m a
- createAMQPChannels :: (MonadIO m, MonadBaseControl IO m) => AmqpConf -> Int -> m AmqpConn
- closeChannels :: AmqpConn -> IO AmqpConn
- destoryConnection :: AmqpConn -> IO ()
- createConsumers :: (MonadIO m, MonadBaseControl IO m) => AmqpConn -> ((Message, Envelope) -> m ()) -> m AmqpConn
- createConsumers' :: (MonadIO m, MonadBaseControl IO m) => AmqpConn -> ((Message, Envelope) -> m ()) -> m AmqpConn
- createConsumer :: (MonadIO m, MonadBaseControl IO m) => (Channel, Maybe ConsumerTag) -> Text -> Ack -> ((Message, Envelope) -> m ()) -> m (Channel, Maybe ConsumerTag)
- deleteConsumers :: AmqpConn -> IO AmqpConn
- deleteConsumer :: MonadIO m => (Channel, Maybe ConsumerTag) -> m (Channel, Maybe ConsumerTag)
- pauseConsumers :: AmqpConn -> IO AmqpConn
- pauseConsumer :: (Channel, Maybe ConsumerTag) -> IO (Channel, Maybe ConsumerTag)
- resumeConsumers :: AmqpConn -> IO AmqpConn
- resumeConsumer :: (Channel, Maybe ConsumerTag) -> IO (Channel, Maybe ConsumerTag)
Conduit
amqpReceiveSource :: (Monad m, MonadIO m) => (Message, Envelope) -> Source m (Message, Envelope) Source
this is for consumer, if you usea it with createConsumers, You have to call ackMsg or ackEnv after processing for any message that you get, otherwise it might be delivered again (see "ackMsg" and "ackEnv" in the Network.AMQP module)
amqpSendSink :: (Monad m, MonadIO m) => AmqpConn -> ExchangeKey -> Sink Message m () Source
a sink as sending msgs.
Data type
Amqp connection configuration. queue name, exchange name, exchange key name, and amqp URI.
AmqpConf | |
|
Amqp Connection and Channel
type ExchangeKey = Text Source
Connection and Channel
:: (MonadIO m, MonadBaseControl IO m) | |
=> AmqpConf | Connection config to the AMQP server. |
-> (AmqpConn -> m a) | Action to be executed that uses the connection. |
-> m a |
Same as runAMQP, but only one channel is opened.
:: (MonadIO m, MonadBaseControl IO m) | |
=> AmqpConf | Connection config to the AMQP server. |
-> Int | number of channels to be kept open in the connection. |
-> (AmqpConn -> m a) | Action to be executed that uses the connection. |
-> m a |
Create a AMQP connection and Channel(s) and run the given action. The connetion and channnels are properly released after the action finishes using it. Note that you should not use the given Connection, channels outside the action since it may be already been released.
:: (MonadIO m, MonadBaseControl IO m) | |
=> AmqpConf | Connection config to the AMQP server. |
-> Int | number of channels to be kept open in the connection |
-> m AmqpConn |
Create a connection and channels. Note that it's your responsability to properly close the connection and the channels when unneeded. Use withAMQPChannels for an automatic resource control.
closeChannels :: AmqpConn -> IO AmqpConn Source
destoryConnection :: AmqpConn -> IO () Source
Consumer utils
createConsumers :: (MonadIO m, MonadBaseControl IO m) => AmqpConn -> ((Message, Envelope) -> m ()) -> m AmqpConn Source
You have to call ackMsg or ackEnv after processing for any message that you get, otherwise it might be delivered again (see "ackMsg" and "ackEnv" in the Network.AMQP module)
createConsumers' :: (MonadIO m, MonadBaseControl IO m) => AmqpConn -> ((Message, Envelope) -> m ()) -> m AmqpConn Source
after processing for any message tha you get, automatically acknowledged (see NoAck in the Network.AMQP module)
createConsumer :: (MonadIO m, MonadBaseControl IO m) => (Channel, Maybe ConsumerTag) -> Text -> Ack -> ((Message, Envelope) -> m ()) -> m (Channel, Maybe ConsumerTag) Source
deleteConsumers :: AmqpConn -> IO AmqpConn Source
deleteConsumer :: MonadIO m => (Channel, Maybe ConsumerTag) -> m (Channel, Maybe ConsumerTag) Source
pauseConsumers :: AmqpConn -> IO AmqpConn Source
pauseConsumer :: (Channel, Maybe ConsumerTag) -> IO (Channel, Maybe ConsumerTag) Source
resumeConsumers :: AmqpConn -> IO AmqpConn Source
resumeConsumer :: (Channel, Maybe ConsumerTag) -> IO (Channel, Maybe ConsumerTag) Source