{-# LANGUAGE RankNTypes #-} module Data.PowerQueue.Backend.SQS ( SqsConfig(..) , newSQSBackend ) where import Data.PowerQueue import Data.Time.TimeSpan import Network.AWS.Simple import qualified Control.Monad.Fail as Fail import qualified Data.Text as T data SqsConfig j = SqsConfig { sc_aws :: !AWSHandle , sc_queueName :: !T.Text , sc_writeJob :: j -> T.Text , sc_readJob :: forall m. Fail.MonadFail m => T.Text -> m j } newSQSBackend :: SqsConfig j -> IO (QueueBackend j) newSQSBackend cfg = do qu <- sqsGetQueue (sc_aws cfg) (sc_queueName cfg) pure $ QueueBackend { qb_lift = id , qb_enqueue = enqueue cfg qu , qb_dequeue = dequeue cfg qu , qb_confirm = confirm cfg qu , qb_rollback = rollback cfg qu , qb_reportProgress = repProgress cfg qu , qb_progressReportInterval = seconds 40 } enqueue :: SqsConfig j -> AWSQueue -> j -> IO Bool enqueue cfg q job = do sqsSendMessage (sc_aws cfg) q $ sc_writeJob cfg job pure True dequeue :: SqsConfig j -> AWSQueue -> IO (MessageHandle, j) dequeue cfg q = do let fetchLoop = do msgs <- sqsGetMessage (sc_aws cfg) q $ GetMessageCfg { gmc_ackTimeout = minutes 1 , gmc_messages = 1 , gmc_waitTime = seconds 10 } case msgs of [] -> fetchLoop (x : _) -> pure x msg <- fetchLoop payload <- sc_readJob cfg (sm_payload msg) pure (sm_handle msg, payload) confirm :: SqsConfig j -> AWSQueue -> MessageHandle -> IO () confirm cfg q handle = sqsAckMessage (sc_aws cfg) q handle repProgress :: SqsConfig j -> AWSQueue -> MessageHandle -> IO () repProgress cfg q handle = sqsChangeMessageTimeout (sc_aws cfg) q handle (minutes 1) rollback :: SqsConfig j -> AWSQueue -> MessageHandle -> IO () rollback _ _ _ = pure ()