amqp-conduit-0.2.0.0: Conduit bindings for AMQP (see amqp package)

Safe HaskellNone
LanguageHaskell98

Network.AMQP.Conduit

Contents

Description

Conduit bindings for AMQP (see amqp package) https://hackage.haskell.org/package/amqp

Create a AMQP connection, a channel, declare a queue and an exchange and run the given action.

Example:

Connect to a server, declare a queue and an exchange and setup a callback for messages coming in on the queue. Then publish a single message to our new exchange

{-# LANGUAGE OverloadedStrings #-}

import           Control.Concurrent           (threadDelay)
import           Control.Monad.IO.Class       (MonadIO, liftIO)
import           Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Lazy.Char8   as BL
import           Data.Conduit
import           Network.AMQP
import           Network.AMQP.Conduit
import           Test.Hspec


main :: IO ()
main = hspec $ do
    describe "produce and consume test" $ do
        it "send a message and recieve the message" $ do
            runResourceT $ withChannel config $ \conn -> do
                sendMsg str $$ amqpSendSink conn "myExchange" "myKey"
            amqp <- createChannel config
            amqp' <- createConsumer amqp "myQueue" Ack $ \(msg,env) -> do
                amqpReceiveSource (msg,env) $$ printMsg
            -- | NOTE: RabbitMQ 1.7 doesn't implement this command.
            -- amqp'' <- pauseConsumers amqp'
            -- amqp''' <- resumeConsumers amqp''
            threadDelay $ 15  * 1000000
            _ <- deleteConsumer amqp'
            return ()

str :: String
str = "This is a test message"

config :: AmqpConf
config = AmqpConf "amqp://guest:guest@localhost:5672/" queue exchange "myKey"
    where
        exchange = newExchange {exchangeName = "myExchange", exchangeType = "direct"}
        queue = newQueue {queueName = "myQueue"}

sendMsg :: (Monad m, MonadIO m) => String -> Source m Message
sendMsg msg = do
    yield (newMsg {msgBody = (BL.pack msg),msgDeliveryMode = Just Persistent} )

printMsg :: (Monad m, MonadIO m) => Sink (Message, Envelope) m ()
printMsg = do
    m <- await
    case m of
       Nothing -> printMsg
       Just (msg,env) -> do
           liftIO $ ackEnv env
           liftIO $ (BL.unpack $ msgBody msg) `shouldBe` str
           liftIO $ putStrLn $ "received message: " ++ (BL.unpack $ msgBody msg)
           -

Synopsis

Conduit

amqpReceiveSource :: MonadIO m => (Message, Envelope) -> Source m (Message, Envelope) Source

Source as consuming data pushed.

amqpSendSink :: MonadIO m => AmqpConn -> Exchange -> ExchangeKey -> Sink Message m () Source

Sink as sending data.

Data type

data AmqpConf Source

Amqp connection configuration. queue name, exchange name, exchange key name, and amqp URI.

Constructors

AmqpConf 

Fields

amqpUri :: AmqpURI

Connection string to the database.

amqpQueue :: QueueOpts
 
amqpExchange :: ExchangeOpts
 
amqpExchanKey :: ExchangeKey
 

data AmqpConn Source

Amqp Connection and Channel

Connection and Channel

withChannel Source

Arguments

:: (MonadIO m, MonadBaseControl IO m) 
=> AmqpConf

Connection config to the AMQP server.

-> (AmqpConn -> m a)

Action to be executed that uses the connection.

-> m a 

Create a AMQP connection and a channel and run the given action. The connetion and channnel are properly released after the action finishes using it. Note that you should not use the given Connection, channel outside the action since it may be already been released.

createConnectionChannel Source

Arguments

:: AmqpConf

Connection config to the AMQP server.

-> IO AmqpConn 

Create a connection and a channel. Note that it's your responsability to properly close the connection and the channels when unneeded. Use withAMQPChannels for an automatic resource control.

destoryConnection :: AmqpConn -> IO () Source

Close a connection

Exchange and Queue utils

Consumer utils