{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
module Network.AMQP.Worker.Message where
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Base (liftBase)
import Data.Aeson (ToJSON, FromJSON)
import qualified Data.Aeson as Aeson
import Data.ByteString.Lazy (ByteString)
import Network.AMQP (newMsg, DeliveryMode(..), Ack(..))
import qualified Network.AMQP as AMQP
import Network.AMQP.Worker.Key (Key, Routing, keyText)
import Network.AMQP.Worker.Poll (poll)
import Network.AMQP.Worker.Connection (Connection, withChannel, exchange)
import Network.AMQP.Worker.Queue (Queue(..))
-- | A decoded message together with the raw payload it was decoded from.
--
-- 'consume' builds one of these from the AMQP message body and the value
-- obtained by JSON-decoding that body.
data Message a = Message
  { body :: ByteString
    -- ^ The raw message bytes.
  , value :: a
    -- ^ The value carried alongside the raw bytes.
  }
  deriving (Show, Eq)
-- | Outcome of taking a message off a queue and attempting to decode it.
data ConsumeResult a
  = -- | The body decoded successfully; carries the raw body and the value.
    Parsed (Message a)
  | -- | The body could not be decoded; carries the failure details.
    Error ParseError
-- | Details of a failed decode. As constructed by 'consume', the 'String' is
-- the error message returned by 'Aeson.eitherDecode' and the 'ByteString' is
-- the raw message body that failed to decode.
data ParseError = ParseError String ByteString
-- | Plain alias for 'Int'. 'consumeNext' takes a value of this type and hands
-- it to 'poll'.
-- NOTE(review): presumably the polling interval in microseconds, as the name
-- suggests -- confirm against the definition of 'poll'.
type Microseconds = Int
-- | Build an AMQP message whose body is the JSON encoding of the given value.
--
-- Starting from 'newMsg', the message is marked with content type
-- @application/json@, content encoding @UTF-8@ and delivery mode 'Persistent'.
jsonMessage :: ToJSON a => a -> AMQP.Message
jsonMessage payload =
  newMsg
    { AMQP.msgDeliveryMode = Just Persistent
    , AMQP.msgContentEncoding = Just "UTF-8"
    , AMQP.msgContentType = Just "application/json"
    , AMQP.msgBody = Aeson.encode payload
    }
-- | Publish a value to the connection's exchange.
--
-- The value is wrapped with 'jsonMessage' and published on a channel obtained
-- through 'withChannel', using the text of the routing key. Whatever
-- 'AMQP.publishMsg' returns is discarded.
publishToExchange :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m ()
publishToExchange conn routingKey payload =
  liftIO (withChannel conn send)
  where
    -- Publish on the given channel and drop the publish result.
    send chan =
      () <$ AMQP.publishMsg chan (exchange conn) (keyText routingKey) (jsonMessage payload)
-- | Publish a value under the given routing key.
--
-- This simply delegates to 'publishToExchange' with the same arguments.
publish :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m ()
publish conn routingKey payload = publishToExchange conn routingKey payload
-- | Fetch at most one message from the queue and try to decode its body as
-- JSON.
--
-- Returns 'Nothing' when 'AMQP.getMsg' yields no message. Otherwise the
-- message is acknowledged and its body is decoded with 'Aeson.eitherDecode':
--
-- * a decoding failure gives @'Just' ('Error' ('ParseError' err body))@;
-- * a success gives @'Just' ('Parsed' ('Message' body value))@.
--
-- The acknowledgement is sent before decoding, so a message whose body does
-- not decode is still acknowledged; the raw body is handed back in the
-- 'ParseError' instead.
consume :: (FromJSON msg, MonadIO m) => Connection -> Queue msg -> m (Maybe (ConsumeResult msg))
consume conn (Queue _ name) = do
  mbody <- liftIO $ withChannel conn $ \chan -> do
    fetched <- liftBase $ AMQP.getMsg chan Ack name
    case fetched of
      Nothing ->
        pure Nothing
      Just (msg, env) -> do
        -- Acknowledge while the channel is still inside the 'withChannel'
        -- scope, rather than after it has been handed back.
        -- NOTE(review): what 'withChannel' does with the channel on exit is
        -- not visible here; acking inside the callback avoids depending on it.
        liftBase $ AMQP.ackEnv env
        pure $ Just (AMQP.msgBody msg)
  pure $ fmap decodeBody mbody
  where
    -- Turn a raw body into a 'ConsumeResult', keeping the body in both cases.
    decodeBody bd =
      case Aeson.eitherDecode bd of
        Left err -> Error (ParseError err bd)
        Right v -> Parsed (Message bd v)
-- | Run 'consume' on the queue under 'poll', passing the given
-- 'Microseconds' value through to 'poll'.
--
-- NOTE(review): 'consume' returns a 'Maybe' while this returns a bare
-- 'ConsumeResult', so 'poll' presumably retries until a message is available,
-- waiting the given interval between attempts -- confirm against 'poll'.
consumeNext :: (FromJSON msg, MonadIO m) => Microseconds -> Connection -> Queue msg -> m (ConsumeResult msg)
consumeNext interval conn queue =
  poll interval (consume conn queue)