{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}

module Network.AMQP.Worker.Message where

import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (FromJSON, ToJSON)
import qualified Data.Aeson as Aeson
import Data.ByteString.Lazy (ByteString)
import Network.AMQP (Ack (..), DeliveryMode (..), newMsg)
import qualified Network.AMQP as AMQP
import Network.AMQP.Worker.Connection (Connection, exchange, withChannel)
import Network.AMQP.Worker.Key (Key, RequireRouting, Routing, keyText)
import Network.AMQP.Worker.Poll (poll)
import Network.AMQP.Worker.Queue (Queue (..))

-- types --------------------------

-- | A message taken from a queue whose body was successfully decoded.
data Message a = Message
    { body :: ByteString
    -- ^ the raw message body, kept alongside the decoded value
    , value :: a
    -- ^ the result of decoding 'body'
    }
    deriving (Show, Eq)

-- | The outcome of decoding a message that was taken from a queue.
--
-- 'Show' and 'Eq' are derived, as they are for 'Message', so results can be
-- logged and compared.
data ConsumeResult a
    = Parsed (Message a)
    -- ^ the body decoded successfully
    | Error ParseError
    -- ^ the body could not be decoded
    deriving (Show, Eq)

-- | A decoding failure: the decoder's error text and the raw body that was
-- rejected.
data ParseError = ParseError String ByteString
    deriving (Show, Eq)

-- | Build an AMQP message from a value. The body is the value's JSON
-- encoding, the content type is @application/json@, the content encoding is
-- @UTF-8@, and the delivery mode is 'Persistent'.
jsonMessage :: ToJSON a => a -> AMQP.Message
jsonMessage a =
    newMsg
        { AMQP.msgBody = Aeson.encode a
        , AMQP.msgContentType = Just "application/json"
        , AMQP.msgContentEncoding = Just "UTF-8"
        , AMQP.msgDeliveryMode = Just Persistent
        }

-- | Publish a message to a routing key on the connection's exchange, without
-- making sure a queue exists to handle it or that it is the right type of
-- message body. The message is built with 'jsonMessage'.
--
-- > publishToExchange conn key (User "username")
publishToExchange :: (RequireRouting a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
publishToExchange conn rk msg =
    liftIO $ withChannel conn $ \chan ->
        -- publishMsg yields a @Maybe Int@; it is deliberately discarded
        () <$ AMQP.publishMsg chan (exchange conn) (keyText rk) (jsonMessage msg)

-- | Send a message to a queue. Enforces that the message type and queue name
-- are correct at the type level. Currently a synonym for 'publishToExchange'.
--
-- > let users = key "users" :: Key Routing User
-- > publish conn users (User "username")
--
-- Publishing to a Binding Key results in an error
--
-- > -- Compiler error! This doesn't make sense
-- > let users = key "users" & many :: Key Binding User
-- > publish conn users (User "username")
publish :: (RequireRouting a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
publish = publishToExchange

-- | Check for a message once and attempt to parse it.
--
-- Returns 'Nothing' when no message was available. Otherwise the message is
-- acknowledged and its body is decoded as JSON: a successful decode yields
-- 'Parsed', a failed one yields 'Error' carrying the decoder's error text and
-- the raw body.
--
-- > res <- consume conn queue
-- > case res of
-- >   Just (Parsed m) -> print m
-- >   Just (Error e) -> putStrLn "could not parse message"
-- >   Nothing -> putStrLn "No messages on the queue"
consume :: (FromJSON msg, MonadIO m) => Connection -> Queue msg -> m (Maybe (ConsumeResult msg))
consume conn (Queue _ name) = do
    mme <- liftIO $ withChannel conn $ \chan -> AMQP.getMsg chan Ack name
    case mme of
        Nothing ->
            return Nothing
        Just (msg, env) -> do
            -- The ack is sent before decoding, so it happens whether or not
            -- the body parses.
            -- NOTE(review): this runs after withChannel has returned; it
            -- presumes the envelope's channel is still usable -- confirm
            -- against withChannel.
            liftIO $ AMQP.ackEnv env
            let bd = AMQP.msgBody msg
                failed err = Error (ParseError err bd)
                parsed = Parsed . Message bd
            return $ Just $ either failed parsed (Aeson.eitherDecode bd)

-- | Block while checking for messages every N microseconds. Return once you
-- find one. The first argument is the interval handed to 'poll'; each check
-- is a single call to 'consume'.
--
-- > res <- consumeNext 1000 conn queue
-- > case res of
-- >   (Parsed m) -> print m
-- >   (Error e) -> putStrLn "could not parse message"
consumeNext :: (FromJSON msg, MonadIO m) => Microseconds -> Connection -> Queue msg -> m (ConsumeResult msg)
consumeNext pd conn queue =
    poll pd $ consume conn queue

-- | Polling interval accepted by 'consumeNext'; a plain alias for 'Int'.
type Microseconds = Int