module Aws.Kinesis.Client.Consumer.Internal
(
MessageQueueItem
, MessageQueue
, StreamState
, updateStreamState
, replenishMessages
, module Aws.Kinesis.Client.Consumer.Internal.Kit
, module Aws.Kinesis.Client.Consumer.Internal.ShardState
, module Aws.Kinesis.Client.Consumer.Internal.SavedStreamState
) where
import Aws.Kinesis
import Aws.Kinesis.Client.Common
import Aws.Kinesis.Client.Consumer.Internal.Kit
import Aws.Kinesis.Client.Consumer.Internal.ShardState
import Aws.Kinesis.Client.Consumer.Internal.SavedStreamState
import Control.Lens
import Control.Lens.Action
import Control.Concurrent.STM
import Control.Concurrent.STM.Queue
import Control.Monad
import Control.Monad.Trans
import qualified Data.Carousel as CR
import Data.Conduit
import qualified Data.Conduit.List as CondL
import Prelude.Unicode
#ifdef DEBUG
import Data.Monoid.Unicode
import System.IO
#else
#endif
-- | A single entry of the 'MessageQueue': a Kinesis 'Record' paired with the
-- 'ShardState' of the shard it was fetched from.
type MessageQueueItem = (ShardState, Record)
-- | A bounded STM queue ('TBQueue') of 'MessageQueueItem's.
type MessageQueue = TBQueue MessageQueueItem
-- | The per-shard states of a stream, held in a 'CR.Carousel'.
type StreamState = CR.Carousel ShardState
-- | Extend a 'StreamState' with the shards that are not yet tracked in it.
--
-- The shards produced by 'streamOpenShardSource' are filtered down to those
-- whose id does not already occur in the given state.  For every remaining
-- shard a shard iterator is requested from Kinesis:
--
-- * if the kit's saved stream state holds a sequence number for the shard,
--   an 'AfterSequenceNumber' iterator starting at that number is requested;
--
-- * otherwise the kit's configured iterator type is used, with no starting
--   sequence number.
--
-- Each new shard gets a fresh 'ShardState' (iterator and last sequence number
-- stored in their own 'TVar's).  The new states are appended to the existing
-- carousel and the result is passed through 'CR.nub'.
updateStreamState
  ∷ ConsumerKit
  → StreamState
  → IO StreamState
updateStreamState ConsumerKit{..} state = do
  freshShards ← untrackedShards $$ CondL.consume
  freshStates ← mapM initShardState freshShards
  return $ CR.nub (CR.append freshStates state)

  where
    -- Ids of the shards that are already part of the state.
    knownShardIds = fmap (view ssShardId) (state ^. CR.clList)

    -- Keep only shards whose id has not been seen before.
    keepUntracked sh
      | shardShardId sh ∈ knownShardIds = Nothing
      | otherwise = Just sh

    untrackedShards =
      mapOutputMaybe keepUntracked $
        streamOpenShardSource _ckKinesisKit _ckStreamName

    -- Request an iterator for one shard and wrap it into a 'ShardState'.
    initShardState Shard{..} = do
      let
        -- Sequence number to resume after, if one was saved for this shard.
        resumeAfter =
          _ckSavedStreamState ^? _Just ∘ _SavedStreamState ∘ ix shardShardId
        iteratorType = case resumeAfter of
          Nothing → _ckIteratorType
          Just _ → AfterSequenceNumber
#ifdef DEBUG
      debugPrint stdout $ "Getting " ⊕ show iteratorType ⊕ " iterator for shard " ⊕ show shardShardId
#endif
      GetShardIteratorResponse it ← runKinesis _ckKinesisKit GetShardIterator
        { getShardIteratorShardId = shardShardId
        , getShardIteratorShardIteratorType = iteratorType
        , getShardIteratorStartingSequenceNumber = resumeAfter
        , getShardIteratorStreamName = _ckStreamName
        }
      liftIO ∘ atomically $
        liftM2
          (makeShardState shardShardId)
          (newTVar $ Just it)
          (newTVar resumeAfter)
-- | Refill the message queue with one batch of records from the shard under
-- the carousel cursor, then advance the cursor.
--
-- The steps are:
--
-- 1. Run 'awaitQueueEmpty' on the queue (presumably blocks until the queue
--    has been drained — confirm against "Control.Concurrent.STM.Queue").
--
-- 2. Atomically pick the shard at the carousel cursor together with its
--    current iterator.  The transaction retries while the carousel has no
--    cursor or the shard's iterator variable holds 'Nothing'.
--
-- 3. Issue a 'GetRecords' request limited to the kit's batch size.
--
-- 4. Atomically store the next iterator for the shard, enqueue every fetched
--    record tagged with its shard, and move the carousel one step right.
--
-- Returns the number of records that were fetched.
--
-- NOTE(review): when Kinesis returns no next iterator, the shard's iterator
-- becomes 'Nothing'; a later call whose cursor lands on that shard will
-- block in step 2 until some other code changes the iterator or the
-- carousel — confirm that callers handle such shards.
--
-- NOTE(review): all records of a batch are written to the bounded queue in
-- a single transaction, so the queue's capacity must be at least the batch
-- size or step 4 can never commit — verify where the queue is created.
replenishMessages
  ∷ ConsumerKit
  → MessageQueue
  → TVar StreamState
  → IO Int
replenishMessages ConsumerKit{..} messageQueue shardsVar = do
  liftIO ∘ atomically ∘ awaitQueueEmpty $ messageQueue
  (shard, iterator) ← liftIO ∘ atomically $ do
    mshard ← shardsVar ^!? act readTVar ∘ CR.cursor
    shard ← maybe retry return mshard
    miterator ← shard ^! ssIterator ∘ act readTVar
    iterator ← maybe retry return miterator
    return (shard, iterator)
  GetRecordsResponse{..} ← runKinesis _ckKinesisKit GetRecords
    { getRecordsLimit = Just $ fromIntegral _ckBatchSize
    , getRecordsShardIterator = iterator
    }
#ifdef DEBUG
  debugPrint stdout $
    "Replenished shard "
      ⊕ show (shard ^. ssShardId)
      ⊕ " with "
      ⊕ show (length getRecordsResRecords)
      ⊕ " records"
#endif
  liftIO ∘ atomically $ do
    writeTVar (shard ^. ssIterator) getRecordsResNextShardIterator
    forM_ getRecordsResRecords $ writeTBQueue messageQueue ∘ (shard ,)
    -- Strict update: evaluate the rotated carousel now instead of leaving a
    -- 'CR.moveRight' thunk inside the shared variable.
    modifyTVar' shardsVar CR.moveRight
  return $ length getRecordsResRecords
#ifdef DEBUG
-- | Write a line to the given handle, prefixed with @[Kinesis Consumer] @.
-- Only compiled when the @DEBUG@ CPP macro is defined.
debugPrint
  ∷ MonadIO m
  ⇒ Handle
  → String
  → m ()
debugPrint h message =
  liftIO $ hPutStrLn h ("[Kinesis Consumer] " ⊕ message)
#endif