{-# LANGUAGE GADTs #-}
module Kafka.Worker.Internal where
import qualified Conduit
import qualified Control.Concurrent
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TVar as TVar
import qualified Control.Exception.Safe as Exception
import qualified Data.Aeson as Aeson
import qualified Data.UUID
import qualified Data.UUID.V4
import qualified Dict
import qualified GHC.Clock
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Internal as Kafka
import qualified Kafka.Metadata
import qualified Kafka.Worker.Analytics as Analytics
import qualified Kafka.Worker.Fetcher as Fetcher
import qualified Kafka.Worker.Partition as Partition
import qualified Kafka.Worker.Settings as Settings
import qualified Kafka.Worker.Stopping as Stopping
import qualified Observability
import qualified Set
import qualified System.Environment
import qualified System.Exit
import qualified System.Posix.Process
import qualified System.Posix.Signals as Signals
import qualified Prelude
type PartitionKey = (Consumer.TopicName, Consumer.PartitionId)
type AllPartitions = TVar.TVar (Dict.Dict PartitionKey Partition.Partition)
data Rebalance = Assign | Revoking | Revoked deriving (Int -> Rebalance -> ShowS
[Rebalance] -> ShowS
Rebalance -> String
(Int -> Rebalance -> ShowS)
-> (Rebalance -> String)
-> ([Rebalance] -> ShowS)
-> Show Rebalance
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Rebalance] -> ShowS
$cshowList :: [Rebalance] -> ShowS
show :: Rebalance -> String
$cshow :: Rebalance -> String
showsPrec :: Int -> Rebalance -> ShowS
$cshowsPrec :: Int -> Rebalance -> ShowS
Show)
type RebalanceInfo = TVar.TVar (Dict.Dict PartitionKey (Rebalance, Float))
data State = State
{ State -> AllPartitions
partitions :: AllPartitions,
State -> Stopping
stopping :: Stopping.Stopping,
State -> Analytics
analytics :: Analytics.Analytics,
State -> RebalanceInfo
rebalanceInfo :: RebalanceInfo
}
data TopicSubscription = TopicSubscription
{ TopicSubscription -> Topic
topic :: Kafka.Topic,
TopicSubscription -> MessageCallback
onMessage :: Partition.MessageCallback,
TopicSubscription -> OffsetSource
offsetSource :: OffsetSource
}
data PartitionOffset = PartitionOffset
{
PartitionOffset -> Int
partitionId :: Int,
PartitionOffset -> Int
offset :: Int
}
subscription ::
(Aeson.FromJSON msg, Aeson.ToJSON msg) =>
Text ->
(msg -> Task Text ()) ->
TopicSubscription
subscription :: Text -> (msg -> Task Text ()) -> TopicSubscription
subscription Text
topic msg -> Task Text ()
callback =
TopicSubscription :: Topic -> MessageCallback -> OffsetSource -> TopicSubscription
TopicSubscription
{ topic :: Topic
topic = Text -> Topic
Kafka.Topic Text
topic,
onMessage :: MessageCallback
onMessage =
(ConsumerRecord () () -> msg -> Task Text SeekCmd)
-> MessageCallback
forall e msg.
(Show e, ToJSON msg, FromJSON msg) =>
(ConsumerRecord () () -> msg -> Task e SeekCmd) -> MessageCallback
Partition.MessageCallback
( \ConsumerRecord () ()
_ msg
msg -> do
msg -> Task Text ()
callback msg
msg
SeekCmd -> Task Text SeekCmd
forall a x. a -> Task x a
Task.succeed SeekCmd
Partition.NoSeek
),
offsetSource :: OffsetSource
offsetSource = OffsetSource
InKafka
}
subscriptionManageOwnOffsets ::
(Aeson.FromJSON msg, Aeson.ToJSON msg) =>
Text ->
([Int] -> Task Text (List PartitionOffset)) ->
(PartitionOffset -> msg -> Task Text Partition.SeekCmd) ->
TopicSubscription
subscriptionManageOwnOffsets :: Text
-> ([Int] -> Task Text (List PartitionOffset))
-> (PartitionOffset -> msg -> Task Text SeekCmd)
-> TopicSubscription
subscriptionManageOwnOffsets Text
topic [Int] -> Task Text (List PartitionOffset)
fetchOffsets PartitionOffset -> msg -> Task Text SeekCmd
callback =
TopicSubscription :: Topic -> MessageCallback -> OffsetSource -> TopicSubscription
TopicSubscription
{ topic :: Topic
topic = Text -> Topic
Kafka.Topic Text
topic,
onMessage :: MessageCallback
onMessage =
(ConsumerRecord () () -> msg -> Task Text SeekCmd)
-> MessageCallback
forall e msg.
(Show e, ToJSON msg, FromJSON msg) =>
(ConsumerRecord () () -> msg -> Task e SeekCmd) -> MessageCallback
Partition.MessageCallback
( \ConsumerRecord () ()
record msg
msg -> do
let offsetParams :: PartitionOffset
offsetParams =
PartitionOffset :: Int -> Int -> PartitionOffset
PartitionOffset
{ partitionId :: Int
partitionId =
ConsumerRecord () () -> PartitionId
forall k v. ConsumerRecord k v -> PartitionId
Consumer.crPartition ConsumerRecord () ()
record
PartitionId -> (PartitionId -> Int) -> Int
forall a b. a -> (a -> b) -> b
|> PartitionId -> Int
partitionIdToInt,
offset :: Int
offset = Offset -> Int
Consumer.unOffset (ConsumerRecord () () -> Offset
forall k v. ConsumerRecord k v -> Offset
Consumer.crOffset ConsumerRecord () ()
record)
}
PartitionOffset -> msg -> Task Text SeekCmd
callback PartitionOffset
offsetParams msg
msg
),
offsetSource :: OffsetSource
offsetSource =
([PartitionKey] -> Task Text [(PartitionKey, Int)]) -> OffsetSource
Elsewhere
( \[PartitionKey]
partitionKeys -> do
let partitionIds :: [Int]
partitionIds =
[PartitionKey]
partitionKeys
[PartitionKey] -> ([PartitionKey] -> [Int]) -> [Int]
forall a b. a -> (a -> b) -> b
|> (PartitionKey -> Int) -> [PartitionKey] -> [Int]
forall a b. (a -> b) -> List a -> List b
List.map (PartitionId -> Int
partitionIdToInt (PartitionId -> Int)
-> (PartitionKey -> PartitionId) -> PartitionKey -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
<< PartitionKey -> PartitionId
forall a b. (a, b) -> b
Tuple.second)
List PartitionOffset
offsets <- [Int] -> Task Text (List PartitionOffset)
fetchOffsets [Int]
partitionIds
List PartitionOffset
offsets
List PartitionOffset
-> (List PartitionOffset -> [(PartitionKey, Int)])
-> [(PartitionKey, Int)]
forall a b. a -> (a -> b) -> b
|> (PartitionOffset -> (PartitionKey, Int))
-> List PartitionOffset -> [(PartitionKey, Int)]
forall a b. (a -> b) -> List a -> List b
List.map PartitionOffset -> (PartitionKey, Int)
toPartitionKey
[(PartitionKey, Int)]
-> ([(PartitionKey, Int)] -> Task Text [(PartitionKey, Int)])
-> Task Text [(PartitionKey, Int)]
forall a b. a -> (a -> b) -> b
|> [(PartitionKey, Int)] -> Task Text [(PartitionKey, Int)]
forall a x. a -> Task x a
Task.succeed
)
}
where
toPartitionKey :: PartitionOffset -> (PartitionKey, Int)
toPartitionKey :: PartitionOffset -> (PartitionKey, Int)
toPartitionKey (PartitionOffset {Int
partitionId :: Int
partitionId :: PartitionOffset -> Int
partitionId, Int
offset :: Int
offset :: PartitionOffset -> Int
offset}) =
( ( Text -> TopicName
Consumer.TopicName Text
topic,
Int -> PartitionId
Consumer.PartitionId (Int -> Int
forall a b. (Integral a, Num b) => a -> b
Prelude.fromIntegral Int
partitionId)
),
Int
offset
)
partitionIdToInt :: Consumer.PartitionId -> Int
partitionIdToInt :: PartitionId -> Int
partitionIdToInt (Consumer.PartitionId Int
int) = Int -> Int
forall a b. (Integral a, Num b) => a -> b
Prelude.fromIntegral Int
int
data OffsetSource where
InKafka :: OffsetSource
Elsewhere ::
([PartitionKey] -> Task Text [(PartitionKey, Int)]) ->
OffsetSource
process :: Settings.Settings -> Text -> TopicSubscription -> Prelude.IO ()
process :: Settings -> Text -> TopicSubscription -> IO ()
process Settings
settings Text
groupIdText TopicSubscription
topicSubscriptions = do
Settings -> ConsumerGroupId -> TopicSubscription -> IO ()
processWithoutShutdownEnsurance Settings
settings (Text -> ConsumerGroupId
Consumer.ConsumerGroupId Text
groupIdText) TopicSubscription
topicSubscriptions
Async ()
_ <-
IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
Async.async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
<| do
Int -> IO ()
Control.Concurrent.threadDelay Int
5_000_000
String -> IO ()
Prelude.putStrLn String
"Something is holding up shutdown. Going to die ungracefully now."
ExitCode -> IO ()
System.Posix.Process.exitImmediately (Int -> ExitCode
System.Exit.ExitFailure Int
1)
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
processWithoutShutdownEnsurance :: Settings.Settings -> Consumer.ConsumerGroupId -> TopicSubscription -> Prelude.IO ()
processWithoutShutdownEnsurance :: Settings -> ConsumerGroupId -> TopicSubscription -> IO ()
processWithoutShutdownEnsurance Settings
settings ConsumerGroupId
groupId TopicSubscription
topicSubscriptions = do
let TopicSubscription {MessageCallback
onMessage :: MessageCallback
onMessage :: TopicSubscription -> MessageCallback
onMessage, Topic
topic :: Topic
topic :: TopicSubscription -> Topic
topic, OffsetSource
offsetSource :: OffsetSource
offsetSource :: TopicSubscription -> OffsetSource
offsetSource} = TopicSubscription
topicSubscriptions
State
state <- IO State
initState
IO () -> IO ()
onQuitSignal (Stopping -> Text -> IO ()
Stopping.stopTakingRequests (State -> Stopping
stopping State
state) Text
"Received stop signal")
Acquire Handler -> (Handler -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Acquire a -> (a -> m b) -> m b
Conduit.withAcquire (Settings -> Acquire Handler
Observability.handler (Settings -> Settings
Settings.observability Settings
settings)) ((Handler -> IO ()) -> IO ()) -> (Handler -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
<| \Handler
observabilityHandler -> do
IO KafkaConsumer
-> (Maybe SomeException -> KafkaConsumer -> IO ())
-> (KafkaConsumer -> IO ())
-> IO ()
forall (m :: * -> *) a b c.
MonadMask m =>
m a -> (Maybe SomeException -> a -> m b) -> (a -> m c) -> m c
Exception.bracketWithError
(Settings
-> ConsumerGroupId
-> Handler
-> OffsetSource
-> MessageCallback
-> Topic
-> State
-> IO KafkaConsumer
createConsumer Settings
settings ConsumerGroupId
groupId Handler
observabilityHandler OffsetSource
offsetSource MessageCallback
onMessage Topic
topic State
state)
(Handler
-> RebalanceInfo
-> Stopping
-> Maybe SomeException
-> KafkaConsumer
-> IO ()
cleanUp Handler
observabilityHandler (State -> RebalanceInfo
rebalanceInfo State
state) (State -> Stopping
stopping State
state))
(Settings -> State -> KafkaConsumer -> IO ()
runThreads Settings
settings State
state)
initState :: Prelude.IO State
initState :: IO State
initState = do
Stopping
stopping <- IO Stopping
Stopping.init
AllPartitions
partitions <- Dict PartitionKey Partition -> IO AllPartitions
forall a. a -> IO (TVar a)
TVar.newTVarIO Dict PartitionKey Partition
forall k v. Dict k v
Dict.empty
Analytics
analytics <- IO Int -> IO Analytics
Analytics.init ((Dict PartitionKey Partition -> Int)
-> IO (Dict PartitionKey Partition) -> IO Int
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map Dict PartitionKey Partition -> Int
forall k v. Dict k v -> Int
Dict.size (AllPartitions -> IO (Dict PartitionKey Partition)
forall a. TVar a -> IO a
TVar.readTVarIO AllPartitions
partitions))
RebalanceInfo
rebalanceInfo <- Dict PartitionKey (Rebalance, Float) -> IO RebalanceInfo
forall a. a -> IO (TVar a)
TVar.newTVarIO Dict PartitionKey (Rebalance, Float)
forall k v. Dict k v
Dict.empty
State -> IO State
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure
State :: AllPartitions -> Stopping -> Analytics -> RebalanceInfo -> State
State
{ AllPartitions
partitions :: AllPartitions
partitions :: AllPartitions
partitions,
Stopping
stopping :: Stopping
stopping :: Stopping
stopping,
Analytics
analytics :: Analytics
analytics :: Analytics
analytics,
RebalanceInfo
rebalanceInfo :: RebalanceInfo
rebalanceInfo :: RebalanceInfo
rebalanceInfo
}
createConsumer ::
Settings.Settings ->
Consumer.ConsumerGroupId ->
Observability.Handler ->
OffsetSource ->
Partition.MessageCallback ->
Kafka.Topic ->
State ->
Prelude.IO Consumer.KafkaConsumer
createConsumer :: Settings
-> ConsumerGroupId
-> Handler
-> OffsetSource
-> MessageCallback
-> Topic
-> State
-> IO KafkaConsumer
createConsumer
Settings.Settings
{ [BrokerAddress]
brokerAddresses :: Settings -> [BrokerAddress]
brokerAddresses :: [BrokerAddress]
Settings.brokerAddresses,
KafkaLogLevel
logLevel :: Settings -> KafkaLogLevel
logLevel :: KafkaLogLevel
Settings.logLevel,
MaxPollIntervalMs
maxPollIntervalMs :: Settings -> MaxPollIntervalMs
maxPollIntervalMs :: MaxPollIntervalMs
Settings.maxPollIntervalMs,
SkipOrNot
onProcessMessageSkip :: Settings -> SkipOrNot
onProcessMessageSkip :: SkipOrNot
Settings.onProcessMessageSkip
}
ConsumerGroupId
groupId
Handler
observability
OffsetSource
offsetSource
MessageCallback
callback
Topic
topic
State
state = do
let rebalance :: KafkaConsumer -> RebalanceEvent -> IO ()
rebalance =
SkipOrNot
-> Handler
-> MessageCallback
-> OffsetSource
-> State
-> KafkaConsumer
-> RebalanceEvent
-> IO ()
rebalanceCallback
SkipOrNot
onProcessMessageSkip
Handler
observability
MessageCallback
callback
OffsetSource
offsetSource
State
state
let properties :: ConsumerProperties
properties =
[BrokerAddress] -> ConsumerProperties
Consumer.brokersList
[BrokerAddress]
brokerAddresses
ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ ConsumerGroupId -> ConsumerProperties
Consumer.groupId ConsumerGroupId
groupId
ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ ConsumerProperties
Consumer.noAutoCommit
ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ KafkaLogLevel -> ConsumerProperties
Consumer.logLevel KafkaLogLevel
logLevel
ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ Callback -> ConsumerProperties
Consumer.setCallback ((KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
Consumer.rebalanceCallback KafkaConsumer -> RebalanceEvent -> IO ()
rebalance)
ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ KafkaCompressionCodec -> ConsumerProperties
Consumer.compression KafkaCompressionCodec
Consumer.Snappy
ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ Map Text Text -> ConsumerProperties
Consumer.extraProps
( List (Text, Text) -> Map Text Text
forall comparable v.
Ord comparable =>
List (comparable, v) -> Dict comparable v
Dict.fromList
[(Text
"max.poll.interval.ms", Int -> Text
Text.fromInt (MaxPollIntervalMs -> Int
Settings.unMaxPollIntervalMs MaxPollIntervalMs
maxPollIntervalMs))]
)
let subscription' :: Subscription
subscription' =
[TopicName] -> Subscription
Consumer.topics [Text -> TopicName
Consumer.TopicName (Topic -> Text
Kafka.unTopic Topic
topic)]
Subscription -> Subscription -> Subscription
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ OffsetReset -> Subscription
Consumer.offsetReset OffsetReset
Consumer.Earliest
Either KafkaError KafkaConsumer
eitherConsumer <- ConsumerProperties
-> Subscription -> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *).
MonadIO m =>
ConsumerProperties
-> Subscription -> m (Either KafkaError KafkaConsumer)
Consumer.newConsumer ConsumerProperties
properties Subscription
subscription'
case Either KafkaError KafkaConsumer
eitherConsumer of
Prelude.Left KafkaError
err ->
KafkaError -> IO KafkaConsumer
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
Exception.throwIO KafkaError
err
Prelude.Right KafkaConsumer
consumer ->
KafkaConsumer -> IO KafkaConsumer
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure KafkaConsumer
consumer
rebalanceCallback ::
Settings.SkipOrNot ->
Observability.Handler ->
Partition.MessageCallback ->
OffsetSource ->
State ->
Consumer.KafkaConsumer ->
Consumer.RebalanceEvent ->
Prelude.IO ()
rebalanceCallback :: SkipOrNot
-> Handler
-> MessageCallback
-> OffsetSource
-> State
-> KafkaConsumer
-> RebalanceEvent
-> IO ()
rebalanceCallback SkipOrNot
skipOrNot Handler
observability MessageCallback
callback OffsetSource
offsetSource State
state KafkaConsumer
consumer RebalanceEvent
rebalanceEvent = do
Float
now <- IO Float
GHC.Clock.getMonotonicTime
Float -> Analytics -> IO ()
Analytics.updateTimeOfLastRebalance Float
now (State -> Analytics
analytics State
state)
case RebalanceEvent
rebalanceEvent of
Consumer.RebalanceBeforeAssign [PartitionKey]
newPartitions -> do
List (PartitionKey, CommitOffsets)
keysWithOffsets <-
case OffsetSource
offsetSource of
OffsetSource
InKafka ->
List (PartitionKey, CommitOffsets)
-> IO (List (PartitionKey, CommitOffsets))
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure (List (PartitionKey, CommitOffsets)
-> IO (List (PartitionKey, CommitOffsets)))
-> List (PartitionKey, CommitOffsets)
-> IO (List (PartitionKey, CommitOffsets))
forall a b. (a -> b) -> a -> b
<| (PartitionKey -> (PartitionKey, CommitOffsets))
-> [PartitionKey] -> List (PartitionKey, CommitOffsets)
forall a b. (a -> b) -> List a -> List b
List.map (\PartitionKey
partitionKey -> (PartitionKey
partitionKey, CommitOffsets
Partition.ToKafka)) [PartitionKey]
newPartitions
Elsewhere [PartitionKey] -> Task Text [(PartitionKey, Int)]
fetch -> do
LogHandler
log <- IO LogHandler
Platform.silentHandler
Result Text [(PartitionKey, Int)]
fetchResult <- LogHandler
-> Task Text [(PartitionKey, Int)]
-> IO (Result Text [(PartitionKey, Int)])
forall x a. LogHandler -> Task x a -> IO (Result x a)
Task.attempt LogHandler
log ([PartitionKey] -> Task Text [(PartitionKey, Int)]
fetch [PartitionKey]
newPartitions)
case Result Text [(PartitionKey, Int)]
fetchResult of
Err Text
err -> String -> IO (List (PartitionKey, CommitOffsets))
forall (m :: * -> *) a.
(MonadThrow m, HasCallStack) =>
String -> m a
Exception.throwString (Text -> String
Text.toList Text
err)
Ok [(PartitionKey, Int)]
fetched -> do
let storedOffsets :: Dict PartitionKey CommitOffsets
storedOffsets =
((PartitionKey, Int) -> (PartitionKey, CommitOffsets))
-> [(PartitionKey, Int)] -> List (PartitionKey, CommitOffsets)
forall a b. (a -> b) -> List a -> List b
List.map ((Int -> CommitOffsets)
-> (PartitionKey, Int) -> (PartitionKey, CommitOffsets)
forall b y a. (b -> y) -> (a, b) -> (a, y)
Tuple.mapSecond Int -> CommitOffsets
Partition.Elsewhere) [(PartitionKey, Int)]
fetched
List (PartitionKey, CommitOffsets)
-> (List (PartitionKey, CommitOffsets)
-> Dict PartitionKey CommitOffsets)
-> Dict PartitionKey CommitOffsets
forall a b. a -> (a -> b) -> b
|> List (PartitionKey, CommitOffsets)
-> Dict PartitionKey CommitOffsets
forall comparable v.
Ord comparable =>
List (comparable, v) -> Dict comparable v
Dict.fromList
let storedKeys :: Set PartitionKey
storedKeys =
[(PartitionKey, Int)]
fetched
[(PartitionKey, Int)]
-> ([(PartitionKey, Int)] -> [PartitionKey]) -> [PartitionKey]
forall a b. a -> (a -> b) -> b
|> ((PartitionKey, Int) -> PartitionKey)
-> [(PartitionKey, Int)] -> [PartitionKey]
forall a b. (a -> b) -> List a -> List b
List.map (PartitionKey, Int) -> PartitionKey
forall a b. (a, b) -> a
Tuple.first
[PartitionKey]
-> ([PartitionKey] -> Set PartitionKey) -> Set PartitionKey
forall a b. a -> (a -> b) -> b
|> [PartitionKey] -> Set PartitionKey
forall comparable.
Ord comparable =>
List comparable -> Set comparable
Set.fromList
let missingPartitions :: Set PartitionKey
missingPartitions = Set PartitionKey -> Set PartitionKey -> Set PartitionKey
forall comparable.
Ord comparable =>
Set comparable -> Set comparable -> Set comparable
Set.diff ([PartitionKey] -> Set PartitionKey
forall comparable.
Ord comparable =>
List comparable -> Set comparable
Set.fromList [PartitionKey]
newPartitions) Set PartitionKey
storedKeys
Either KafkaError (List WatermarkOffsets)
waterMarkInfo :: Prelude.Either Consumer.KafkaError (List Kafka.Metadata.WatermarkOffsets) <-
Set PartitionKey
missingPartitions
Set PartitionKey
-> (Set PartitionKey -> [PartitionKey]) -> [PartitionKey]
forall a b. a -> (a -> b) -> b
|> Set PartitionKey -> [PartitionKey]
forall a. Set a -> List a
Set.toList
[PartitionKey]
-> ([PartitionKey] -> IO [Either KafkaError WatermarkOffsets])
-> IO [Either KafkaError WatermarkOffsets]
forall a b. a -> (a -> b) -> b
|> (PartitionKey -> IO (Either KafkaError WatermarkOffsets))
-> [PartitionKey] -> IO [Either KafkaError WatermarkOffsets]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse
( \(TopicName
topicName, PartitionId
partitionId) ->
KafkaConsumer
-> Timeout
-> TopicName
-> PartitionId
-> IO (Either KafkaError WatermarkOffsets)
forall (m :: * -> *) k.
(MonadIO m, HasKafka k) =>
k
-> Timeout
-> TopicName
-> PartitionId
-> m (Either KafkaError WatermarkOffsets)
Kafka.Metadata.partitionWatermarkOffsets
KafkaConsumer
consumer
(Int -> Timeout
Consumer.Timeout Int
5000 )
TopicName
topicName
PartitionId
partitionId
)
IO [Either KafkaError WatermarkOffsets]
-> (IO [Either KafkaError WatermarkOffsets]
-> IO (Either KafkaError (List WatermarkOffsets)))
-> IO (Either KafkaError (List WatermarkOffsets))
forall a b. a -> (a -> b) -> b
|> ([Either KafkaError WatermarkOffsets]
-> Either KafkaError (List WatermarkOffsets))
-> IO [Either KafkaError WatermarkOffsets]
-> IO (Either KafkaError (List WatermarkOffsets))
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map [Either KafkaError WatermarkOffsets]
-> Either KafkaError (List WatermarkOffsets)
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
Prelude.sequence
case Either KafkaError (List WatermarkOffsets)
waterMarkInfo of
Prelude.Left KafkaError
err -> KafkaError -> IO (List (PartitionKey, CommitOffsets))
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
Exception.throwIO KafkaError
err
Prelude.Right List WatermarkOffsets
waterMarks ->
let fallbackOffsets :: Dict PartitionKey CommitOffsets
fallbackOffsets =
List WatermarkOffsets
waterMarks
List WatermarkOffsets
-> (List WatermarkOffsets -> List (PartitionKey, CommitOffsets))
-> List (PartitionKey, CommitOffsets)
forall a b. a -> (a -> b) -> b
|> (WatermarkOffsets -> (PartitionKey, CommitOffsets))
-> List WatermarkOffsets -> List (PartitionKey, CommitOffsets)
forall a b. (a -> b) -> List a -> List b
List.map
( \Kafka.Metadata.WatermarkOffsets {TopicName
woTopicName :: WatermarkOffsets -> TopicName
woTopicName :: TopicName
Kafka.Metadata.woTopicName, PartitionId
woPartitionId :: WatermarkOffsets -> PartitionId
woPartitionId :: PartitionId
Kafka.Metadata.woPartitionId, Offset
woHighWatermark :: WatermarkOffsets -> Offset
woHighWatermark :: Offset
Kafka.Metadata.woHighWatermark} ->
((TopicName
woTopicName, PartitionId
woPartitionId), Int -> CommitOffsets
Partition.Elsewhere (Offset -> Int
Consumer.unOffset Offset
woHighWatermark))
)
List (PartitionKey, CommitOffsets)
-> (List (PartitionKey, CommitOffsets)
-> Dict PartitionKey CommitOffsets)
-> Dict PartitionKey CommitOffsets
forall a b. a -> (a -> b) -> b
|> List (PartitionKey, CommitOffsets)
-> Dict PartitionKey CommitOffsets
forall comparable v.
Ord comparable =>
List (comparable, v) -> Dict comparable v
Dict.fromList
in Dict PartitionKey CommitOffsets
-> Dict PartitionKey CommitOffsets
-> Dict PartitionKey CommitOffsets
forall comparable v.
Ord comparable =>
Dict comparable v -> Dict comparable v -> Dict comparable v
Dict.union Dict PartitionKey CommitOffsets
storedOffsets Dict PartitionKey CommitOffsets
fallbackOffsets
Dict PartitionKey CommitOffsets
-> (Dict PartitionKey CommitOffsets
-> List (PartitionKey, CommitOffsets))
-> List (PartitionKey, CommitOffsets)
forall a b. a -> (a -> b) -> b
|> Dict PartitionKey CommitOffsets
-> List (PartitionKey, CommitOffsets)
forall k v. Dict k v -> List (k, v)
Dict.toList
List (PartitionKey, CommitOffsets)
-> (List (PartitionKey, CommitOffsets)
-> IO (List (PartitionKey, CommitOffsets)))
-> IO (List (PartitionKey, CommitOffsets))
forall a b. a -> (a -> b) -> b
|> List (PartitionKey, CommitOffsets)
-> IO (List (PartitionKey, CommitOffsets))
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure
List (PartitionKey, CommitOffsets)
keysWithOffsets
List (PartitionKey, CommitOffsets)
-> (List (PartitionKey, CommitOffsets) -> IO [()]) -> IO [()]
forall a b. a -> (a -> b) -> b
|> ((PartitionKey, CommitOffsets) -> IO ())
-> List (PartitionKey, CommitOffsets) -> IO [()]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse
( \(PartitionKey
partitionKey, CommitOffsets
offset) -> do
SkipOrNot
-> CommitOffsets
-> Handler
-> KafkaConsumer
-> MessageCallback
-> State
-> PartitionKey
-> IO ()
initPartition
SkipOrNot
skipOrNot
CommitOffsets
offset
Handler
observability
KafkaConsumer
consumer
MessageCallback
callback
State
state
PartitionKey
partitionKey
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically
(STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| RebalanceInfo
-> (Dict PartitionKey (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
TVar.modifyTVar' (State -> RebalanceInfo
rebalanceInfo State
state) (PartitionKey
-> (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
forall comparable v.
Ord comparable =>
comparable -> v -> Dict comparable v -> Dict comparable v
Dict.insert PartitionKey
partitionKey (Rebalance
Assign, Float
now))
)
IO [()] -> (IO [()] -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
|> ([()] -> ()) -> IO [()] -> IO ()
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map (\[()]
_ -> ())
Consumer.RebalanceAssign [PartitionKey]
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Consumer.RebalanceBeforeRevoke [PartitionKey]
revokedPartitions -> do
[()]
_ <-
[PartitionKey]
revokedPartitions
[PartitionKey] -> ([PartitionKey] -> IO [()]) -> IO [()]
forall a b. a -> (a -> b) -> b
|> (PartitionKey -> IO ()) -> [PartitionKey] -> IO [()]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse
( \PartitionKey
partitionKey -> do
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| do
Dict PartitionKey Partition
partitions <- AllPartitions -> STM (Dict PartitionKey Partition)
forall a. TVar a -> STM a
TVar.readTVar (State -> AllPartitions
partitions State
state)
case PartitionKey -> Dict PartitionKey Partition -> Maybe Partition
forall comparable v.
Ord comparable =>
comparable -> Dict comparable v -> Maybe v
Dict.get PartitionKey
partitionKey Dict PartitionKey Partition
partitions of
Maybe Partition
Nothing -> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Just Partition
partition -> Partition -> STM ()
Partition.revoke Partition
partition
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| RebalanceInfo
-> (Dict PartitionKey (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
TVar.modifyTVar' (State -> RebalanceInfo
rebalanceInfo State
state) (PartitionKey
-> (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
forall comparable v.
Ord comparable =>
comparable -> v -> Dict comparable v -> Dict comparable v
Dict.insert PartitionKey
partitionKey (Rebalance
Revoking, Float
now))
)
[()]
_ <-
[PartitionKey]
revokedPartitions
[PartitionKey] -> ([PartitionKey] -> IO [()]) -> IO [()]
forall a b. a -> (a -> b) -> b
|> (PartitionKey -> IO ()) -> [PartitionKey] -> IO [()]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse
( \PartitionKey
partitionKey -> do
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| do
Dict PartitionKey Partition
partitions <- AllPartitions -> STM (Dict PartitionKey Partition)
forall a. TVar a -> STM a
TVar.readTVar (State -> AllPartitions
partitions State
state)
case PartitionKey -> Dict PartitionKey Partition -> Maybe Partition
forall comparable v.
Ord comparable =>
comparable -> Dict comparable v -> Maybe v
Dict.get PartitionKey
partitionKey Dict PartitionKey Partition
partitions of
Maybe Partition
Nothing -> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Just Partition
_ -> do
STM ()
forall a. STM a
STM.retry
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| RebalanceInfo
-> (Dict PartitionKey (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
TVar.modifyTVar' (State -> RebalanceInfo
rebalanceInfo State
state) (PartitionKey
-> (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
forall comparable v.
Ord comparable =>
comparable -> v -> Dict comparable v -> Dict comparable v
Dict.insert PartitionKey
partitionKey (Rebalance
Revoked, Float
now))
)
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Consumer.RebalanceRevoke [PartitionKey]
_revokedPartitions -> do
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
cleanUp :: Observability.Handler -> RebalanceInfo -> Stopping.Stopping -> Maybe Exception.SomeException -> Consumer.KafkaConsumer -> Prelude.IO ()
cleanUp :: Handler
-> RebalanceInfo
-> Stopping
-> Maybe SomeException
-> KafkaConsumer
-> IO ()
cleanUp Handler
observabilityHandler RebalanceInfo
rebalanceInfo Stopping
stopping Maybe SomeException
maybeException KafkaConsumer
consumer = do
String -> IO ()
Prelude.putStrLn String
"Cleaning up"
Maybe KafkaError
_ <- KafkaConsumer -> IO (Maybe KafkaError)
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer -> m (Maybe KafkaError)
Consumer.closeConsumer KafkaConsumer
consumer
Maybe Text
maybeStopReason <- Stopping -> IO (Maybe Text)
Stopping.stopReason Stopping
stopping
Stopping -> Text -> IO ()
Stopping.stopTakingRequests Stopping
stopping Text
"Shutting down"
Text
requestId <- (UUID -> Text) -> IO UUID -> IO Text
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map UUID -> Text
Data.UUID.toText IO UUID
Data.UUID.V4.nextRandom
Text
-> (TracingSpan -> IO ()) -> Text -> (LogHandler -> IO ()) -> IO ()
forall a.
HasCallStack =>
Text
-> (TracingSpan -> IO ()) -> Text -> (LogHandler -> IO a) -> IO a
Platform.rootTracingSpanIO
Text
requestId
(Handler -> Text -> TracingSpan -> IO ()
Observability.report Handler
observabilityHandler Text
requestId)
Text
"Kafka consumer shutting down"
((LogHandler -> IO ()) -> IO ()) -> (LogHandler -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
<| \LogHandler
log -> do
case Maybe SomeException
maybeException of
Maybe SomeException
Nothing -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Just SomeException
exception -> do
Dict PartitionKey (Rebalance, Float)
rebalanceInfo' <- RebalanceInfo -> IO (Dict PartitionKey (Rebalance, Float))
forall a. TVar a -> IO a
TVar.readTVarIO RebalanceInfo
rebalanceInfo
Text -> [Context] -> Task Never ()
forall e. HasCallStack => Text -> [Context] -> Task e ()
Log.error
Text
"Kafka consumer crashed"
[ Text -> Text -> Context
forall a. (Show a, ToJSON a) => Text -> a -> Context
Log.context Text
"triage" (Text
"The consumer should automatically restart. If we see lots of these in a short period of time we should try to figure out what's wrong" :: Text),
Text -> Text -> Context
forall a. (Show a, ToJSON a) => Text -> a -> Context
Log.context Text
"error" (SomeException -> Text
forall a. Show a => a -> Text
Debug.toString SomeException
exception),
Text -> Text -> Context
forall a. (Show a, ToJSON a) => Text -> a -> Context
Log.context Text
"rebalance info" (Dict PartitionKey (Rebalance, Float) -> Text
forall a. Show a => a -> Text
Debug.toString Dict PartitionKey (Rebalance, Float)
rebalanceInfo')
]
Task Never () -> (Task Never () -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
|> LogHandler -> Task Never () -> IO ()
forall a. LogHandler -> Task Never a -> IO a
Task.perform LogHandler
log
Maybe SomeException -> IO ()
writeCrashLogOnError Maybe SomeException
maybeException
case (Maybe SomeException
maybeException, Maybe Text
maybeStopReason) of
(Just SomeException
exception, Maybe Text
_) -> String -> IO ()
Prelude.putStrLn (String
"Shut down because of exception: " String -> ShowS
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ SomeException -> String
forall e. Exception e => e -> String
Exception.displayException SomeException
exception)
(Maybe SomeException
_, Just Text
stopReason) -> String -> IO ()
Prelude.putStrLn (String
"Shut down because of: " String -> ShowS
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ Text -> String
Text.toList Text
stopReason)
(Maybe SomeException
Nothing, Maybe Text
Nothing) -> String -> IO ()
Prelude.putStrLn String
"Shut down for an unknown reason."
writeCrashLogOnError :: Maybe Exception.SomeException -> Prelude.IO ()
writeCrashLogOnError :: Maybe SomeException -> IO ()
writeCrashLogOnError Maybe SomeException
maybeException = do
Maybe String
crashLogPath <- String -> IO (Maybe String)
System.Environment.lookupEnv String
"CRASHLOG_PATH"
let crashLog :: String
crashLog =
case Maybe SomeException
maybeException of
Maybe SomeException
Nothing -> String
"System exited in response to signal"
Just SomeException
exception -> SomeException -> String
forall e. Exception e => e -> String
Exception.displayException SomeException
exception
case Maybe String
crashLogPath of
Maybe String
Nothing -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Just String
"" -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Just String
path -> String -> String -> IO ()
Prelude.writeFile String
path String
crashLog
initPartition ::
Settings.SkipOrNot ->
Partition.CommitOffsets ->
Observability.Handler ->
Consumer.KafkaConsumer ->
Partition.MessageCallback ->
State ->
PartitionKey ->
Prelude.IO ()
initPartition :: SkipOrNot
-> CommitOffsets
-> Handler
-> KafkaConsumer
-> MessageCallback
-> State
-> PartitionKey
-> IO ()
initPartition SkipOrNot
skipOrNot CommitOffsets
commitOffset Handler
observabilityHandler KafkaConsumer
consumer MessageCallback
callback State
state PartitionKey
key = do
SkipOrNot
-> CommitOffsets
-> Handler
-> Analytics
-> Stopping
-> KafkaConsumer
-> MessageCallback
-> OnStartup
-> OnCleanup
-> IO ()
Partition.spawnWorkerThread
SkipOrNot
skipOrNot
CommitOffsets
commitOffset
Handler
observabilityHandler
(State -> Analytics
analytics State
state)
(State -> Stopping
stopping State
state)
KafkaConsumer
consumer
MessageCallback
callback
( (Partition -> IO ()) -> OnStartup
Partition.OnStartup
( \Partition
partition ->
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| do
Dict PartitionKey Partition
queues <- AllPartitions -> STM (Dict PartitionKey Partition)
forall a. TVar a -> STM a
TVar.readTVar (State -> AllPartitions
partitions State
state)
case PartitionKey -> Dict PartitionKey Partition -> Maybe Partition
forall comparable v.
Ord comparable =>
comparable -> Dict comparable v -> Maybe v
Dict.get PartitionKey
key Dict PartitionKey Partition
queues of
Just Partition
_ ->
RuntimeExceptions -> STM ()
forall e a. Exception e => e -> STM a
STM.throwSTM (PartitionKey -> RuntimeExceptions
AskedToInitPartitionThatAlreadyExists PartitionKey
key)
Maybe Partition
Nothing -> do
AllPartitions -> Dict PartitionKey Partition -> STM ()
forall a. TVar a -> a -> STM ()
TVar.writeTVar (State -> AllPartitions
partitions State
state) (PartitionKey
-> Partition
-> Dict PartitionKey Partition
-> Dict PartitionKey Partition
forall comparable v.
Ord comparable =>
comparable -> v -> Dict comparable v -> Dict comparable v
Dict.insert PartitionKey
key Partition
partition Dict PartitionKey Partition
queues)
)
)
( IO () -> OnCleanup
Partition.OnCleanup
( do
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| AllPartitions
-> (Dict PartitionKey Partition -> Dict PartitionKey Partition)
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
TVar.modifyTVar' (State -> AllPartitions
partitions State
state) (PartitionKey
-> Dict PartitionKey Partition -> Dict PartitionKey Partition
forall comparable v.
Ord comparable =>
comparable -> Dict comparable v -> Dict comparable v
Dict.remove PartitionKey
key)
String -> IO ()
Prelude.putStrLn (String
"Stop processing messages for partition: " String -> ShowS
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ PartitionKey -> String
forall a. Show a => a -> String
Prelude.show PartitionKey
key)
)
)
runThreads ::
Settings.Settings ->
State ->
Consumer.KafkaConsumer ->
Prelude.IO ()
runThreads :: Settings -> State -> KafkaConsumer -> IO ()
runThreads Settings
settings State
state KafkaConsumer
consumer = do
Stopping -> () -> IO () -> IO ()
forall a. Stopping -> a -> IO a -> IO a
Stopping.runUnlessStopping
(State -> Stopping
stopping State
state)
()
( IO () -> IO () -> IO (Either () ())
forall a b. IO a -> IO b -> IO (Either a b)
Async.race
(MaxMsgsPerPartitionBufferedLocally
-> KafkaConsumer -> State -> Set PartitionKey -> IO ()
pauseAndAnalyticsLoop (Settings -> MaxMsgsPerPartitionBufferedLocally
Settings.maxMsgsPerPartitionBufferedLocally Settings
settings) KafkaConsumer
consumer State
state Set PartitionKey
forall a. Set a
Set.empty)
(Settings -> EnqueueRecord -> Analytics -> KafkaConsumer -> IO ()
Fetcher.pollingLoop Settings
settings (AllPartitions -> EnqueueRecord
enqueueRecord (State -> AllPartitions
partitions State
state)) (State -> Analytics
analytics State
state) KafkaConsumer
consumer)
IO (Either () ()) -> (IO (Either () ()) -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
|> (Either () () -> ()) -> IO (Either () ()) -> IO ()
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map (\Either () ()
_ -> ())
)
data RuntimeExceptions
= AskedToInitPartitionThatAlreadyExists (Consumer.TopicName, Consumer.PartitionId)
deriving (Int -> RuntimeExceptions -> ShowS
[RuntimeExceptions] -> ShowS
RuntimeExceptions -> String
(Int -> RuntimeExceptions -> ShowS)
-> (RuntimeExceptions -> String)
-> ([RuntimeExceptions] -> ShowS)
-> Show RuntimeExceptions
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [RuntimeExceptions] -> ShowS
$cshowList :: [RuntimeExceptions] -> ShowS
show :: RuntimeExceptions -> String
$cshow :: RuntimeExceptions -> String
showsPrec :: Int -> RuntimeExceptions -> ShowS
$cshowsPrec :: Int -> RuntimeExceptions -> ShowS
Show)
instance Exception.Exception RuntimeExceptions
enqueueRecord ::
AllPartitions ->
Partition.ConsumerRecord ->
Prelude.IO Partition.SeekCmd
enqueueRecord :: AllPartitions -> EnqueueRecord
enqueueRecord AllPartitions
partitions ConsumerRecord
record =
STM SeekCmd -> IO SeekCmd
forall a. STM a -> IO a
STM.atomically (STM SeekCmd -> IO SeekCmd) -> STM SeekCmd -> IO SeekCmd
forall a b. (a -> b) -> a -> b
<| do
let key :: PartitionKey
key = (ConsumerRecord -> TopicName
forall k v. ConsumerRecord k v -> TopicName
Consumer.crTopic ConsumerRecord
record, ConsumerRecord -> PartitionId
forall k v. ConsumerRecord k v -> PartitionId
Consumer.crPartition ConsumerRecord
record)
Dict PartitionKey Partition
partitions' <- AllPartitions -> STM (Dict PartitionKey Partition)
forall a. TVar a -> STM a
TVar.readTVar AllPartitions
partitions
let maybePartition :: Maybe Partition
maybePartition = PartitionKey -> Dict PartitionKey Partition -> Maybe Partition
forall comparable v.
Ord comparable =>
comparable -> Dict comparable v -> Maybe v
Dict.get PartitionKey
key Dict PartitionKey Partition
partitions'
case Maybe Partition
maybePartition of
Maybe Partition
Nothing -> SeekCmd -> STM SeekCmd
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure SeekCmd
Partition.NoSeek
Just Partition
partition -> ConsumerRecord -> Partition -> STM SeekCmd
Partition.append ConsumerRecord
record Partition
partition
pauseAndAnalyticsLoop ::
Settings.MaxMsgsPerPartitionBufferedLocally ->
Consumer.KafkaConsumer ->
State ->
Set.Set PartitionKey ->
Prelude.IO ()
pauseAndAnalyticsLoop :: MaxMsgsPerPartitionBufferedLocally
-> KafkaConsumer -> State -> Set PartitionKey -> IO ()
pauseAndAnalyticsLoop MaxMsgsPerPartitionBufferedLocally
maxBufferSize KafkaConsumer
consumer State
state Set PartitionKey
pausedPartitions = do
Set PartitionKey
desiredPausedPartitions <- MaxMsgsPerPartitionBufferedLocally
-> AllPartitions -> IO (Set PartitionKey)
pausedPartitionKeys MaxMsgsPerPartitionBufferedLocally
maxBufferSize (State -> AllPartitions
partitions State
state)
Int -> Analytics -> IO ()
Analytics.updatePaused (Set PartitionKey -> Int
forall a. Set a -> Int
Set.size Set PartitionKey
desiredPausedPartitions) (State -> Analytics
analytics State
state)
let newlyPaused :: Set PartitionKey
newlyPaused = Set PartitionKey -> Set PartitionKey -> Set PartitionKey
forall comparable.
Ord comparable =>
Set comparable -> Set comparable -> Set comparable
Set.diff Set PartitionKey
desiredPausedPartitions Set PartitionKey
pausedPartitions
KafkaError
_ <- KafkaConsumer -> [PartitionKey] -> IO KafkaError
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer -> [PartitionKey] -> m KafkaError
Consumer.pausePartitions KafkaConsumer
consumer (Set PartitionKey -> [PartitionKey]
forall a. Set a -> List a
Set.toList Set PartitionKey
newlyPaused)
let newlyResumed :: Set PartitionKey
newlyResumed = Set PartitionKey -> Set PartitionKey -> Set PartitionKey
forall comparable.
Ord comparable =>
Set comparable -> Set comparable -> Set comparable
Set.diff Set PartitionKey
pausedPartitions Set PartitionKey
desiredPausedPartitions
KafkaError
_ <- KafkaConsumer -> [PartitionKey] -> IO KafkaError
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer -> [PartitionKey] -> m KafkaError
Consumer.resumePartitions KafkaConsumer
consumer (Set PartitionKey -> [PartitionKey]
forall a. Set a -> List a
Set.toList Set PartitionKey
newlyResumed)
Int -> IO ()
Control.Concurrent.threadDelay Int
1_000_000
MaxMsgsPerPartitionBufferedLocally
-> KafkaConsumer -> State -> Set PartitionKey -> IO ()
pauseAndAnalyticsLoop MaxMsgsPerPartitionBufferedLocally
maxBufferSize KafkaConsumer
consumer State
state Set PartitionKey
desiredPausedPartitions
pausedPartitionKeys :: Settings.MaxMsgsPerPartitionBufferedLocally -> AllPartitions -> Prelude.IO (Set.Set PartitionKey)
pausedPartitionKeys :: MaxMsgsPerPartitionBufferedLocally
-> AllPartitions -> IO (Set PartitionKey)
pausedPartitionKeys (Settings.MaxMsgsPerPartitionBufferedLocally Int
maxBufferSize) AllPartitions
partitions = do
Dict PartitionKey Partition
partitions' <- AllPartitions -> IO (Dict PartitionKey Partition)
forall a. TVar a -> IO a
TVar.readTVarIO AllPartitions
partitions
Dict PartitionKey Partition
partitions'
Dict PartitionKey Partition
-> (Dict PartitionKey Partition -> List (PartitionKey, Partition))
-> List (PartitionKey, Partition)
forall a b. a -> (a -> b) -> b
|> Dict PartitionKey Partition -> List (PartitionKey, Partition)
forall k v. Dict k v -> List (k, v)
Dict.toList
List (PartitionKey, Partition)
-> (List (PartitionKey, Partition) -> IO [Maybe PartitionKey])
-> IO [Maybe PartitionKey]
forall a b. a -> (a -> b) -> b
|> ((PartitionKey, Partition) -> IO (Maybe PartitionKey))
-> List (PartitionKey, Partition) -> IO [Maybe PartitionKey]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse
( \(PartitionKey
key, Partition
partition) -> do
Maybe Int
maybeLen <- Partition -> IO (Maybe Int)
Partition.length Partition
partition
Maybe PartitionKey -> IO (Maybe PartitionKey)
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure
(Maybe PartitionKey -> IO (Maybe PartitionKey))
-> Maybe PartitionKey -> IO (Maybe PartitionKey)
forall a b. (a -> b) -> a -> b
<| case Maybe Int
maybeLen of
Maybe Int
Nothing -> Maybe PartitionKey
forall a. Maybe a
Nothing
Just Int
length ->
if Int
length Int -> Int -> Bool
forall comparable.
Ord comparable =>
comparable -> comparable -> Bool
> Int
maxBufferSize
then PartitionKey -> Maybe PartitionKey
forall a. a -> Maybe a
Just PartitionKey
key
else Maybe PartitionKey
forall a. Maybe a
Nothing
)
IO [Maybe PartitionKey]
-> (IO [Maybe PartitionKey] -> IO (Set PartitionKey))
-> IO (Set PartitionKey)
forall a b. a -> (a -> b) -> b
|> ([Maybe PartitionKey] -> Set PartitionKey)
-> IO [Maybe PartitionKey] -> IO (Set PartitionKey)
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map ((Maybe PartitionKey -> Maybe PartitionKey)
-> [Maybe PartitionKey] -> [PartitionKey]
forall a b. (a -> Maybe b) -> List a -> List b
List.filterMap Maybe PartitionKey -> Maybe PartitionKey
forall a. a -> a
identity ([Maybe PartitionKey] -> [PartitionKey])
-> ([PartitionKey] -> Set PartitionKey)
-> [Maybe PartitionKey]
-> Set PartitionKey
forall a b c. (a -> b) -> (b -> c) -> a -> c
>> [PartitionKey] -> Set PartitionKey
forall comparable.
Ord comparable =>
List comparable -> Set comparable
Set.fromList)
quitSignals :: [Signals.Signal]
quitSignals :: [Signal]
quitSignals =
[ Signal
Signals.sigINT,
Signal
Signals.sigQUIT,
Signal
Signals.sigTERM
]
onQuitSignal :: Prelude.IO () -> Prelude.IO ()
onQuitSignal :: IO () -> IO ()
onQuitSignal IO ()
release = do
let handleQuit :: Signal -> IO Handler
handleQuit Signal
signal =
Signal -> Handler -> Maybe SignalSet -> IO Handler
Signals.installHandler
Signal
signal
(IO () -> Handler
Signals.Catch IO ()
release)
Maybe SignalSet
forall a. Maybe a
Nothing
[Handler]
_ <- (Signal -> IO Handler) -> [Signal] -> IO [Handler]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse Signal -> IO Handler
handleQuit [Signal]
quitSignals
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()