-- | Helpers for working with SQS queues: receiving and draining messages,
-- acknowledging (deleting) them, and consuming a queue either as a conduit
-- source or through a per-message callback loop.
module Antiope.SQS
-- Types used by the functions below
( QueueUrl(..)
, SQSError(..)
, HasReceiptHandle
, ConsumerMode(..)
, ConsumerResult(..)
-- Receiving messages
, readQueue
, readQueue'
, drainQueue
, drainQueue'
-- Acknowledging (deleting) messages
, ackMessage
, ackMessages
-- Streaming and callback-driven consumption
, queueSource
, forAllMessages
, forAllMessages'
-- Default receive request used by the unprimed functions
, defaultReceiveMessage
-- Message type and its field lenses (not defined in this module; re-exported)
, Message
, mBody, mMD5OfBody, mMessageId, mReceiptHandle, mAttributes
) where
import Control.Lens
import Control.Monad (forM, forM_, join, void)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Loops (unfoldWhileM)
import Control.Monad.Trans (lift)
import Data.Coerce (coerce)
import Data.Conduit
import Data.Conduit.Combinators (yieldMany)
import Data.List.Split (chunksOf)
import Data.Maybe (catMaybes)
import Data.Text (pack)
import Network.AWS (HasEnv, MonadAWS, runAWS, runResourceT)
import Network.AWS.SQS
import Antiope.SQS.Types
import qualified Network.AWS as AWS
-- | Build the receive request used by default throughout this module:
-- a wait time of 10 seconds and at most 10 messages per call.
defaultReceiveMessage :: QueueUrl -> ReceiveMessage
defaultReceiveMessage (QueueUrl url) =
  set rmMaxNumberOfMessages (Just 10)
    . set rmWaitTimeSeconds (Just 10)
    $ receiveMessage url
-- | Perform a single receive against the queue using
-- 'defaultReceiveMessage' and return whatever messages came back
-- (possibly none).
readQueue
  :: MonadAWS m
  => QueueUrl     -- ^ queue to read from
  -> m [Message]
readQueue queue = readQueue' (defaultReceiveMessage queue)
-- | Send the given receive request once and return the messages contained
-- in the response.
readQueue'
  :: MonadAWS m
  => ReceiveMessage  -- ^ fully configured receive request
  -> m [Message]
readQueue' recMsg = view rmrsMessages <$> AWS.send recMsg
-- | Like 'drainQueue'', but using 'defaultReceiveMessage' for the queue.
drainQueue
  :: MonadAWS m
  => QueueUrl     -- ^ queue to drain
  -> m [Message]
drainQueue queue = drainQueue' (defaultReceiveMessage queue)
-- | Repeat the receive request for as long as it keeps returning messages,
-- and return all received messages as one list. Receiving stops at the
-- first response that contains no messages.
drainQueue'
  :: MonadAWS m
  => ReceiveMessage  -- ^ receive request to repeat
  -> m [Message]
drainQueue' recMsg = do
  batches <- unfoldWhileM hasMessages (readQueue' recMsg)
  pure (concat batches)
  where
    hasMessages batch = not (null batch)
-- | Acknowledge a single message; a one-element convenience wrapper around
-- 'ackMessages'.
ackMessage
  :: (MonadAWS m, HasReceiptHandle msg)
  => QueueUrl                -- ^ queue the message was received from
  -> msg                     -- ^ message to acknowledge
  -> m (Either SQSError ())
ackMessage queue = ackMessages queue . (: [])
-- | Acknowledge messages by deleting them from the queue in batches.
--
-- Messages for which 'getReceiptHandle' yields 'Nothing' are skipped. The
-- remaining receipt handles are split into groups of at most 10 and one
-- delete-batch request is sent per group. Every group is sent even if an
-- earlier one failed.
--
-- Returns @Left DeleteMessageBatchError@ if any request came back with a
-- response status other than 200 or reported failed entries, and
-- @Right ()@ otherwise (including when there was nothing to delete).
ackMessages
  :: (MonadAWS m, HasReceiptHandle msg)
  => QueueUrl                -- ^ queue the messages were received from
  -> [msg]                   -- ^ messages to acknowledge
  -> m (Either SQSError ())
ackMessages (QueueUrl queueUrl) msgs =
  sequence_ <$> forM batches (\batch -> do
    -- Entry ids only need to be unique within one request, so the
    -- position inside the batch is used.
    let entries = zipWith mkEntry [0 :: Int ..] (coerce batch)
    resp <- AWS.send (deleteMessageBatch queueUrl & dmbEntries .~ entries)
    pure (verdict resp))
  where
    batchLimit = 10
    batches = chunksOf batchLimit (catMaybes (map getReceiptHandle msgs))

    mkEntry i = deleteMessageBatchRequestEntry (pack (show i))

    -- A batch counts as successful only on status 200 with no failed entries.
    verdict resp
      | resp ^. dmbrsResponseStatus == 200, null (resp ^. dmbrsFailed) = Right ()
      | otherwise = Left DeleteMessageBatchError
-- | An endless conduit source of messages from the queue.
--
-- Each iteration sends one receive request (built with
-- 'defaultReceiveMessage', so it shares its polling parameters with
-- 'readQueue') and yields every message from the response downstream
-- before polling again. The source never terminates on its own; an empty
-- response simply leads to the next poll.
queueSource :: MonadAWS m => QueueUrl -> ConduitT () Message m ()
queueSource queue = poll
  where
    -- The request does not change between iterations, so build it once.
    request = defaultReceiveMessage queue

    poll = do
      resp <- lift (AWS.send request)
      yieldMany (resp ^. rmrsMessages)
      poll
-- | Like 'forAllMessages'', but receiving with 'defaultReceiveMessage' for
-- the given queue.
forAllMessages
  :: (MonadUnliftIO m, HasEnv env)
  => env                            -- ^ AWS environment used for every call
  -> QueueUrl                       -- ^ queue to consume
  -> ConsumerMode                   -- ^ when to stop (see 'forAllMessages'')
  -> (Message -> m ConsumerResult)  -- ^ per-message handler
  -> m ()
forAllMessages env = forAllMessages' env . defaultReceiveMessage
-- | Receive messages in a loop and run the handler on each one, in order.
--
-- After every receive the batch is processed message by message. When the
-- handler returns 'Ack' the message is acknowledged immediately via
-- 'ackMessage' against the queue URL taken from the receive request; on
-- 'Nack' nothing is done.
--
-- In 'Drain' mode the loop ends as soon as a receive returns no messages.
-- In any other mode it keeps polling indefinitely.
--
-- NOTE: the result of acknowledging is discarded, so a failed
-- acknowledgement is not reported to the caller.
forAllMessages'
  :: (MonadUnliftIO m, HasEnv env)
  => env                            -- ^ AWS environment used for every call
  -> ReceiveMessage                 -- ^ receive request to repeat
  -> ConsumerMode                   -- ^ when to stop
  -> (Message -> m ConsumerResult)  -- ^ per-message handler
  -> m ()
forAllMessages' env recMsg mode process = poll
  where
    queue = QueueUrl (recMsg ^. rmQueueURL)

    poll = do
      batch <- runResourceT (runAWS env (readQueue' recMsg))
      case (mode, batch) of
        (Drain, []) -> return ()
        _           -> mapM_ consume batch >> poll

    consume msg = process msg >>= settle msg

    settle msg Ack  = void (runResourceT (runAWS env (ackMessage queue msg)))
    settle _   Nack = pure ()