Safe Haskell | None |
---|---|
Language | Haskell2010 |
High level functions for working with message queues. Built on top of Network.AMQP. See https://hackage.haskell.org/package/amqp, which only works with RabbitMQ: https://www.rabbitmq.com/
Example:
Connect to a server, initialize a queue, publish a message, and create a worker to process messages from the queue.
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE DeriveGeneric #-}
module Main where

import Control.Monad.Catch (SomeException)
import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import qualified Network.AMQP.Worker as Worker
import Network.AMQP.Worker (fromURI, Exchange, Queue, Direct, def, WorkerException, Message(..), Connection)

data TestMessage = TestMessage
  { greeting :: Text }
  deriving (Generic, Show, Eq)

instance FromJSON TestMessage
instance ToJSON TestMessage

exchange :: Exchange
exchange = Worker.exchange "testExchange"

queue :: Queue Direct TestMessage
queue = Worker.queue exchange "testQueue"

results :: Queue Direct Text
results = Worker.queue exchange "resultQueue"

example :: IO ()
example = do
  -- connect
  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")

  -- initialize the queues
  Worker.initQueue conn queue
  Worker.initQueue conn results

  -- publish a message
  Worker.publish conn queue (TestMessage "hello world")

  -- create a worker, the program loops here
  Worker.worker def conn queue onError (onMessage conn)

onMessage :: Connection -> Message TestMessage -> IO ()
onMessage conn m = do
  let testMessage = value m
  putStrLn "Got Message"
  print testMessage
  Worker.publish conn results (greeting testMessage)

onError :: WorkerException SomeException -> IO ()
onError e = do
  putStrLn "Do something with errors"
  print e
- exchange :: ExchangeName -> Exchange
- type ExchangeName = Text
- queue :: Exchange -> RoutingKey -> Queue Direct msg
- topicQueue :: Exchange -> BindingKey -> Queue Topic msg
- data Exchange = Exchange ExchangeOpts
- data Queue queueType msg = Queue Exchange queueType QueueOpts
- type Direct = RoutingKey
- type Topic = BindingKey
- data Connection
- connect :: ConnectionOpts -> IO Connection
- disconnect :: Connection -> IO ()
- fromURI :: String -> ConnectionOpts
- initQueue :: QueueKey key => Connection -> Queue key msg -> IO ()
- publish :: (ToJSON msg, MonadBaseControl IO m) => Connection -> Queue Direct msg -> msg -> m ()
- publishToExchange :: (ToJSON a, MonadBaseControl IO m) => Connection -> ExchangeName -> RoutingKey -> a -> m ()
- consume :: (FromJSON msg, MonadBaseControl IO m) => Connection -> Queue key msg -> m (Maybe (ConsumeResult msg))
- consumeNext :: (FromJSON msg, MonadBaseControl IO m) => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg)
- data ConsumeResult a
- = Parsed (Message a)
- | Error ParseError
- data ParseError = ParseError String ByteString
- data Message a = Message {
- body :: ByteString
- value :: a
- worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m) => WorkerOptions -> Connection -> Queue key a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m ()
- data WorkerException e
- data WorkerOptions = WorkerOptions {}
- type Microseconds = Int
- def :: Default a => a
- newtype RoutingKey = RoutingKey Text
- newtype BindingKey = BindingKey [BindingName]
- data BindingName
- class QueueKey key where
Declaring Queues and Exchanges
exchange :: ExchangeName -> Exchange Source #
Declare an exchange
In AMQP, exchanges can be fanout, direct, or topic. This library attempts to simplify this choice by making all exchanges be topic exchanges, and allowing the user to specify topic or direct behavior on the queue itself. See queue
exchange :: Exchange exchange = Worker.exchange "testExchange"
type ExchangeName = Text Source #
queue :: Exchange -> RoutingKey -> Queue Direct msg Source #
Declare a direct queue with the name RoutingKey. Direct queues work as you expect: you can publish messages to them and read from them.
queue :: Queue Direct MyMessageType queue = Worker.queue exchange "testQueue"
topicQueue :: Exchange -> BindingKey -> Queue Topic msg Source #
Declare a topic queue. Topic queues allow you to bind a queue to a dynamic address with wildcards
queue :: Queue Topic MyMessageType queue = Worker.topicQueue exchange "new-users.*"
data Queue queueType msg Source #
Queues consist of an exchange, a type (Direct or Topic), and a message type
queue :: Queue Direct MyMessageType
type Direct = RoutingKey Source #
type Topic = BindingKey Source #
Connecting
data Connection Source #
connect :: ConnectionOpts -> IO Connection Source #
Connect to the AMQP server.
conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
disconnect :: Connection -> IO () Source #
fromURI :: String -> ConnectionOpts #
Parses an AMQP standard URI of the form amqp://user:password@host:port/vhost and returns a ConnectionOpts for use with openConnection''.
Any of these fields may be empty and will be replaced with defaults from amqp://guest:guest@localhost:5672/
Initializing exchanges and queues
initQueue :: QueueKey key => Connection -> Queue key msg -> IO () Source #
Register a queue and its exchange with the AMQP server. Call this before publishing or reading
let queue = Worker.queue exchange "my-queue" :: Queue Direct Text conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672") Worker.initQueue conn queue
Publishing Messages
publish :: (ToJSON msg, MonadBaseControl IO m) => Connection -> Queue Direct msg -> msg -> m () Source #
Publish a message to a queue. Enforces that the message type and queue name are correct at the type level.
let queue = Worker.queue exchange "users" :: Queue Direct User publish conn queue (User "username")
publishToExchange :: (ToJSON a, MonadBaseControl IO m) => Connection -> ExchangeName -> RoutingKey -> a -> m () Source #
Publish a message to a routing key on the named exchange, without checking that a queue exists to handle it or that the message body is of the right type.
publishToExchange conn "testExchange" "users.admin.created" (User "username")
Reading Messages
consume :: (FromJSON msg, MonadBaseControl IO m) => Connection -> Queue key msg -> m (Maybe (ConsumeResult msg)) Source #
Check for a message once and attempt to parse it
res <- consume conn queue
case res of
  Just (Parsed m) -> print m
  Just (Error e) -> putStrLn "could not parse message"
  Nothing -> putStrLn "No messages on the queue"
consumeNext :: (FromJSON msg, MonadBaseControl IO m) => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg) Source #
Block while checking for messages every N microseconds. Return once you find one.
-- poll every 10000 microseconds (10 ms) until a message arrives
res <- consumeNext 10000 conn queue
case res of
  (Parsed m) -> print m
  (Error e) -> putStrLn "could not parse message"
data ConsumeResult a Source #
Parsed (Message a) | |
Error ParseError |
a parsed message from the queue
Message | |
|
Worker
worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m) => WorkerOptions -> Connection -> Queue key a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m () Source #
Create a worker which loops, checks for messages, and handles errors
startWorker conn queue = do
  Worker.worker def conn queue onError onMessage
  where
    onMessage :: Message User -> IO ()
    onMessage m = do
      putStrLn "handle user message"
      print (value m)

    onError :: WorkerException SomeException -> IO ()
    onError e = do
      putStrLn "Do something with errors"
data WorkerException e Source #
Exceptions created while processing
Eq e => Eq (WorkerException e) Source # | |
Show e => Show (WorkerException e) Source # | |
Exception e => Exception (WorkerException e) Source # | |
data WorkerOptions Source #
Options for worker
WorkerOptions | |
|
type Microseconds = Int Source #
Routing Keys
newtype RoutingKey Source #
A name used to address queues
newtype BindingKey Source #
A dynamic binding address for topic queues
commentsKey :: BindingKey commentsKey = "posts.*.comments"