module Aws.Kinesis.Client.Consumer
(
KinesisConsumer
, managedKinesisConsumer
, withKinesisConsumer
, consumerSource
, readConsumer
, tryReadConsumer
, consumerStreamState
, ConsumerKit(..)
, makeConsumerKit
, SavedStreamState
, ckKinesisKit
, ckStreamName
, ckBatchSize
, ckIteratorType
, ckSavedStreamState
) where
import Aws.Kinesis
import Aws.Kinesis.Client.Consumer.Internal
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Concurrent.STM
import Control.Exception.Lifted
import Control.Lens
import Control.Lens.Action
import Control.Monad.Codensity
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Monad.Unicode
import qualified Data.Carousel as CR
import qualified Data.Map as M
import Data.Traversable (for)
import Data.Conduit
import Prelude.Unicode
data KinesisConsumer
= KinesisConsumer
{ _kcMessageQueue ∷ !MessageQueue
, _kcStreamState ∷ !(TVar StreamState)
}
kcMessageQueue ∷ Getter KinesisConsumer MessageQueue
kcMessageQueue = to _kcMessageQueue
kcStreamState ∷ Getter KinesisConsumer (TVar StreamState)
kcStreamState = to _kcStreamState
managedKinesisConsumer
∷ ( MonadIO m
, MonadBaseControl IO m
)
⇒ ConsumerKit
→ Codensity m KinesisConsumer
managedKinesisConsumer kit =
Codensity $ withKinesisConsumer kit
withKinesisConsumer
∷ ( MonadIO m
, MonadBaseControl IO m
)
⇒ ConsumerKit
→ (KinesisConsumer → m α)
→ m α
withKinesisConsumer kit inner = do
let batchSize = kit ^. ckBatchSize
messageQueue ← liftIO ∘ newTBQueueIO $ fromIntegral batchSize * 10
state ← liftIO $ updateStreamState kit CR.empty ≫= newTVarIO
let
reshardingLoop =
forever ∘ handle (\(SomeException _) → threadDelay 3000000) $ do
readTVarIO state
≫= updateStreamState kit
≫= atomically ∘ writeTVar state
threadDelay 10000000
producerLoop =
forever ∘ handle (\(SomeException _) → threadDelay 2000000) $ do
recordsCount ← replenishMessages kit messageQueue state
threadDelay $
case recordsCount of
0 → 5000000
_ → 70000
withAsync (liftIO reshardingLoop) $ \reshardingHandle → do
link reshardingHandle
withAsync (liftIO producerLoop) $ \producerHandle → do
link producerHandle
res ← inner $ KinesisConsumer messageQueue state
return res
readConsumer
∷ MonadIO m
⇒ KinesisConsumer
→ m Record
readConsumer consumer =
liftIO ∘ atomically $ do
(ss, rec) ← consumer ^! kcMessageQueue ∘ act readTBQueue
writeTVar (ss ^. ssLastSequenceNumber) ∘ Just $ recordSequenceNumber rec
return rec
tryReadConsumer
∷ MonadIO m
⇒ KinesisConsumer
→ m (Maybe Record)
tryReadConsumer consumer =
liftIO ∘ atomically $ do
mitem ← consumer ^! kcMessageQueue ∘ act tryReadTBQueue
for mitem $ \(ss, rec) → do
writeTVar (ss ^. ssLastSequenceNumber) ∘ Just $ recordSequenceNumber rec
return rec
consumerSource
∷ MonadIO m
⇒ KinesisConsumer
→ Source m Record
consumerSource consumer =
forever $
lift (readConsumer consumer)
≫= yield
consumerStreamState
∷ MonadIO m
⇒ KinesisConsumer
→ m SavedStreamState
consumerStreamState consumer =
liftIO ∘ atomically $ do
shards ← consumer
^! kcStreamState
∘ act readTVar
∘ CR.clList
pairs ← for shards $ \state → state
^! ssLastSequenceNumber
∘ act readTVar
∘ to (state ^. ssShardId,)
return ∘ review _SavedStreamState ∘ M.fromList $
pairs ≫= \(sid, msn) →
msn ^.. _Just ∘ to (sid,)