module Antiope.SQS
  ( QueueUrl(..)
  , SQSError(..)
  , HasReceiptHandle
  , ConsumerMode(..)
  , ConsumerResult(..)
  , readQueue
  , readQueue'
  , drainQueue
  , drainQueue'
  , ackMessage
  , ackMessages
  , queueSource
  , forAllMessages
  , forAllMessages'
  , defaultReceiveMessage

  -- * Re-exports
  , Message
  , mBody, mMD5OfBody, mMessageId, mReceiptHandle, mAttributes
  ) where

import Control.Lens
import Control.Monad            (forM, forM_, join, void)
import Control.Monad.IO.Unlift  (MonadUnliftIO)
import Control.Monad.Loops      (unfoldWhileM)
import Control.Monad.Trans      (lift)
import Data.Coerce              (coerce)
import Data.Conduit
import Data.Conduit.Combinators (yieldMany)
import Data.List.Split          (chunksOf)
import Data.Maybe               (catMaybes)
import Data.Text                (pack)
import Network.AWS              (HasEnv, MonadAWS, runAWS, runResourceT)
import Network.AWS.SQS

import Antiope.SQS.Types

import qualified Network.AWS as AWS

defaultReceiveMessage :: QueueUrl -> ReceiveMessage
defaultReceiveMessage (QueueUrl url)
  = receiveMessage url
  & rmWaitTimeSeconds ?~ 10
  & rmMaxNumberOfMessages ?~ 10

-- | Reads the specified SQS queue once returning a bath of messages
readQueue :: MonadAWS m
  => QueueUrl
  -> m [Message]
readQueue = readQueue' . defaultReceiveMessage

-- | Reads the specified SQS queue once returning a bath of messages
readQueue' :: MonadAWS m
  => ReceiveMessage
  -> m [Message]
readQueue' recMsg = do
  -- max wait time allowed by aws is 20sec, max number messages to recieve is 10
  resp <- AWS.send recMsg
  return $ resp ^. rmrsMessages

-- | Reads the specified SQS queue until it is empty and returns a list of messages
drainQueue :: MonadAWS m
  => QueueUrl
  -> m [Message]
drainQueue = drainQueue' . defaultReceiveMessage

-- | Reads the specified SQS queue until it is empty and returns a list of messages
drainQueue' :: MonadAWS m
  => ReceiveMessage
  -> m [Message]
drainQueue' recMsg = join <$> unfoldWhileM (not . null) (readQueue' recMsg)

-- | Acknowledges a single SQS message
ackMessage :: (MonadAWS m, HasReceiptHandle msg)
  => QueueUrl
  -> msg
  -> m (Either SQSError ())
ackMessage q msg = ackMessages q [msg]

-- | Acknowledges a group of SQS messages
ackMessages :: (MonadAWS m, HasReceiptHandle msg)
  => QueueUrl
  -> [msg]
  -> m (Either SQSError ())
ackMessages (QueueUrl queueUrl) msgs = do
  let receipts' = msgs ^.. each . to getReceiptHandle & catMaybes
  let maxBatchSize = 10 -- Amazon enforces this
  results <- forM (chunksOf maxBatchSize receipts') $ \receipts -> do
    -- each dmbr needs an ID. just use the list index.
    let dmbres = (\(r, i) -> deleteMessageBatchRequestEntry (pack (show i)) r) <$> zip (coerce receipts) ([0..] :: [Int])
    resp <- AWS.send $ deleteMessageBatch queueUrl & dmbEntries .~ dmbres
    -- only acceptable if no errors.
    if resp ^. dmbrsResponseStatus == 200
      then case resp ^. dmbrsFailed of
          [] -> return $ Right ()
          _  -> return $ Left DeleteMessageBatchError
      else return $ Left DeleteMessageBatchError
  pure $ sequence_ results


-- | Reads from an SQS indefinitely, producing messages into a conduit
queueSource :: MonadAWS m => QueueUrl -> ConduitT () Message m ()
queueSource (QueueUrl queueUrl) = do
  m <- lift $ AWS.send $ receiveMessage queueUrl & rmWaitTimeSeconds ?~ 10 & rmMaxNumberOfMessages ?~ 10
  yieldMany (m ^. rmrsMessages)
  queueSource (QueueUrl queueUrl)

forAllMessages :: (MonadUnliftIO m, HasEnv env)
  => env
  -> QueueUrl
  -> ConsumerMode
  -> (Message -> m ConsumerResult)
  -> m ()
forAllMessages env queue = forAllMessages' env (defaultReceiveMessage queue)

forAllMessages' :: (MonadUnliftIO m, HasEnv env)
  => env
  -> ReceiveMessage
  -> ConsumerMode
  -> (Message -> m ConsumerResult)
  -> m ()
forAllMessages' env recMsg mode process = go
  where
    go = do
      msgs <- runResourceT $ runAWS env (readQueue' recMsg)
      case (mode, msgs) of
        (Drain, []) -> pure ()
        _           -> processBatch msgs >> go
    processBatch msgs =
      forM_ msgs $ \msg -> do
        res <- process msg
        case res of
          Ack  -> void . runResourceT . runAWS env $ ackMessage (QueueUrl $ recMsg ^. rmQueueURL) msg
          Nack -> pure ()