module Aws.Kinesis.Reshard.Shards
( fetchOpenShards
, countOpenShards
, awaitStreamActive
, ReshardingAction(..)
, performReshardingAction
) where
import Aws.Core
import Aws.Kinesis
import Aws.Kinesis.Reshard.Common
import Aws.Kinesis.Reshard.Monad
import Control.Applicative
import Control.Exception.Lifted
import Control.Lens
import Control.Lens.Action
import Control.Monad
import Control.Monad.Error.Hoist
import Control.Monad.Trans
import Control.Monad.Unicode
import qualified Data.List as L
import Data.Maybe
import Data.Conduit
import qualified Data.Conduit.List as CL
import Prelude.Unicode
-- | The kind of resharding step that 'performReshardingAction' can carry out.
data ReshardingAction
  = SplitShardsAction  -- ^ Split one open shard into two.
  | MergeShardsAction  -- ^ Merge two open shards into one.
  deriving (Eq, Show)
-- | Block until the configured stream reports 'StreamStatusActive'.
--
-- Each iteration issues one @DescribeStream@ request and inspects only the
-- stream status of the returned description; the shard limit is set to 1.
-- Any other status causes an immediate retry.
--
-- NOTE(review): there is no delay between polls, so a stream that stays in a
-- non-active state is re-queried as fast as requests complete.
awaitStreamActive ∷ MonadReshard m ⇒ m ()
awaitStreamActive = do
  streamName ← kinesisStreamName
  DescribeStreamResponse description ← runKinesis DescribeStream
    { describeStreamStreamName = streamName
    , describeStreamLimit = Just 1
    , describeStreamExclusiveStartShardId = Nothing
    }
  case streamDescriptionStreamStatus description of
    StreamStatusActive → return ()
    _ → awaitStreamActive
-- | Turn a stream of pagination cursors into the shards of the configured
-- stream.
--
-- Every incoming value is used as the exclusive start shard id of one
-- @DescribeStream@ request ('Nothing' starts from the beginning). All shards
-- in the response are yielded downstream. If 'nextIteratedRequest' produces a
-- follow-up request that carries a start shard id, that id is pushed back as
-- a leftover so the surrounding 'awaitForever' loop consumes it next.
fetchShardsConduit ∷ MonadReshard m ⇒ Conduit (Maybe ShardId) m Shard
fetchShardsConduit = do
  streamName ← lift kinesisStreamName
  awaitForever $ \startShardId → do
    let request = DescribeStream
          { describeStreamStreamName = streamName
          , describeStreamLimit = Nothing
          , describeStreamExclusiveStartShardId = startShardId
          }
    response@(DescribeStreamResponse description) ← lift (runKinesis request)
    mapM_ yield (streamDescriptionShards description)
    -- Cursor for the next page, if the service reports there is one.
    let nextCursor =
          nextIteratedRequest request response
            >>= describeStreamExclusiveStartShardId
    void $ traverse (leftover ∘ Just) nextCursor
-- | Source of every shard of the configured stream.
--
-- Seeds 'fetchShardsConduit' with a single 'Nothing', i.e. a listing with no
-- exclusive start shard id; later pages are driven by the conduit itself.
shardsSource ∷ MonadReshard m ⇒ Source m Shard
shardsSource = yield Nothing $= fetchShardsConduit
-- | A shard counts as open when the second component of its sequence number
-- range is 'Nothing' (presumably the ending sequence number — confirm against
-- the @aws-kinesis@ 'Shard' definition).
shardIsOpen ∷ Shard → Bool
shardIsOpen shard = isNothing (shardSequenceNumberRange shard ^. _2)
-- | Sink that ignores the values it receives and returns how many there were.
counterSink ∷ (Monad m, Integral i) ⇒ Sink α m i
counterSink = CL.fold step 0
  where
    -- Increment the running total for each element, whatever it is.
    step total _ = total + 1
-- | Fetch all open shards of the configured stream, ordered by ascending
-- second component of their hash key range.
--
-- Waits for the stream to become active before listing.
fetchOpenShards ∷ MonadReshard m ⇒ m [Shard]
fetchOpenShards = do
  awaitStreamActive
  openShards ← (shardsSource $= CL.filter shardIsOpen) $$ CL.consume
  return (L.sortBy byEndingHashKey openShards)
  where
    endOfRange shard = view _2 (shardHashKeyRange shard)
    byEndingHashKey a b = endOfRange a `compare` endOfRange b
-- | Count the open shards of the configured stream.
--
-- Waits for the stream to become active, then streams the shard listing
-- through 'shardIsOpen' into 'counterSink' without materialising a list.
countOpenShards ∷ (MonadReshard m, Integral i) ⇒ m i
countOpenShards =
  awaitStreamActive
    >> ((shardsSource $= CL.filter shardIsOpen) $$ counterSink)
-- | Compute the partition hash at the midpoint of a hash key range.
--
-- Given @(lower, upper)@:
--
-- * fails with 'InvalidPartitionHashRange' unless @upper > lower@;
-- * otherwise computes @lower + (upper - lower) \`div\` 2@ on the integer
--   representations and passes it to 'partitionHash';
-- * if 'partitionHash' rejects that integer, the error is wrapped in
--   'InvalidPartitionHash' together with the offending value.
partitionHashInRange
  ∷ (PartitionHash, PartitionHash)
  → Either InvalidPartitionHashException PartitionHash
partitionHashInRange range@(lower, upper) = do
  unless (upper > lower) $
    Left (InvalidPartitionHashRange range)
  let lowerInteger = partitionHashInteger lower
      upperInteger = partitionHashInteger upper
      -- Midpoint of the range: the offset is halved before being added to
      -- the lower bound.
      middleInteger = lowerInteger + (upperInteger - lowerInteger) `div` 2
  either (Left ∘ InvalidPartitionHash middleInteger) return $
    partitionHash middleInteger
-- | Carry out one resharding step on the configured stream.
--
-- ['SplitShardsAction'] Takes the first shard returned by 'fetchOpenShards'
--   and splits it at the hash key computed by 'partitionHashInRange' over its
--   hash key range. Raises 'NoShardsFoundException' (wrapped in
--   'SomeException') when there are no open shards, and re-raises any
--   partition-hash error wrapped in 'SomeException'.
--
-- ['MergeShardsAction'] Merges the first two shards returned by
--   'fetchOpenShards'. Does nothing when fewer than two open shards exist.
performReshardingAction ∷ MonadReshard m ⇒ ReshardingAction → m ()
performReshardingAction SplitShardsAction = do
  streamName ← kinesisStreamName
  target ← (liftM listToMaybe fetchOpenShards)
    <!?> SomeException (NoShardsFoundException streamName)
  splitPoint ← partitionHashInRange (shardHashKeyRange target) <%?> SomeException
  SplitShardResponse ← runKinesis SplitShard
    { splitShardStreamName = streamName
    , splitShardShardToSplit = shardShardId target
    , splitShardNewStartingHashKey = splitPoint
    }
  return ()
performReshardingAction MergeShardsAction = do
  streamName ← kinesisStreamName
  openShards ← fetchOpenShards
  case openShards of
    (first : second : _) → do
      MergeShardsResponse ← runKinesis MergeShards
        { mergeShardsStreamName = streamName
        , mergeShardsShardToMerge = shardShardId first
        , mergeShardsAdjacentShardToMerge = shardShardId second
        }
      return ()
    -- Fewer than two open shards: nothing to merge.
    _ → return ()