module Network.AMQP.Worker.Message where
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Base (liftBase)
import Data.Aeson (ToJSON, FromJSON)
import qualified Data.Aeson as Aeson
import Data.ByteString.Lazy (ByteString)
import Network.AMQP (newMsg, DeliveryMode(..), Ack(..), QueueOpts(..))
import qualified Network.AMQP as AMQP
import Network.AMQP.Worker.Key (RoutingKey(..))
import Network.AMQP.Worker.Poll (poll)
import Network.AMQP.Worker.Connection (Connection, withChannel)
import Network.AMQP.Worker.Queue (Queue(..), Exchange(..), ExchangeName, Direct)
-- | A message taken off a queue: the raw body together with the value that
-- was decoded from it (see 'consume', which builds it from the AMQP message
-- body and the result of JSON decoding).
data Message a = Message
{ body :: ByteString -- ^ The raw message body the value was decoded from.
, value :: a -- ^ The decoded value.
} deriving (Show, Eq)
-- | Outcome of trying to decode a message that was fetched from a queue.
data ConsumeResult a
= Parsed (Message a) -- ^ The body decoded successfully.
| Error ParseError -- ^ The body could not be decoded.
-- | A decoding failure: the decoder's error message, followed by the raw
-- body that failed to decode.
data ParseError = ParseError String ByteString
-- | An 'Int' number of microseconds. 'consumeNext' passes it unchanged to
-- 'poll'; presumably it is the delay between polling attempts
-- (NOTE(review): confirm against "Network.AMQP.Worker.Poll").
type Microseconds = Int
-- | Build an AMQP message whose body is the JSON encoding of the given value.
--
-- The message is marked 'Persistent', with content type @application/json@
-- and content encoding @UTF-8@; every other field keeps its 'newMsg' default.
jsonMessage :: ToJSON a => a -> AMQP.Message
jsonMessage payload =
  newMsg
    { AMQP.msgDeliveryMode = Just Persistent
    , AMQP.msgContentType = Just "application/json"
    , AMQP.msgContentEncoding = Just "UTF-8"
    , AMQP.msgBody = encoded
    }
  where
    encoded = Aeson.encode payload
-- | Publish a value, encoded with 'jsonMessage', to the named exchange under
-- the given routing key.
--
-- A channel is obtained from the connection via 'withChannel' for the
-- duration of the publish. Whatever 'AMQP.publishMsg' returns is discarded.
publishToExchange :: (ToJSON a, MonadBaseControl IO m) => Connection -> ExchangeName -> RoutingKey -> a -> m ()
publishToExchange conn exg (RoutingKey rk) msg =
  withChannel conn $ \chan ->
    liftBase (AMQP.publishMsg chan exg rk outgoing) >> return ()
  where
    outgoing = jsonMessage msg
-- | Publish a message to a direct queue.
--
-- The exchange name and routing key are taken from the queue itself and the
-- actual work is delegated to 'publishToExchange'; the queue's options are
-- not consulted.
publish :: (ToJSON msg, MonadBaseControl IO m) => Connection -> Queue Direct msg -> msg -> m ()
publish conn (Queue (Exchange exchangeOpts) routingKey _) payload =
  publishToExchange conn (AMQP.exchangeName exchangeOpts) routingKey payload
-- | Fetch at most one message from the queue and decode its body as JSON.
--
-- Returns 'Nothing' when 'AMQP.getMsg' yields no message. Otherwise the
-- message is acknowledged and the result is
--
-- * @'Just' ('Parsed' m)@ when the body decodes, where @m@ carries both the
--   raw body and the decoded value, or
-- * @'Just' ('Error' e)@ when it does not, where @e@ carries the decoder's
--   error message and the raw body.
--
-- The message is acknowledged whether or not decoding succeeds, so an
-- undecodable message is not redelivered; the caller receives its raw body
-- inside the 'ParseError' instead.
consume :: (FromJSON msg, MonadBaseControl IO m) => Connection -> Queue key msg -> m (Maybe (ConsumeResult msg))
consume conn (Queue _ _ options) = do
  fetched <- withChannel conn $ \chan -> liftBase $ do
    received <- AMQP.getMsg chan Ack (queueName options)
    case received of
      Nothing ->
        return Nothing
      Just (msg, env) -> do
        -- Acknowledge while the channel is still held by 'withChannel':
        -- the envelope is acked on the channel it was delivered on, so the
        -- ack must not be deferred until after that channel is released.
        AMQP.ackEnv env
        return (Just (AMQP.msgBody msg))
  return (fmap decodeBody fetched)
  where
    -- Pure JSON decoding of a raw body into a 'ConsumeResult'.
    decodeBody bd =
      case Aeson.eitherDecode bd of
        Left err -> Error (ParseError err bd)
        Right v -> Parsed (Message bd v)
-- | Run 'consume' under 'poll' with the given interval and return the
-- resulting 'ConsumeResult'.
--
-- NOTE(review): the types suggest 'poll' keeps retrying the action until it
-- yields a 'Just' — confirm against "Network.AMQP.Worker.Poll".
consumeNext :: (FromJSON msg, MonadBaseControl IO m) => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg)
consumeNext pd conn queue = poll pd attempt
  where
    attempt = consume conn queue