amqp-worker: Type-safe AMQP workers

[ bsd3, library, network, program ] [ Propose Tags ]
Versions [RSS] 0.2.0, 0.2.1, 0.2.2, 0.2.3, 0.2.4, 0.2.5, 0.3.2, 0.4.0, 1.0.0, 2.0.0
Change log CHANGELOG.md
Dependencies aeson (>=2.0 && <2.3), amqp (>=0.20 && <1), amqp-worker, base (>=4.9 && <5), bytestring (>=0.11 && <0.12), exceptions (>=0.10 && <0.11), monad-loops (>=0.4 && <0.5), mtl (>=2.2 && <2.3), resource-pool (>=0.3 && <0.5), text (>=1.2 && <1.3) [details]
License BSD-3-Clause
Copyright Orbital Labs
Author Sean Hess
Maintainer seanhess@gmail.com
Category Network
Home page https://github.com/seanhess/amqp-worker#readme
Bug tracker https://github.com/seanhess/amqp-worker/issues
Source repo head: git clone https://github.com/seanhess/amqp-worker
Uploaded by seanhess at 2023-09-05T17:09:46Z
Distributions
Executables example
Downloads 4745 total (18 in the last 30 days)
Rating (no votes yet) [estimated by Bayesian average]
Your Rating
  • λ
  • λ
  • λ
Status Docs available [build log]
Last success reported on 2023-09-05 [all 1 reports]

Readme for amqp-worker-2.0.0

[back to package description]

AMQP Worker

Type-safe and simplified message queues with AMQP. Compatible with RabbitMQ.

View on Hackage

{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Concurrent (forkIO)
import Data.Aeson (FromJSON, ToJSON)
import Data.Function ((&))
import Data.Text (Text)
import GHC.Generics (Generic)
import Network.AMQP.Worker
import qualified Network.AMQP.Worker as Worker

-- | The message payload used throughout these examples: a single piece of text.
newtype Greeting = Greeting {message :: Text}
    deriving (Eq, Show, Generic)

-- No methods are given for either instance, so the class defaults are used
-- (this is why 'Greeting' derives 'Generic').
instance FromJSON Greeting

instance ToJSON Greeting

-- | Routing key @greetings.new@. It contains no wildcards, so it can be used
-- both to publish messages and to bind queues.
newGreetings :: Key Route Greeting
newGreetings = word "new" (key "greetings")

-- | Binding key @greetings.*@. Because it carries a wildcard it can only be
-- used to bind a queue, never to publish.
anyGreetings :: Key Bind Greeting
anyGreetings = greetings & any1
  where
    greetings = key "greetings"

-- | Open a connection to the broker at @localhost:5672@ using the @guest@
-- credentials, then run the 'simple' example on it.
example :: IO ()
example =
    Worker.connect (fromURI "amqp://guest:guest@localhost:5672") >>= simple

-- | Publish a single 'Greeting' under the 'newGreetings' routing key.
publishing :: Connection -> IO ()
publishing conn =
    Worker.publish conn newGreetings (Greeting "Hello")

-- | Round-trip one message: bind a queue, publish to it, and print what arrives.
simple :: Connection -> IO ()
simple conn = do
    -- the queue must exist before publishing so the message has somewhere to go
    inbox <- Worker.queue conn "main" newGreetings

    -- this message is routed to the queue bound above
    Worker.publish conn newGreetings (Greeting "Hello")

    -- block until the message is available, then show its decoded payload
    Worker.takeMessage conn inbox >>= print . value

-- | Queues with different names bound to the same key each receive their own
-- copy of every published message.
multiple :: Connection -> IO ()
multiple conn = do
    -- two independent queues, both bound to the same routing key
    inboxOne <- Worker.queue conn "one" newGreetings
    inboxTwo <- Worker.queue conn "two" newGreetings

    -- a single publish is delivered to both queues
    Worker.publish conn newGreetings (Greeting "Hello")

    -- take one message from each queue (in order), then print both payloads
    received <- mapM (Worker.takeMessage conn) [inboxOne, inboxTwo]
    mapM_ (print . value) received

-- | Create workers to continually process messages.
--
-- Publishes three greetings to a single queue, then forks two workers that
-- consume from that same queue. Each worker prints its own label followed by
-- the payload it received. The calling thread then waits for a line on stdin
-- before returning.
workers :: Connection -> IO ()
workers conn = do
    -- create a single queue
    q <- Worker.queue conn "main" newGreetings

    -- publish some messages
    Worker.publish conn newGreetings $ Greeting "Hello1"
    Worker.publish conn newGreetings $ Greeting "Hello2"
    Worker.publish conn newGreetings $ Greeting "Hello3"

    -- Create a worker to process any messages on the queue
    _ <- forkIO $ Worker.worker conn q (report "one")

    -- Listening to the same queue with N workers will load balance them
    _ <- forkIO $ Worker.worker conn q (report "two")

    -- 'getLine' only returns once a complete line has been entered, so the
    -- prompt asks for Enter rather than "any key".
    putStrLn "Press enter to exit"
    _ <- getLine
    pure ()
  where
    -- Shared handler: print which worker got the message, then its payload.
    report label m = do
        putStrLn label
        print (value m)

-- | Binding keys may contain wildcards, so one queue can receive messages
-- published under many different routing keys.
dynamic :: Connection -> IO ()
dynamic conn = do
    -- bind with the wildcard key; anyGreetings matches `greetings.*`
    wildcardInbox <- Worker.queue conn "main" anyGreetings

    -- publishing requires a concrete Routing Key such as `greetings.new`
    Worker.publish conn newGreetings (Greeting "Hello")

    -- A Binding Key (one with wildcards) is rejected by the type checker here:
    -- Worker.publish conn anyGreetings $ Greeting "Compiler Error"

    -- the wildcard-bound queue still receives the concretely-routed message
    Worker.takeMessage conn wildcardInbox >>= print . value