module Aws.Kinesis.Client.Producer.Internal
( ProducerKit(..)
, makeProducerKit
, pkKinesisKit
, pkStreamName
, pkBatchPolicy
, pkRetryPolicy
, pkMessageQueueBounds
, pkMaxConcurrency
, pkCleanupTimeout
, pkQueueImplementation
, QueueImplementation(..)
, defaultQueueImplementation
, BatchPolicy
, defaultBatchPolicy
, bpBatchSize
, RetryPolicy
, defaultRetryPolicy
, rpRetryCount
, Message
, MessageQueueItem(..)
, mqiMessage
, mqiRemainingAttempts
, mqiPartitionKey
, messageQueueItemIsEligible
) where
import Aws.Kinesis
import Aws.Kinesis.Client.Common
import Aws.Kinesis.Client.Internal.Queue
import Control.Concurrent.STM.TBMChan
import Control.Lens
import Data.Proxy
import qualified Data.Text as T
import Numeric.Natural
import Prelude.Unicode
-- | Alias for 'T.Text' (from "Data.Text"); the type stored in the
-- '_mqiMessage' field of a 'MessageQueueItem'.
type Message = T.Text
-- | One entry of the producer's message queue: a 'Message' together with
-- its 'PartitionKey' and a counter of remaining attempts.  All fields are
-- strict.
data MessageQueueItem = MessageQueueItem
  { _mqiMessage ∷ !Message
    -- ^ The message carried by this item.
  , _mqiPartitionKey ∷ !PartitionKey
    -- ^ The partition key associated with the message.
  , _mqiRemainingAttempts ∷ !Natural
    -- ^ Remaining-attempts counter; 'messageQueueItemIsEligible' checks
    -- that it is at least 1.
  } deriving (Eq, Show)
-- | Lens onto the '_mqiMessage' field of a 'MessageQueueItem'.
mqiMessage ∷ Lens' MessageQueueItem Message
mqiMessage = lens _mqiMessage replaceMessage
  where
    -- Record update that swaps in the new message.
    replaceMessage item newMessage = item { _mqiMessage = newMessage }
-- | Lens onto the '_mqiPartitionKey' field of a 'MessageQueueItem'.
mqiPartitionKey ∷ Lens' MessageQueueItem PartitionKey
mqiPartitionKey = lens _mqiPartitionKey replaceKey
  where
    -- Record update that swaps in the new partition key.
    replaceKey item newKey = item { _mqiPartitionKey = newKey }
-- | Lens onto the '_mqiRemainingAttempts' counter of a 'MessageQueueItem'.
mqiRemainingAttempts ∷ Lens' MessageQueueItem Natural
mqiRemainingAttempts = lens _mqiRemainingAttempts replaceAttempts
  where
    -- Record update that swaps in the new counter value.
    replaceAttempts item newCount = item { _mqiRemainingAttempts = newCount }
-- | Test whether a 'MessageQueueItem' has at least one remaining attempt,
-- i.e. whether its '_mqiRemainingAttempts' counter is @≥ 1@.
messageQueueItemIsEligible ∷ MessageQueueItem → Bool
messageQueueItemIsEligible item = _mqiRemainingAttempts item ≥ 1
-- | Batching configuration for the producer.  It currently holds a single
-- strict field, the batch size.
data BatchPolicy = BatchPolicy
  { _bpBatchSize ∷ !Natural
    -- ^ Batch size (presumably the number of records per batch — confirm
    -- against the code that consumes it).
  } deriving (Eq, Show)
-- | Lens onto the '_bpBatchSize' field of a 'BatchPolicy'.
bpBatchSize ∷ Lens' BatchPolicy Natural
bpBatchSize = lens _bpBatchSize replaceSize
  where
    -- Record update that swaps in the new batch size.
    replaceSize policy newSize = policy { _bpBatchSize = newSize }
-- | The default 'BatchPolicy': a batch size of 200.
defaultBatchPolicy ∷ BatchPolicy
defaultBatchPolicy = BatchPolicy { _bpBatchSize = standardBatchSize }
  where
    standardBatchSize ∷ Natural
    standardBatchSize = 200
-- | Retry configuration for the producer.  It currently holds a single
-- strict field, the retry count.
data RetryPolicy = RetryPolicy
  { _rpRetryCount ∷ !Natural
    -- ^ Retry count (presumably how many times a failed message is
    -- retried — confirm against the code that consumes it).
  } deriving (Eq, Show)
-- | Lens onto the '_rpRetryCount' field of a 'RetryPolicy'.
rpRetryCount ∷ Lens' RetryPolicy Natural
rpRetryCount = lens _rpRetryCount replaceCount
  where
    -- Record update that swaps in the new retry count.
    replaceCount policy newCount = policy { _rpRetryCount = newCount }
-- | The default 'RetryPolicy': a retry count of 5.
defaultRetryPolicy ∷ RetryPolicy
defaultRetryPolicy = RetryPolicy { _rpRetryCount = standardRetryCount }
  where
    standardRetryCount ∷ Natural
    standardRetryCount = 5
-- | An existentially quantified choice of queue type.  The constructor
-- wraps a proxy value @proxy q@ for any @q@ that has a
-- 'BoundedCloseableQueue' instance at 'MessageQueueItem'; only the type
-- @q@ matters, and the instance dictionary is captured alongside the proxy.
data QueueImplementation
  = ∀ proxy q. BoundedCloseableQueue q MessageQueueItem
  ⇒ QueueImplementation (proxy q)
-- | The default 'QueueImplementation', selecting 'TBMChan' as the queue
-- type via a 'Proxy'.
defaultQueueImplementation ∷ QueueImplementation
defaultQueueImplementation = QueueImplementation tbmChanProxy
  where
    -- Type-level witness only; the element type is left polymorphic here
    -- and is fixed at the use site by the 'BoundedCloseableQueue' constraint.
    tbmChanProxy ∷ ∀ α. Proxy (TBMChan α)
    tbmChanProxy = Proxy
-- | The full configuration of a producer.  Build one with
-- 'makeProducerKit' and adjust it through the @pk*@ optics.  Every field
-- except '_pkQueueImplementation' is strict.
data ProducerKit = ProducerKit
  { _pkKinesisKit ∷ !KinesisKit
    -- ^ The 'KinesisKit' the producer works with.
  , _pkStreamName ∷ !StreamName
    -- ^ The name of the stream associated with this producer.
  , _pkBatchPolicy ∷ !BatchPolicy
    -- ^ Batching configuration; see 'BatchPolicy'.
  , _pkRetryPolicy ∷ !RetryPolicy
    -- ^ Retry configuration; see 'RetryPolicy'.
  , _pkMessageQueueBounds ∷ !Natural
    -- ^ Bound on the message queue (presumably its maximum length —
    -- confirm against the queue construction code).
  , _pkMaxConcurrency ∷ !Natural
    -- ^ Concurrency limit (presumably the maximum number of concurrent
    -- workers or requests — confirm against the consumer of this field).
  , _pkCleanupTimeout ∷ !(Maybe Natural)
    -- ^ Optional cleanup timeout.  NOTE(review): the time unit and the
    -- meaning of 'Nothing' are not visible in this module — confirm.
  , _pkQueueImplementation ∷ QueueImplementation
    -- ^ The queue implementation to use; the only lazy field.
  }
-- | Build a 'ProducerKit' from the two required pieces of configuration,
-- filling every other field with a default: 'defaultBatchPolicy',
-- 'defaultRetryPolicy', queue bounds of 10000, a maximum concurrency of 3,
-- no cleanup timeout, and 'defaultQueueImplementation'.
makeProducerKit
  ∷ KinesisKit -- ^ stored in '_pkKinesisKit'
  → StreamName -- ^ stored in '_pkStreamName'
  → ProducerKit
makeProducerKit kit streamName =
  ProducerKit
    { _pkKinesisKit = kit
    , _pkStreamName = streamName
    , _pkBatchPolicy = defaultBatchPolicy
    , _pkRetryPolicy = defaultRetryPolicy
    , _pkMessageQueueBounds = initialQueueBounds
    , _pkMaxConcurrency = initialMaxConcurrency
    , _pkCleanupTimeout = Nothing
    , _pkQueueImplementation = defaultQueueImplementation
    }
  where
    -- Numeric defaults, named so the record above reads declaratively.
    initialQueueBounds, initialMaxConcurrency ∷ Natural
    initialQueueBounds = 10000
    initialMaxConcurrency = 3
-- | Lens onto the '_pkKinesisKit' field of a 'ProducerKit'.
pkKinesisKit ∷ Lens' ProducerKit KinesisKit
pkKinesisKit = lens _pkKinesisKit replaceKit
  where
    -- Record update that swaps in the new 'KinesisKit'.
    replaceKit producerKit newKit = producerKit { _pkKinesisKit = newKit }
-- | Lens onto the '_pkStreamName' field of a 'ProducerKit'.
pkStreamName ∷ Lens' ProducerKit StreamName
pkStreamName = lens _pkStreamName replaceName
  where
    -- Record update that swaps in the new stream name.
    replaceName producerKit newName = producerKit { _pkStreamName = newName }
-- | Lens onto the '_pkBatchPolicy' field of a 'ProducerKit'.
pkBatchPolicy ∷ Lens' ProducerKit BatchPolicy
pkBatchPolicy = lens _pkBatchPolicy replacePolicy
  where
    -- Record update that swaps in the new batch policy.
    replacePolicy producerKit newPolicy = producerKit { _pkBatchPolicy = newPolicy }
-- | Lens onto the '_pkRetryPolicy' field of a 'ProducerKit'.
pkRetryPolicy ∷ Lens' ProducerKit RetryPolicy
pkRetryPolicy = lens _pkRetryPolicy replacePolicy
  where
    -- Record update that swaps in the new retry policy.
    replacePolicy producerKit newPolicy = producerKit { _pkRetryPolicy = newPolicy }
-- | Lens onto the '_pkMessageQueueBounds' field of a 'ProducerKit'.
pkMessageQueueBounds ∷ Lens' ProducerKit Natural
pkMessageQueueBounds = lens _pkMessageQueueBounds replaceBounds
  where
    -- Record update that swaps in the new queue bounds.
    replaceBounds producerKit newBounds =
      producerKit { _pkMessageQueueBounds = newBounds }
-- | Lens onto the '_pkMaxConcurrency' field of a 'ProducerKit'.
pkMaxConcurrency ∷ Lens' ProducerKit Natural
pkMaxConcurrency = lens _pkMaxConcurrency replaceLimit
  where
    -- Record update that swaps in the new concurrency limit.
    replaceLimit producerKit newLimit = producerKit { _pkMaxConcurrency = newLimit }
-- | Lens onto the optional '_pkCleanupTimeout' field of a 'ProducerKit'.
pkCleanupTimeout ∷ Lens' ProducerKit (Maybe Natural)
pkCleanupTimeout = lens _pkCleanupTimeout replaceTimeout
  where
    -- Record update that swaps in the new (possibly absent) timeout.
    replaceTimeout producerKit newTimeout =
      producerKit { _pkCleanupTimeout = newTimeout }
-- | Setter for the '_pkQueueImplementation' field of a 'ProducerKit'.
-- The signature exposes this only as a 'Setter'', although it is built
-- with 'lens' like the other @pk*@ optics.
pkQueueImplementation ∷ Setter' ProducerKit QueueImplementation
pkQueueImplementation = lens _pkQueueImplementation replaceImplementation
  where
    -- Record update that swaps in the new queue implementation.
    replaceImplementation producerKit newImplementation =
      producerKit { _pkQueueImplementation = newImplementation }