module Aws.Kinesis.Client.Consumer
(
KinesisConsumer
, managedKinesisConsumer
, withKinesisConsumer
, consumerSource
, readConsumer
, tryReadConsumer
, consumerStreamState
, ConsumerKit(..)
, ckKinesisKit
, ckStreamName
, ckBatchSize
, ConsumerError(..)
, MonadConsumer
, SavedStreamState
) where
import qualified Aws.Kinesis as Kin
import Aws.Kinesis.Client.Common
import Control.Applicative
import Control.Applicative.Unicode
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Concurrent.STM
import Control.Concurrent.STM.Queue
import Control.Exception
import Control.Lens
import Control.Lens.Action
import Control.Monad.Codensity
import Control.Monad.Error.Class
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Monad.Unicode
import qualified Data.Aeson as Æ
import qualified Data.Carousel as CR
import qualified Data.Map as M
import qualified Data.HashMap.Strict as HM
import Data.Traversable (for)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Prelude.Unicode
-- | Per-shard polling state held by the consumer. The mutable parts are
-- 'TVar's so they can be read and updated inside STM transactions by the
-- background producer and by the readers.
data ShardState
  = ShardState
  { _ssIterator ∷ !(TVar (Maybe Kin.ShardIterator))
    -- ^ Iterator to pass to the next @GetRecords@ call for this shard.
    -- 'Nothing' means no iterator is available (it is overwritten with the
    -- @NextShardIterator@ of each @GetRecords@ response); such a shard is
    -- not polled.
  , _ssShardId ∷ !Kin.ShardId
    -- ^ Kinesis shard identifier; also the sole identity used by the 'Eq'
    -- instance.
  , _ssLastSequenceNumber ∷ !(TVar (Maybe Kin.SequenceNumber))
    -- ^ Last known sequence number for this shard, updated whenever a
    -- record from it is handed out; 'Nothing' when none is known yet.
  }
-- | Read-only view of the 'TVar' holding the shard's next iterator.
ssIterator ∷ Getter ShardState (TVar (Maybe Kin.ShardIterator))
ssIterator = to $ \ShardState{_ssIterator = iteratorVar} → iteratorVar

-- | The shard's identifier, exposed as a full lens.
ssShardId ∷ Lens' ShardState Kin.ShardId
ssShardId f ss = (\sid → ss { _ssShardId = sid }) <$> f (_ssShardId ss)

-- | Read-only view of the 'TVar' holding the shard's last known sequence
-- number.
ssLastSequenceNumber ∷ Getter ShardState (TVar (Maybe Kin.SequenceNumber))
ssLastSequenceNumber = to $ \ShardState{_ssLastSequenceNumber = sequenceVar} → sequenceVar
-- | Two shard states are the same shard iff their shard ids match; the
-- mutable 'TVar' fields are deliberately ignored.
instance Eq ShardState where
  ShardState{_ssShardId = lhs} == ShardState{_ssShardId = rhs} = lhs ≡ rhs
-- | Errors raised by the consumer machinery.
data ConsumerError
  = NoShards
    -- ^ The stream has no shards. NOTE(review): not constructed anywhere in
    -- the code visible in this module — confirm whether it is still used.
  | KinesisError !SomeException
    -- ^ A failed Kinesis request, wrapping the underlying exception.
  deriving Show
-- | Configuration for a consumer, passed to 'withKinesisConsumer' /
-- 'managedKinesisConsumer'.
data ConsumerKit
  = ConsumerKit
  { _ckKinesisKit ∷ !KinesisKit
    -- ^ Environment for Kinesis requests (handed to 'runKinesis' via
    -- 'mapEnvironment').
  , _ckStreamName ∷ !Kin.StreamName
    -- ^ Stream to consume.
  , _ckBatchSize ∷ !Int
    -- ^ Maximum number of records requested per @GetRecords@ call; the
    -- internal message queue is sized at ten times this value.
  , _ckIteratorType ∷ !Kin.ShardIteratorType
    -- ^ Starting position for shards that have no entry in the saved state.
  , _ckSavedStreamState ∷ !(Maybe SavedStreamState)
    -- ^ Optional checkpoint to resume from, e.g. one previously obtained
    -- from 'consumerStreamState'.
  }
-- | Lens onto the Kinesis request environment.
ckKinesisKit ∷ Lens' ConsumerKit KinesisKit
ckKinesisKit f ck = (\kk → ck { _ckKinesisKit = kk }) <$> f (_ckKinesisKit ck)

-- | Lens onto the name of the stream being consumed.
ckStreamName ∷ Lens' ConsumerKit Kin.StreamName
ckStreamName f ck = (\sn → ck { _ckStreamName = sn }) <$> f (_ckStreamName ck)

-- | Lens onto the per-request record limit.
ckBatchSize ∷ Lens' ConsumerKit Int
ckBatchSize f ck = (\bs → ck { _ckBatchSize = bs }) <$> f (_ckBatchSize ck)

-- | Lens onto the default starting position for shards.
ckIteratorType ∷ Lens' ConsumerKit Kin.ShardIteratorType
ckIteratorType f ck = (\ty → ck { _ckIteratorType = ty }) <$> f (_ckIteratorType ck)

-- | Lens onto the optional checkpoint to resume from.
ckSavedStreamState ∷ Lens' ConsumerKit (Maybe SavedStreamState)
ckSavedStreamState f ck = (\saved → ck { _ckSavedStreamState = saved }) <$> f (_ckSavedStreamState ck)
-- | A fetched record paired with the state of the shard it came from, so
-- the reader can update that shard's last-read sequence number.
type MessageQueueItem = (ShardState, Kin.Record)
-- | All known shards; the producer polls them in turn, moving the carousel
-- cursor one step right after each successful poll.
type StreamState = CR.Carousel ShardState
-- | A serialisable checkpoint: for each shard, a sequence number to resume
-- after. When supplied via 'ckSavedStreamState', shards listed here are
-- opened with an @AFTER_SEQUENCE_NUMBER@ iterator at that position.
newtype SavedStreamState
  = SavedStreamState
  { _savedStreamState ∷ M.Map Kin.ShardId Kin.SequenceNumber
  }
-- | Isomorphism between a checkpoint and its underlying shard-id map.
_SavedStreamState ∷ Iso' SavedStreamState (M.Map Kin.ShardId Kin.SequenceNumber)
_SavedStreamState = iso (\(SavedStreamState sequenceNumbers) → sequenceNumbers) SavedStreamState
-- | Encode a checkpoint as a JSON object whose keys are the shard ids (their
-- JSON string form) and whose values are the encoded sequence numbers.
--
-- NOTE(review): the lazy @Æ.String@ pattern fails at runtime if a shard id
-- ever encodes to something other than a JSON string.
instance Æ.ToJSON SavedStreamState where
  toJSON (SavedStreamState sequenceNumbers) =
    Æ.Object $ HM.fromList
      [ key Æ..= sn
      | (sid, sn) ← M.toList sequenceNumbers
      , let Æ.String key = Æ.toJSON sid
      ]
-- | Decode a checkpoint from a JSON object mapping shard ids to sequence
-- numbers. Each key is parsed as a shard id by reinterpreting it as a JSON
-- string; parsing fails if any key or value fails to parse.
instance Æ.FromJSON SavedStreamState where
  parseJSON = Æ.withObject "SavedStreamState" $ \obj →
    SavedStreamState ∘ M.fromList <$> traverse parseEntry (HM.toList obj)
    where
      parseEntry (key, value) =
        (,) <$> Æ.parseJSON (Æ.String key) <*> Æ.parseJSON value
-- | A running consumer, as handed to the callback of 'withKinesisConsumer'.
data KinesisConsumer
  = KinesisConsumer
  { _kcMessageQueue ∷ !(TBQueue MessageQueueItem)
    -- ^ Bounded buffer filled by the background producer and drained by
    -- 'readConsumer' / 'tryReadConsumer'.
  , _kcStreamState ∷ !(TVar StreamState)
    -- ^ Shared set of known shards, extended by the resharding loop and
    -- read by 'consumerStreamState'.
  }
-- | Read-only view of the consumer's record buffer.
kcMessageQueue ∷ Getter KinesisConsumer (TBQueue MessageQueueItem)
kcMessageQueue = to $ \KinesisConsumer{_kcMessageQueue = queue} → queue

-- | Read-only view of the consumer's shared stream state.
kcStreamState ∷ Getter KinesisConsumer (TVar StreamState)
kcStreamState = to $ \KinesisConsumer{_kcStreamState = stateVar} → stateVar
-- | Capabilities required by the public consumer API: running 'IO',
-- lifting control operations such as 'withAsync' ('MonadBaseControl'),
-- and throwing 'ConsumerError's.
type MonadConsumer m
  = ( MonadIO m
    , MonadBaseControl IO m
    , MonadError ConsumerError m
    )
-- | 'MonadConsumer' plus read access to the 'ConsumerKit' configuration;
-- the constraint used by the internal worker functions.
type MonadConsumerInternal m
  = ( MonadConsumer m
    , MonadReader ConsumerKit m
    )
-- | 'withKinesisConsumer' packaged as a 'Codensity' computation, so the
-- consumer can be acquired alongside other bracketed resources and is torn
-- down when the enclosing 'Codensity' computation finishes.
managedKinesisConsumer
  ∷ MonadConsumer m
  ⇒ ConsumerKit
  → Codensity m KinesisConsumer
managedKinesisConsumer kit = Codensity (withKinesisConsumer kit)
-- | Start a consumer for the configured stream, run @inner@ with it, and
-- tear the background workers down when @inner@ returns or throws.
--
-- The initial shard list is fetched synchronously. Two background threads
-- are then started with 'withAsync' and 'link'ed, so an exception escaping
-- either of them is re-thrown in the calling thread:
--
-- * a resharding loop that periodically lists shards and adds any that are
--   not yet known to the stream state;
--
-- * a producer loop that polls shards via 'replenishMessages' and pushes the
--   fetched records onto a bounded queue of capacity @10 * batchSize@.
--
-- 'ConsumerError's raised inside either loop are discarded (best effort)
-- and the loop retries after a short delay.
withKinesisConsumer
  ∷ MonadConsumer m
  ⇒ ConsumerKit
  → (KinesisConsumer → m α)
  → m α
withKinesisConsumer kit inner =
  flip runReaderT kit $ do
    batchSize ← view ckBatchSize
    messageQueue ← liftIO ∘ newTBQueueIO $ batchSize * 10
    state ← updateStreamState CR.empty ≫= liftIO ∘ newTVarIO

    let mergeNewShards updated = do
          -- Merge into the /current/ state within one transaction instead
          -- of writing back a value derived from the snapshot taken before
          -- the (slow) shard listing: the producer may have modified the
          -- state (e.g. moved the cursor) in the meantime.
          current ← readTVar state
          let known = current ^. CR.clList
              fresh = filter (`notElem` known) (updated ^. CR.clList)
          writeTVar state ∘ CR.nub $ CR.append fresh current

        reshardingLoop = forever $
          handleError (\_ → liftIO $ threadDelay reshardingRetryDelay) $ do
            snapshot ← liftIO $ readTVarIO state
            updated ← updateStreamState snapshot
            liftIO ∘ atomically $ mergeNewShards updated
            liftIO $ threadDelay reshardingInterval

        producerLoop = forever $
          handleError (\_ → liftIO $ threadDelay producerRetryDelay) $ do
            recordsCount ← replenishMessages messageQueue state
            liftIO ∘ threadDelay $
              case recordsCount of
                0 → idlePollDelay
                _ → busyPollDelay

    withAsync reshardingLoop $ \reshardingHandle → do
      link reshardingHandle
      withAsync producerLoop $ \producerHandle → do
        link producerHandle
        lift ∘ inner $ KinesisConsumer messageQueue state
  where
    -- All delays are in microseconds, as expected by 'threadDelay'.
    reshardingInterval, reshardingRetryDelay, producerRetryDelay, idlePollDelay, busyPollDelay ∷ Int
    reshardingInterval = 10000000 -- 10 s between successful shard listings
    reshardingRetryDelay = 3000000 -- 3 s after a failed shard listing
    producerRetryDelay = 2000000 -- 2 s after a failed poll
    idlePollDelay = 5000000 -- 5 s after a poll that returned no records
    busyPollDelay = 70000 -- 70 ms after a poll that returned records
-- | Append to the given stream state every shard produced by
-- 'streamOpenShardSource' whose id is not already present, each with a
-- freshly obtained shard iterator. The result is de-duplicated with
-- 'CR.nub' (shard states compare equal by shard id).
--
-- A new shard that has a sequence number in 'ckSavedStreamState' is opened
-- @AFTER_SEQUENCE_NUMBER@ that checkpoint, and its last-read sequence number
-- is seeded with it. Any other new shard starts at 'ckIteratorType'.
-- Kinesis failures are rethrown as 'KinesisError'.
updateStreamState
  ∷ MonadConsumerInternal m
  ⇒ StreamState
  → m StreamState
updateStreamState state = do
  streamName ← view ckStreamName
  defaultIteratorType ← view ckIteratorType
  savedState ← view ckSavedStreamState
  mapError KinesisError ∘ mapEnvironment ckKinesisKit $ do
    let existingShardIds = state ^. CR.clList <&> view ssShardId
        shardSource = flip mapOutputMaybe (streamOpenShardSource streamName) $ \sh@Kin.Shard{..} →
          if shardShardId `elem` existingShardIds
            then Nothing
            else Just sh
    newShards ← shardSource $$ CL.consume
    shardStates ← forM newShards $ \Kin.Shard{..} → do
      let startingSequenceNumber =
            savedState ^? _Just ∘ _SavedStreamState ∘ ix shardShardId
          iteratorType =
            maybe
              defaultIteratorType
              (const Kin.AfterSequenceNumber)
              startingSequenceNumber
      Kin.GetShardIteratorResponse it ← runKinesis Kin.GetShardIterator
        { Kin.getShardIteratorShardId = shardShardId
        , Kin.getShardIteratorShardIteratorType = iteratorType
        , Kin.getShardIteratorStartingSequenceNumber = startingSequenceNumber
        , Kin.getShardIteratorStreamName = streamName
        }
      liftIO $ do
        iteratorVar ← newTVarIO $ Just it
        -- Seed with the checkpoint we resume from, if any. Starting from
        -- 'Nothing' would make 'consumerStreamState' omit this shard until
        -- a new record has been read from it, so a checkpoint taken in the
        -- meantime would silently lose the shard's saved position.
        sequenceNumberVar ← newTVarIO startingSequenceNumber
        return ShardState
          { _ssIterator = iteratorVar
          , _ssShardId = shardShardId
          , _ssLastSequenceNumber = sequenceNumberVar
          }
    return ∘ CR.nub $ CR.append shardStates state
-- | Perform one poll of the stream and return the number of records
-- fetched.
--
-- The poll proceeds in these steps:
--
-- 1. Wait until the message queue is empty.
-- 2. Pick the next pollable shard: the first shard, starting at the carousel
--    cursor, whose iterator is not 'Nothing'. Move the cursor onto it.
-- 3. Fetch up to 'ckBatchSize' records from that shard.
-- 4. In one transaction, store the shard's next iterator, enqueue the
--    records, and move the cursor one step right.
--
-- Shards without an iterator are skipped rather than waited on. Otherwise a
-- single exhausted shard under the cursor would block polling of every
-- other shard. If no shard is pollable, the selection transaction 'retry's
-- until the stream state or one of the inspected iterators changes.
--
-- NOTE(review): if @GetRecords@ keeps failing for a shard (e.g. an iterator
-- that is no longer valid), the caller's retry will reuse the same iterator
-- indefinitely; consider re-acquiring it from '_ssLastSequenceNumber'.
replenishMessages
  ∷ MonadConsumerInternal m
  ⇒ TBQueue MessageQueueItem
  → TVar StreamState
  → m Int
replenishMessages messageQueue shardsVar = do
  bufferSize ← view ckBatchSize
  liftIO ∘ atomically ∘ awaitQueueEmpty $ messageQueue
  (shard, iterator) ← liftIO ∘ atomically $ do
    carousel ← readTVar shardsVar
    -- Each rotation puts a different shard under the cursor, starting with
    -- the current one, so every known shard is considered exactly once.
    let rotations = take (length $ carousel ^. CR.clList) (iterate CR.moveRight carousel)
    found ← firstPollable rotations
    case found of
      Nothing → retry
      Just (rotated, sh, it) → do
        writeTVar shardsVar rotated
        return (sh, it)
  Kin.GetRecordsResponse{..} ← mapError KinesisError ∘ mapEnvironment ckKinesisKit $ runKinesis Kin.GetRecords
    { getRecordsLimit = Just bufferSize
    , getRecordsShardIterator = iterator
    }
  liftIO ∘ atomically $ do
    writeTVar (shard ^. ssIterator) getRecordsResNextShardIterator
    forM_ getRecordsResRecords $ writeTBQueue messageQueue ∘ (shard ,)
    modifyTVar shardsVar CR.moveRight
  return $ length getRecordsResRecords
  where
    -- First candidate whose cursor shard still has an iterator, together
    -- with that shard and its iterator.
    firstPollable
      ∷ [StreamState]
      → STM (Maybe (StreamState, ShardState, Kin.ShardIterator))
    firstPollable [] = return Nothing
    firstPollable (candidate : rest) =
      case candidate ^? CR.cursor of
        Nothing → firstPollable rest
        Just sh → do
          miterator ← readTVar (sh ^. ssIterator)
          case miterator of
            Just it → return $ Just (candidate, sh, it)
            Nothing → firstPollable rest
-- | Block until a record is available and return it.
--
-- In the same transaction the record's sequence number is stored as the
-- last one read from its shard, so a later 'consumerStreamState' includes
-- it.
readConsumer
  ∷ MonadConsumer m
  ⇒ KinesisConsumer
  → m Kin.Record
readConsumer consumer = liftIO $ atomically $ do
  (shardState, record) ← readTBQueue (consumer ^. kcMessageQueue)
  writeTVar (shardState ^. ssLastSequenceNumber) (Just (Kin.recordSequenceNumber record))
  return record
-- | Non-blocking variant of 'readConsumer': returns 'Nothing' immediately
-- when no record is buffered. A returned record has its sequence number
-- recorded exactly as 'readConsumer' does.
tryReadConsumer
  ∷ MonadConsumer m
  ⇒ KinesisConsumer
  → m (Maybe Kin.Record)
tryReadConsumer consumer = liftIO $ atomically $ do
  next ← tryReadTBQueue (consumer ^. kcMessageQueue)
  case next of
    Nothing → return Nothing
    Just (shardState, record) → do
      writeTVar (shardState ^. ssLastSequenceNumber) (Just (Kin.recordSequenceNumber record))
      return (Just record)
-- | An endless conduit source that yields each record obtained with
-- 'readConsumer', blocking whenever the buffer is empty.
consumerSource
  ∷ MonadConsumer m
  ⇒ KinesisConsumer
  → Source m Kin.Record
consumerSource consumer =
  forever $ do
    record ← lift $ readConsumer consumer
    yield record
-- | Take an atomic snapshot of the last known sequence number of every
-- shard in the stream state, suitable for 'ckSavedStreamState'. Shards
-- whose sequence number is 'Nothing' are left out of the result.
consumerStreamState
  ∷ MonadConsumer m
  ⇒ KinesisConsumer
  → m SavedStreamState
consumerStreamState consumer = liftIO $ atomically $ do
  streamState ← readTVar (consumer ^. kcStreamState)
  entries ← forM (streamState ^. CR.clList) $ \shardState → do
    lastSeen ← readTVar (shardState ^. ssLastSequenceNumber)
    return (shardState ^. ssShardId, lastSeen)
  return ∘ SavedStreamState $ M.fromList [ (sid, sn) | (sid, Just sn) ← entries ]