module Kafka.Worker.Fetcher (pollingLoop) where
import qualified Control.Concurrent
import qualified Control.Exception.Safe as Exception
import qualified Data.ByteString as ByteString
import qualified Dict
import qualified GHC.Clock
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Worker.Analytics as Analytics
import qualified Kafka.Worker.Partition as Partition
import qualified Kafka.Worker.Settings as Settings
import qualified Prelude
-- | Callback that hands one fetched record to its partition's processing
-- machinery and reports back whether the fetcher should seek that partition.
type EnqueueRecord = ConsumerRecord -> Prelude.IO Partition.SeekCmd
-- | Entry point of the fetcher: reads the monotonic clock once and then runs
-- 'pollingLoop'' forever, treating that reading as the previous poll time.
pollingLoop ::
  Settings.Settings ->
  EnqueueRecord ->
  Analytics.Analytics ->
  Consumer.KafkaConsumer ->
  Prelude.IO ()
pollingLoop settings enqueueRecord analytics consumer =
  nextPollingTimestamp
    |> andThen
      ( \startTime ->
          pollingLoop'
            settings
            enqueueRecord
            analytics
            consumer
            (pollTimeIsOld startTime)
      )
-- | Monotonic-clock reading (seconds) taken at the end of the previous
-- polling iteration.
newtype LastPollingTimestamp
  = LastPollingTimestamp Float
-- | Monotonic-clock reading (seconds) taken at the end of the current
-- polling iteration.
newtype NextPollingTimestamp
  = NextPollingTimestamp Float
-- | Re-tag the timestamp taken during this iteration so it can be passed to
-- the next iteration as its "last poll" time. The value is unchanged.
pollTimeIsOld :: NextPollingTimestamp -> LastPollingTimestamp
pollTimeIsOld next =
  case next of
    NextPollingTimestamp seconds -> LastPollingTimestamp seconds
-- | Read the current monotonic time (seconds, via
-- 'GHC.Clock.getMonotonicTime') as a 'NextPollingTimestamp'.
nextPollingTimestamp :: Prelude.IO NextPollingTimestamp
nextPollingTimestamp = do
  seconds <- GHC.Clock.getMonotonicTime
  Prelude.pure (NextPollingTimestamp seconds)
-- | A consumed Kafka record whose key and value are both optional raw bytes.
type ConsumerRecord =
  Consumer.ConsumerRecord
    (Maybe ByteString.ByteString)
    (Maybe ByteString.ByteString)
-- | One iteration of the fetch loop, followed by a tail call to itself, so
-- it never returns normally. Each iteration:
--
--   1. polls a batch of messages (any Kafka error is rethrown),
--   2. reads the current partition assignment (errors rethrown),
--   3. drops messages for partitions not in that assignment,
--   4. enqueues the remaining messages, collecting one 'Partition.SeekCmd'
--      per enqueued message,
--   5. seeks each partition whose last command was a 'Partition.SeekToOffset',
--   6. sleeps if the configured per-partition rate was exceeded.
pollingLoop' ::
  Settings.Settings ->
  EnqueueRecord ->
  Analytics.Analytics ->
  Consumer.KafkaConsumer ->
  LastPollingTimestamp ->
  Prelude.IO ()
pollingLoop'
  settings@Settings.Settings
    { Settings.pollingTimeout,
      Settings.pollBatchSize,
      Settings.maxMsgsPerSecondPerPartition,
      Settings.maxPollIntervalMs
    }
  enqueueRecord
  analytics
  consumer
  lastPollTimestamp = do
    eitherMsgs <- Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize
    msgs <- Prelude.traverse handleKafkaError eitherMsgs
    assignment <-
      Consumer.assignment consumer
        |> andThen handleKafkaError
    -- Only messages for currently assigned partitions are enqueued.
    let assignedMsgs = List.filter (msgIsForAssignedPartition assignment) msgs
    appendResults <- Prelude.traverse enqueueRecord assignedMsgs
    -- Pair each seek command with the partition of the record that produced
    -- it. Both lists must be derived from `assignedMsgs`: zipping against the
    -- unfiltered `msgs` would misattribute commands to other partitions
    -- whenever a record was filtered out above.
    List.map2 (,) (List.map getPartitionKey assignedMsgs) appendResults
      |> groupDictAndMap identity
      |> Dict.toList
      |> List.filterMap toSeekPartition
      |> seek consumer
    now <- nextPollingTimestamp
    throttle
      maxMsgsPerSecondPerPartition
      maxPollIntervalMs
      (List.length appendResults)
      analytics
      now
      lastPollTimestamp
    pollingLoop' settings enqueueRecord analytics consumer (pollTimeIsOld now)
-- | The (topic, partition) pair a record was consumed from; used as the
-- grouping key for seek commands.
getPartitionKey :: Consumer.ConsumerRecord k v -> (Consumer.TopicName, Consumer.PartitionId)
getPartitionKey record =
  let topic = Consumer.crTopic record
      partition = Consumer.crPartition record
   in (topic, partition)
-- | Decide whether a partition needs a seek, based only on the *last* seek
-- command collected for it in this batch.
--
-- Returns a 'Consumer.TopicPartition' pointing at the requested offset when
-- that last command is 'Partition.SeekToOffset'. Returns 'Nothing' when the
-- last command is 'Partition.NoSeek' or there are no commands.
toSeekPartition ::
  ( (Consumer.TopicName, Consumer.PartitionId),
    List Partition.SeekCmd
  ) ->
  Maybe Consumer.TopicPartition
toSeekPartition ((topicName, partitionId), seekCmds) =
  case last seekCmds of
    Just (Partition.SeekToOffset offset) ->
      Just
        Consumer.TopicPartition
          { Consumer.tpTopicName = topicName,
            Consumer.tpPartition = partitionId,
            Consumer.tpOffset = Consumer.PartitionOffset offset
          }
    Just Partition.NoSeek -> Nothing
    Nothing -> Nothing
-- | The final element of a list, or 'Nothing' for an empty list.
last :: List a -> Maybe a
last list =
  case list of
    [] -> Nothing
    [final] -> Just final
    _ : rest -> last rest
-- | Seek the given partitions to their offsets, using a 5000 ms seek timeout.
--
-- If the first attempt reports an error, that error is discarded. The
-- function then waits 5 seconds and tries once more. An error from the second
-- attempt is thrown with 'Exception.throwIO'.
seek :: Consumer.KafkaConsumer -> List Consumer.TopicPartition -> Prelude.IO ()
seek consumer partitions = do
  firstAttempt <- trySeek
  case firstAttempt of
    Nothing -> Prelude.pure ()
    Just _ -> do
      Control.Concurrent.threadDelay 5_000_000
      secondAttempt <- trySeek
      Prelude.maybe (Prelude.pure ()) Exception.throwIO secondAttempt
  where
    trySeek :: Prelude.IO (Maybe Consumer.KafkaError)
    trySeek = Consumer.seek consumer (Consumer.Timeout 5000) partitions
-- | True when the record's topic appears in the assignment and its partition
-- is among that topic's assigned partitions.
msgIsForAssignedPartition ::
  Dict.Dict Consumer.TopicName [Consumer.PartitionId] ->
  ConsumerRecord ->
  Bool
msgIsForAssignedPartition assignment msg =
  let topic = Consumer.crTopic msg
      partition = Consumer.crPartition msg
   in case Dict.get topic assignment of
        Just partitionIds -> List.member partition partitionIds
        Nothing -> False
-- | Unwrap a Kafka result: return the 'Prelude.Right' payload, or throw the
-- 'Consumer.KafkaError' from a 'Prelude.Left' as an IO exception.
handleKafkaError :: Prelude.Either Consumer.KafkaError a -> Prelude.IO a
handleKafkaError = Prelude.either Exception.throwIO Prelude.pure
-- | Sleep long enough to keep consumption at or below
-- @maxMsgsPerSecondPerPartition * assignedPartitions@ messages per second.
--
-- Arguments: the rate setting ('Settings.DontThrottle' makes this a no-op),
-- the max poll interval, the number of messages enqueued this iteration, the
-- analytics handle (read for the current number of assigned partitions), and
-- the timestamps taken at the end of this and the previous iteration.
--
-- At the allowed rate, @newPolledMessages@ messages should take
-- @newPolledMessages / quotumPerSecond@ seconds. If less time than that has
-- elapsed since the last poll, we sleep for the difference. The sleep is
-- capped at @maxPollIntervalMs - 100@ milliseconds, presumably so the loop
-- polls again within Kafka's max poll interval (TODO confirm intent).
throttle ::
  Settings.MaxMsgsPerSecondPerPartition ->
  Settings.MaxPollIntervalMs ->
  Int ->
  Analytics.Analytics ->
  NextPollingTimestamp ->
  LastPollingTimestamp ->
  Prelude.IO ()
throttle Settings.DontThrottle _ _ _ _ _ = Prelude.pure ()
throttle
  (Settings.ThrottleAt maxMsgsPerSecondPerPartition)
  maxPollIntervalMs
  newPolledMessages
  analytics
  (NextPollingTimestamp now)
  (LastPollingTimestamp lastPollTimestamp) = do
    (_, Analytics.AssignedPartitions numPartitions, _) <- Analytics.read analytics
    let quotumPerSecond :: Int
        quotumPerSecond = maxMsgsPerSecondPerPartition * numPartitions
    if quotumPerSecond <= 0
      then -- No assigned partitions (or a zero rate): there is no quota to
      -- divide by, so never sleep rather than dividing by zero.
        Prelude.pure ()
      else do
        let elapsedSeconds :: Float
            -- Keep the fractional part: flooring to whole seconds would give
            -- a sub-second iteration zero credit and over-throttle.
            elapsedSeconds = now - lastPollTimestamp
            targetSeconds :: Float
            targetSeconds =
              Prelude.fromIntegral newPolledMessages
                / Prelude.fromIntegral quotumPerSecond
            secondsToSleep :: Float
            secondsToSleep = targetSeconds - elapsedSeconds
            -- The setting is in milliseconds while 'threadDelay' takes
            -- microseconds, so the cap must be scaled by 1000.
            maxMicroSecondsToSleep :: Int
            maxMicroSecondsToSleep =
              (Prelude.fromIntegral (Settings.unMaxPollIntervalMs maxPollIntervalMs) - 100)
                * 1000
            microSecondsToSleep :: Int
            microSecondsToSleep =
              Prelude.floor (secondsToSleep * 1e6)
                |> min maxMicroSecondsToSleep
        if microSecondsToSleep > 0
          then Control.Concurrent.threadDelay microSecondsToSleep
          else Prelude.pure ()
-- | Group a list into a dictionary. @f@ maps each element to a
-- @(key, value)@ pair, and each key collects its values. Because the fold
-- runs right-to-left while prepending, every value list keeps the elements'
-- original order.
groupDictAndMap :: Ord b => (a -> (b, c)) -> List a -> Dict.Dict b (List c)
groupDictAndMap f =
  List.foldr insert Dict.empty
  where
    insert x dict =
      let (key, value) = f x
       in Dict.update key (prepend value) dict
    prepend value existing =
      case existing of
        Nothing -> Just [value]
        Just values -> Just (value : values)