amqp-conduit-0.1.0.0: Conduit bindings for AMQP (see amqp package)

Safe HaskellNone
LanguageHaskell98

Network.AMQP.Conduit

Contents

Description

Conduit bindings for AMQP (see amqp package) https://hackage.haskell.org/package/amqp

Create an AMQP connection, a channel, declare a queue and an exchange and run the given action.

Example:

Connect to a server, declare a queue and an exchange, and set up a callback for messages coming in on the queue. Then publish a single message to our new exchange.

{-# LANGUAGE OverloadedStrings #-}

import           Control.Concurrent           (threadDelay)
import           Control.Monad.IO.Class       (MonadIO, liftIO)
import           Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Lazy.Char8   as BL
import           Data.Conduit
import           Network.AMQP
import           Network.AMQP.Conduit
import           Test.Hspec

-- | Round-trip test: publish one message through 'amqpSendSink', then attach
-- consumers that read it back through 'amqpReceiveSource' and check it in
-- 'printMsg'.
main :: IO ()
main = hspec $ do
    describe "produce and consume test" $ do
        it "send a message and recieve the message" $ do
            -- Producer side: a single channel that lives only for the send.
            runResourceT $ withAMQPChannel config $ \conn -> do
                sendMsg str $$ amqpSendSink conn "myKey"
            -- Consumer side: withAMQPChannels releases the connection and its
            -- channels once the action finishes, whereas createAMQPChannels
            -- leaves closing them to the caller.
            withAMQPChannels config 10 $ \amqp -> do
                amqp' <- createConsumers amqp $ \(msg,env) -> do
                    amqpReceiveSource (msg,env) $$ printMsg
                -- NOTE: RabbitMQ 1.7 doesn't implement this command.
                -- amqp'' <- pauseConsumers amqp'
                -- amqp''' <- resumeConsumers amqp''
                -- Keep the consumers registered for 15 seconds so the
                -- callback has time to run before they are deleted.
                threadDelay $ 15  * 1000000
                _ <- deleteConsumers amqp'
                return ()

-- | Body of the test message: 'main' sends it via 'sendMsg', and 'printMsg'
-- compares every received body against it.
str :: String
str = "This is a test message"

-- | Connection settings shared by the producer and the consumers in this
-- example: the broker URI, a queue named @myQueue@, a @direct@ exchange named
-- @myExchange@, and the exchange key @myKey@.
config :: AmqpConf
config =
    let testQueue    = newQueue {queueName = "myQueue"}
        testExchange = newExchange
            { exchangeName = "myExchange"
            , exchangeType = "direct"
            }
    in  AmqpConf "amqp://guest:guest@192.168.59.103:5672/" testQueue testExchange "myKey"

-- | Source that yields exactly one message: the given string packed as the
-- body, with the delivery mode set to 'Persistent'.
sendMsg :: (Monad m, MonadIO m) => String -> Source m Message
sendMsg msg = yield persistentMsg
    where
        persistentMsg = newMsg
            { msgBody         = BL.pack msg
            , msgDeliveryMode = Just Persistent
            }

-- | Sink that handles at most one delivery: it acknowledges the envelope,
-- asserts that the body equals 'str', and prints the body.
--
-- Returns immediately when upstream has nothing to offer.
printMsg :: (Monad m, MonadIO m) => Sink (Message, Envelope) m ()
printMsg = do
    m <- await
    case m of
       -- Upstream is exhausted. Awaiting again would only produce Nothing
       -- again, so recursing here would spin forever; finish instead.
       Nothing -> return ()
       Just (msg,env) -> do
           let body = BL.unpack $ msgBody msg
           liftIO $ ackEnv env
           liftIO $ body `shouldBe` str
           liftIO $ putStrLn $ "received message: " ++ body

Synopsis

Conduit

amqpReceiveSource :: (Monad m, MonadIO m) => (Message, Envelope) -> Source m (Message, Envelope) Source

This is for consumers. If you use it with createConsumers, you have to call ackMsg or ackEnv after processing each message that you get; otherwise it might be delivered again (see "ackMsg" and "ackEnv" in the Network.AMQP module).

amqpSendSink :: (Monad m, MonadIO m) => AmqpConn -> ExchangeKey -> Sink Message m () Source

A sink for sending messages.

Data type

data AmqpConf Source

AMQP connection configuration: the AMQP URI, the queue options, the exchange options, and the exchange key.

Constructors

AmqpConf 

Fields

amqpUri :: AmqpURI

Connection URI of the AMQP server.

amqpQueue :: QueueOpts
 
amqpExchange :: ExchangeOpts
 
amqpExchanKey :: ExchangeKey
 

data AmqpConn Source

Amqp Connection and Channel

Constructors

AmqpConn 

Fields

amqpConn :: Connection
 
amqpChan :: [(Channel, Maybe ConsumerTag)]
 
amqpConf :: AmqpConf
 

type ExchangeKey = Text Source

Connection and Channel

withAMQPChannel Source

Arguments

:: (MonadIO m, MonadBaseControl IO m) 
=> AmqpConf

Connection config to the AMQP server.

-> (AmqpConn -> m a)

Action to be executed that uses the connection.

-> m a 

Same as withAMQPChannels, but only one channel is opened.

withAMQPChannels Source

Arguments

:: (MonadIO m, MonadBaseControl IO m) 
=> AmqpConf

Connection config to the AMQP server.

-> Int

number of channels to be kept open in the connection.

-> (AmqpConn -> m a)

Action to be executed that uses the connection.

-> m a 

Create an AMQP connection and channel(s) and run the given action. The connection and channels are properly released after the action finishes using them. Note that you should not use the given connection or channels outside the action, since they may already have been released.

createAMQPChannels Source

Arguments

:: (MonadIO m, MonadBaseControl IO m) 
=> AmqpConf

Connection config to the AMQP server.

-> Int

number of channels to be kept open in the connection

-> m AmqpConn 

Create a connection and channels. Note that it's your responsibility to properly close the connection and the channels when they are no longer needed. Use withAMQPChannels for automatic resource control.

Consumer utils

createConsumers :: (MonadIO m, MonadBaseControl IO m) => AmqpConn -> ((Message, Envelope) -> m ()) -> m AmqpConn Source

You have to call ackMsg or ackEnv after processing for any message that you get, otherwise it might be delivered again (see "ackMsg" and "ackEnv" in the Network.AMQP module)

createConsumers' :: (MonadIO m, MonadBaseControl IO m) => AmqpConn -> ((Message, Envelope) -> m ()) -> m AmqpConn Source

Any message that you get is acknowledged automatically, so no explicit ack is needed after processing (see NoAck in the Network.AMQP module).

createConsumer :: (MonadIO m, MonadBaseControl IO m) => (Channel, Maybe ConsumerTag) -> Text -> Ack -> ((Message, Envelope) -> m ()) -> m (Channel, Maybe ConsumerTag) Source

deleteConsumer :: MonadIO m => (Channel, Maybe ConsumerTag) -> m (Channel, Maybe ConsumerTag) Source

pauseConsumer :: (Channel, Maybe ConsumerTag) -> IO (Channel, Maybe ConsumerTag) Source

resumeConsumer :: (Channel, Maybe ConsumerTag) -> IO (Channel, Maybe ConsumerTag) Source