amqp-worker-0.2.4: High level functions for working with message queues

Safe HaskellNone
LanguageHaskell2010

Network.AMQP.Worker

Contents

Description

High level functions for working with message queues. Built on top of Network.AMQP. See https://hackage.haskell.org/package/amqp, which only works with RabbitMQ: https://www.rabbitmq.com/

Example:

Connect to a server, initialize a queue, publish a message, and create a worker to process them.

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE DeriveGeneric #-}
module Main where

import Control.Monad.Catch (SomeException)
import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import qualified Network.AMQP.Worker as Worker
import Network.AMQP.Worker (fromURI, Exchange, Queue, Direct, def, WorkerException, Message(..), Connection)

data TestMessage = TestMessage
 { greeting :: Text }
 deriving (Generic, Show, Eq)

instance FromJSON TestMessage
instance ToJSON TestMessage


exchange :: Exchange
exchange = Worker.exchange "testExchange"


queue :: Queue Direct TestMessage
queue = Worker.queue exchange "testQueue"


results :: Queue Direct Text
results = Worker.queue exchange "resultQueue"


example :: IO ()
example = do
  -- connect
  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")

  -- initialize the queues
  Worker.initQueue conn queue
  Worker.initQueue conn results

  -- publish a message
  Worker.publish conn queue (TestMessage "hello world")

  -- create a worker, the program loops here
  Worker.worker def conn queue onError (onMessage conn)


onMessage :: Connection -> Message TestMessage -> IO ()
onMessage conn m = do
  let testMessage = value m
  putStrLn "Got Message"
  print testMessage
  Worker.publish conn results (greeting testMessage)


onError :: WorkerException SomeException -> IO ()
onError e = do
  putStrLn "Do something with errors"
  print e

Synopsis

Declaring Queues and Exchanges

exchange :: ExchangeName -> Exchange Source #

Declare an exchange

In AMQP, exchanges can be fanout, direct, or topic. This library attempts to simplify this choice by making all exchanges be topic exchanges, and allowing the user to specify topic or direct behavior on the queue itself. See queue

exchange :: Exchange
exchange = Worker.exchange "testExchange"

queue :: Exchange -> RoutingKey -> Queue Direct msg Source #

Declare a direct queue with the name RoutingKey. Direct queues work as you expect: you can publish messages to them and read from them.

queue :: Queue Direct MyMessageType
queue = Worker.queue exchange "testQueue"

topicQueue :: Exchange -> BindingKey -> Queue Topic msg Source #

Declare a topic queue. Topic queues allow you to bind a queue to a dynamic address with wildcards

queue :: Queue Topic MyMessageType
queue = Worker.topicQueue exchange "new-users.*"

data Queue queueType msg Source #

Queues consist of an exchange, a type (Direct or Topic), and a message type

queue :: Queue Direct MyMessageType

Constructors

Queue Exchange queueType QueueOpts 

Instances

Eq queueType => Eq (Queue queueType msg) Source # 

Methods

(==) :: Queue queueType msg -> Queue queueType msg -> Bool #

(/=) :: Queue queueType msg -> Queue queueType msg -> Bool #

Show queueType => Show (Queue queueType msg) Source # 

Methods

showsPrec :: Int -> Queue queueType msg -> ShowS #

show :: Queue queueType msg -> String #

showList :: [Queue queueType msg] -> ShowS #

Connecting

connect :: ConnectionOpts -> IO Connection Source #

Connect to the AMQP server.

conn <- connect (fromURI "amqp://guest:guest@localhost:5672")

fromURI :: String -> ConnectionOpts #

Parses amqp standard URI of the form amqp://user:passwordhost:port/vhost and returns a ConnectionOpts for use with openConnection'' | Any of these fields may be empty and will be replaced with defaults from amqp:/guest:guest@localhost:5672@

Initializing exchanges and queues

initQueue :: QueueKey key => Connection -> Queue key msg -> IO () Source #

Register a queue and its exchange with the AMQP server. Call this before publishing or reading

let queue = Worker.queue exchange "my-queue" :: Queue Direct Text
conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
Worker.initQueue conn queue

Publishing Messages

publish :: (ToJSON msg, MonadBaseControl IO m) => Connection -> Queue Direct msg -> msg -> m () Source #

publish a message to a queue. Enforces that the message type and queue name are correct at the type level

let queue = Worker.queue exchange "users" :: Queue Direct User
publish conn queue (User "username")

publishToExchange :: (ToJSON a, MonadBaseControl IO m) => Connection -> ExchangeName -> RoutingKey -> a -> m () Source #

publish a message to a routing key, without making sure a queue exists to handle it or if it is the right type of message body

publishToExchange conn "users.admin.created" (User "username")

Reading Messages

consume :: (FromJSON msg, MonadBaseControl IO m) => Connection -> Queue key msg -> m (Maybe (ConsumeResult msg)) Source #

Check for a message once and attempt to parse it

res <- consume conn queue
case res of
  Just (Parsed m) -> print m
  Just (Error e) -> putStrLn "could not parse message"
  Notihng -> putStrLn "No messages on the queue"

consumeNext :: (FromJSON msg, MonadBaseControl IO m) => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg) Source #

Block while checking for messages every N microseconds. Return once you find one.

res <- consumeNext conn queue
case res of
  (Parsed m) -> print m
  (Error e) -> putStrLn "could not parse message"

data Message a Source #

a parsed message from the queue

Constructors

Message 

Fields

Instances

Eq a => Eq (Message a) Source # 

Methods

(==) :: Message a -> Message a -> Bool #

(/=) :: Message a -> Message a -> Bool #

Show a => Show (Message a) Source # 

Methods

showsPrec :: Int -> Message a -> ShowS #

show :: Message a -> String #

showList :: [Message a] -> ShowS #

Worker

worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m) => WorkerOptions -> Connection -> Queue key a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m () Source #

Create a worker which loops, checks for messages, and handles errors

startWorker conn queue = do
  Worker.worker def conn queue onError onMessage

  where
    onMessage :: Message User
    onMessage m = do
      putStrLn "handle user message"
      print (value m)

    onError :: WorkerException SomeException -> IO ()
    onError e = do
      putStrLn "Do something with errors"

data WorkerOptions Source #

Options for worker

Constructors

WorkerOptions 

Fields

def :: Default a => a #

The default value for this type.

Routing Keys

newtype BindingKey Source #

A dynamic binding address for topic queues

commentsKey :: BindingKey
commentsKey = "posts.*.comments"

Constructors

BindingKey [BindingName] 

class QueueKey key where Source #

Minimal complete definition

showKey

Methods

showKey :: key -> Text Source #