Copyright | (c) 2023 Sean Hess |
---|---|
License | BSD3 |
Maintainer | Sean Hess <seanhess@gmail.com> |
Stability | experimental |
Portability | portable |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Type safe and simplified message queues with AMQP. Compatible with RabbitMQ
Synopsis
- connect :: MonadIO m => ConnectionOpts -> m Connection
- fromURI :: String -> ConnectionOpts
- data Connection
- newtype Key a msg = Key [Bind]
- data Bind
- data Route
- key :: Text -> Key Route msg
- word :: Text -> Key a msg -> Key a msg
- any1 :: Key a msg -> Key Bind msg
- many :: Key a msg -> Key Bind msg
- publish :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
- queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg)
- queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg)
- data Queue msg = Queue (Key Bind msg) QueueName
- queueName :: QueuePrefix -> Key a msg -> QueueName
- type QueueName = Text
- type QueuePrefix = Text
- takeMessage :: (MonadIO m, FromJSON a) => Connection -> Queue a -> m (Message a)
- data ParseError = ParseError String ByteString
- data Message a = Message {
- body :: ByteString
- value :: a
- worker :: (FromJSON a, MonadIO m) => Connection -> Queue a -> (Message a -> m ()) -> m ()
How to use this library
Define keys to identify how messages will be published and what the message type is
import Network.AMQP.Worker as Worker data Greeting = Greeting { message :: Text } deriving (Generic, Show, Eq) instance FromJSON Greeting instance ToJSON Greeting newGreetings :: Key Route Greeting newGreetings = key "greetings" & word "new"
Connect to AMQP and publish a message
conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672") Worker.publish conn newGreetings $ Greeting "hello"
Create a queue to receive messages. You can bind directly to the Routing Key to ensure it is delivered once
q <- Worker.queue conn "new" newGreetings :: IO (Queue Greeting) m <- Worker.takeMessage conn q print (value m)
Define dynamic Routing Keys to receive many kinds of messages
let anyMessages = key "messages" & any1 q <- Worker.queue conn "main" anyMessages m <- Worker.takeMessage conn q print (value m)
Create a worker to continually process messages
forkIO $ Worker.worker conn q $ \m -> do print (value m)
Connecting
connect :: MonadIO m => ConnectionOpts -> m Connection Source #
Connect to the AMQP server using simple defaults
conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
fromURI :: String -> ConnectionOpts #
Parses an AMQP standard URI of the form amqp://user:password@host:port/vhost and returns a ConnectionOpts for use with openConnection''.
Any of these fields may be empty and will be replaced with defaults from amqp://guest:guest@localhost:5672/
data Connection Source #
Binding and Routing Keys
Messages are published with a specific identifier called a Routing key. Queues can use Binding Keys to control which messages are delivered to them.
Routing keys have no dynamic component and can be used to publish messages
commentsKey :: Key Route Comment commentsKey = key "posts" & word "new"
Binding keys can contain wildcards, only used for matching messages
commentsKey :: Key Bind Comment commentsKey = key "posts" & any1 & word "comments" & many
key :: Text -> Key Route msg Source #
Start a new routing key (can also be used for bindings)
key "messages" -- matches "messages"
word :: Text -> Key a msg -> Key a msg Source #
A specific word. Can be used to chain Routing keys or Binding keys
key "messages" & word "new" -- matches "messages.new"
any1 :: Key a msg -> Key Bind msg Source #
Match any one word. Equivalent to *. Converts to a Binding key and can no longer be used to publish messages
key "messages" & any1 -- matches "messages.new" -- matches "messages.update"
many :: Key a msg -> Key Bind msg Source #
Match zero or more words. Equivalent to #. Converts to a Binding key and can no longer be used to publish messages
key "messages" & many -- matches "messages" -- matches "messages.new" -- matches "messages.1234.update"
Sending Messages
publish :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m () Source #
Send a message to a queue. Enforces that the message type and queue name are correct at the type level
let newUsers = key "users" & word "new" :: Key Route User publish conn newUsers (User "username")
Publishing to a Binding Key results in an error
-- Compiler error! This doesn't make sense let users = key "users" & many :: Key Bind User publish conn users (User "username")
Initializing queues
queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg) Source #
queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg) Source #
Create a queue to receive messages matching the binding key. Each queue with a unique name will be delivered a separate copy of the message. Workers will load balance if operating on the same queue, or on queues with the same name
A queue is an inbox for messages to be delivered
queueName :: QueuePrefix -> Key a msg -> QueueName Source #
Name a queue with a prefix and the binding key name. Useful for seeing at a glance which queues are receiving which messages
-- "main messages.new" queueName "main" (key "messages" & word "new")
type QueuePrefix = Text Source #
Messages
takeMessage :: (MonadIO m, FromJSON a) => Connection -> Queue a -> m (Message a) Source #
Wait until a message is read from the queue. Throws an exception if the incoming message doesn't match the key type
m <- Worker.takeMessage conn queue print (value m)
data ParseError Source #
Instances
Exception ParseError Source # | |
Defined in Network.AMQP.Worker.Message toException :: ParseError -> SomeException # fromException :: SomeException -> Maybe ParseError # displayException :: ParseError -> String # | |
Show ParseError Source # | |
Defined in Network.AMQP.Worker.Message showsPrec :: Int -> ParseError -> ShowS # show :: ParseError -> String # showList :: [ParseError] -> ShowS # |
A parsed message from the queue
Message | |
|
Worker
worker :: (FromJSON a, MonadIO m) => Connection -> Queue a -> (Message a -> m ()) -> m () Source #
Create a worker which loops and handles messages. Throws an exception if the incoming message doesn't match the key type. It is recommended that you catch errors in your handler and allow message parsing errors to crash your program.
Worker.worker conn queue $ \m -> do print (value m)