module Data.PowerQueue.Backend.SQS
( SqsConfig(..)
, newSQSBackend
)
where
import Data.PowerQueue
import Data.Time.TimeSpan
import Network.AWS.Simple
import qualified Control.Monad.Fail as Fail
import qualified Data.Text as T
data SqsConfig j
= SqsConfig
{ sc_aws :: !AWSHandle
, sc_queueName :: !T.Text
, sc_writeJob :: j -> T.Text
, sc_readJob :: forall m. Fail.MonadFail m => T.Text -> m j
}
newSQSBackend :: SqsConfig j -> IO (QueueBackend j)
newSQSBackend cfg =
do qu <- sqsGetQueue (sc_aws cfg) (sc_queueName cfg)
pure $
QueueBackend
{ qb_lift = id
, qb_enqueue = enqueue cfg qu
, qb_dequeue = dequeue cfg qu
, qb_confirm = confirm cfg qu
, qb_rollback = rollback cfg qu
, qb_reportProgress = repProgress cfg qu
, qb_progressReportInterval = seconds 40
}
enqueue :: SqsConfig j -> AWSQueue -> j -> IO Bool
enqueue cfg q job =
do sqsSendMessage (sc_aws cfg) q $ sc_writeJob cfg job
pure True
dequeue :: SqsConfig j -> AWSQueue -> IO (MessageHandle, j)
dequeue cfg q =
do let fetchLoop =
do msgs <-
sqsGetMessage (sc_aws cfg) q $
GetMessageCfg
{ gmc_ackTimeout = minutes 1
, gmc_messages = 1
, gmc_waitTime = seconds 10
}
case msgs of
[] -> fetchLoop
(x : _) -> pure x
msg <- fetchLoop
payload <- sc_readJob cfg (sm_payload msg)
pure (sm_handle msg, payload)
confirm :: SqsConfig j -> AWSQueue -> MessageHandle -> IO ()
confirm cfg q handle =
sqsAckMessage (sc_aws cfg) q handle
repProgress :: SqsConfig j -> AWSQueue -> MessageHandle -> IO ()
repProgress cfg q handle =
sqsChangeMessageTimeout (sc_aws cfg) q handle (minutes 1)
rollback :: SqsConfig j -> AWSQueue -> MessageHandle -> IO ()
rollback _ _ _ = pure ()