amqp-worker-1.0.0: Type-safe AMQP workers
Copyright: (c) 2023 Sean Hess
License: BSD3
Maintainer: Sean Hess <seanhess@gmail.com>
Stability: experimental
Portability: portable
Safe Haskell: Safe-Inferred
Language: Haskell2010

Network.AMQP.Worker

Description

Type safe and simplified message queues with AMQP

Synopsis

How to use this library

Define keys to identify how messages will be published and what the message type is

import Network.AMQP.Worker as Worker

data Greeting = Greeting
  { message :: Text }
  deriving (Generic, Show, Eq)

instance FromJSON Greeting
instance ToJSON Greeting

newGreetings :: Key Routing Greeting
newGreetings = key "messages" & word "greetings" & word "new"

Connect to AMQP and publish a message

  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")

  Worker.publish conn newGreetings $ Greeting "hello"

To receive messages, first define a queue. You can bind directly to the Routing Key to ensure it is delivered once

  q <- Worker.queue conn def newGreetings :: IO (Queue Greeting)

  -- Loop and print any values received
  Worker.worker conn def q onError (print . value)

You can also define dynamic Routing Keys to receive many kinds of messages

  let newMessages = key "messages" & any1 & word "new"
  q <- Worker.queue conn def newMessages :: IO (Queue Greeting)

Binding and Routing Keys

newtype Key a msg Source #

Messages are published with a specific identifier called a Routing key. Queues can use Binding Keys to control which messages are delivered to them.

Routing keys have no dynamic component and can be used to publish messages

commentsKey :: Key Routing Comment
commentsKey = key "posts" & word "new"

Binding keys can contain wildcards, only used for matching messages

commentsKey :: Key Binding Comment
commentsKey = key "posts" & any1 & word "comments" & many

Constructors

Key [Binding] 

Instances

Instances details
Monoid (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

mempty :: Key a msg #

mappend :: Key a msg -> Key a msg -> Key a msg #

mconcat :: [Key a msg] -> Key a msg #

Semigroup (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(<>) :: Key a msg -> Key a msg -> Key a msg #

sconcat :: NonEmpty (Key a msg) -> Key a msg #

stimes :: Integral b => b -> Key a msg -> Key a msg #

Show (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

showsPrec :: Int -> Key a msg -> ShowS #

show :: Key a msg -> String #

showList :: [Key a msg] -> ShowS #

Eq (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(==) :: Key a msg -> Key a msg -> Bool #

(/=) :: Key a msg -> Key a msg -> Bool #

data Binding Source #

Instances

Instances details
Show Binding Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Eq Binding Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(==) :: Binding -> Binding -> Bool #

(/=) :: Binding -> Binding -> Bool #

word :: Text -> Key a msg -> Key a msg Source #

A specific word. Can be used to chain Routing keys or Binding keys

key :: Text -> Key Routing msg Source #

Start a new routing key (can also be used for bindings)

any1 :: Key a msg -> Key Binding msg Source #

Match any one word. Equivalent to *. Converts to a Binding key and can no longer be used to publish messages

many :: Key a msg -> Key Binding msg Source #

Match zero or more words. Equivalent to #. Converts to a Binding key and can no longer be used to publish messages

Connecting

connect :: MonadIO m => ConnectionOpts -> m Connection Source #

Connect to the AMQP server.

conn <- connect (fromURI "amqp://guest:guest@localhost:5672")

fromURI :: String -> ConnectionOpts #

Parses an AMQP standard URI of the form amqp://user:password@host:port/vhost and returns a ConnectionOpts for use with openConnection''. Any of these fields may be empty and will be replaced with defaults from amqp://guest:guest@localhost:5672/

Sending Messages

publish :: (RequireRouting a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m () Source #

Send a message to a queue. Enforces that the message type and queue name are correct at the type level

let usersKey = key "users" :: Key Routing User
publish conn usersKey (User "username")

Publishing to a Binding Key results in an error

-- Compiler error! This doesn't make sense
let usersKey = key "users" & many :: Key Binding User
publish conn usersKey (User "username")

Initializing queues

queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg) Source #

Create a queue to receive messages matching the Key with a name prefixed via queueName.

q <- Worker.queue conn "main" $ key "messages" & any1
Worker.worker conn def q onError onMessage

queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg) Source #

Create a queue to receive messages matching the binding key. Each queue with a unique name will be delivered a separate copy of the message. Workers operating on the same queue, or on queues with the same name will load balance.

data Queue msg Source #

A queue is an inbox for messages to be delivered

Constructors

Queue (Key Binding msg) QueueName 

Instances

Instances details
Show (Queue msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Methods

showsPrec :: Int -> Queue msg -> ShowS #

show :: Queue msg -> String #

showList :: [Queue msg] -> ShowS #

Eq (Queue msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Methods

(==) :: Queue msg -> Queue msg -> Bool #

(/=) :: Queue msg -> Queue msg -> Bool #

queueName :: QueuePrefix -> Key a msg -> QueueName Source #

Name a queue with a prefix and the binding key name. Useful for seeing at a glance which queues are receiving which messages

-- "main messages.new"
queueName "main" (key "messages" & word "new")

newtype QueuePrefix Source #

Constructors

QueuePrefix Text 

Instances

Instances details
IsString QueuePrefix Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Show QueuePrefix Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Default QueuePrefix Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Methods

def :: QueuePrefix #

Eq QueuePrefix Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Messages

data Message a Source #

A parsed message from the queue

Constructors

Message 

Fields

Instances

Instances details
Show a => Show (Message a) Source # 
Instance details

Defined in Network.AMQP.Worker.Message

Methods

showsPrec :: Int -> Message a -> ShowS #

show :: Message a -> String #

showList :: [Message a] -> ShowS #

Eq a => Eq (Message a) Source # 
Instance details

Defined in Network.AMQP.Worker.Message

Methods

(==) :: Message a -> Message a -> Bool #

(/=) :: Message a -> Message a -> Bool #

Worker

worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m () Source #

Create a worker which loops, checks for messages, and handles errors

startWorker conn queue = do
  Worker.worker conn def queue onError onMessage

  where
    onMessage :: Message User -> IO ()
    onMessage m = do
      putStrLn "handle user message"
      print (value m)

    onError :: WorkerException SomeException -> IO ()
    onError e = do
      putStrLn "Do something with errors"

data WorkerOptions Source #

Options for worker

Constructors

WorkerOptions 

Fields

def :: Default a => a #

The default value for this type.