{-# LANGUAGE GADTs #-}

module Kafka.Worker.Internal where

import qualified Conduit
import qualified Control.Concurrent
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TVar as TVar
import qualified Control.Exception.Safe as Exception
import qualified Data.Aeson as Aeson
import qualified Data.UUID
import qualified Data.UUID.V4
import qualified Dict
import qualified GHC.Clock
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Internal as Kafka
import qualified Kafka.Metadata
import qualified Kafka.Worker.Analytics as Analytics
import qualified Kafka.Worker.Fetcher as Fetcher
import qualified Kafka.Worker.Partition as Partition
import qualified Kafka.Worker.Settings as Settings
import qualified Kafka.Worker.Stopping as Stopping
import qualified Observability
import qualified Set
import qualified System.Environment
import qualified System.Exit
import qualified System.Posix.Process
import qualified System.Posix.Signals as Signals
import qualified Prelude

-- | Alias for a TopicName and PartitionId, something every message will have.
-- This pair is the key under which per-partition state is stored in
-- 'AllPartitions' and 'RebalanceInfo'.
type PartitionKey = (Consumer.TopicName, Consumer.PartitionId)

-- | Shared mutable dictionary holding one 'Partition.Partition' per
-- 'PartitionKey'. It is created empty by 'initState'.
type AllPartitions = TVar.TVar (Dict.Dict PartitionKey Partition.Partition)

-- | Rebalance status recorded per partition in 'RebalanceInfo'.
--
-- 'Assign' is stored by 'rebalanceCallback' once a newly assigned partition
-- has been initialised. 'Revoking' and 'Revoked' presumably mark the start and
-- end of a revocation -- NOTE(review): confirm against the revoke handling.
data Rebalance = Assign | Revoking | Revoked deriving (Show)

-- | Shared mutable dictionary recording, per 'PartitionKey', a 'Rebalance'
-- status paired with a timestamp. 'rebalanceCallback' stores the monotonic
-- clock reading it takes at the start of the callback.
type RebalanceInfo = TVar.TVar (Dict.Dict PartitionKey (Rebalance, Float))

-- | Mutable state shared between the threads of a single worker. A fresh value
-- is created by 'initState'.
data State = State
  { -- | The partitions this worker currently knows about, keyed by topic and
    -- partition id.
    partitions :: AllPartitions,
    -- | When we receive a shutdown signal this variable will change. Threads
    -- should stop processing new messages to allow the process to shut down.
    stopping :: Stopping.Stopping,
    -- | Analytics handle; 'rebalanceCallback' reports the time of the last
    -- rebalance through it.
    analytics :: Analytics.Analytics,
    -- | Most recent rebalance status and timestamp for each partition.
    rebalanceInfo :: RebalanceInfo
  }

-- | The topics this worker should subscribe to. At the moment this library
-- only supports subscribing to a single topic.
--
-- Build values with 'subscription' or 'subscriptionManageOwnOffsets'.
data TopicSubscription = TopicSubscription
  { -- | The topic to consume from.
    topic :: Kafka.Topic,
    -- | Handler invoked for messages received on the topic.
    onMessage :: Partition.MessageCallback,
    -- | Where the starting offsets for newly assigned partitions come from.
    offsetSource :: OffsetSource
  }

-- | Params needed to write / read offsets to another data store
data PartitionOffset = PartitionOffset
  { -- | The partition of a topic.
    partitionId :: Int,
    -- | The partition's offset.
    offset :: Int
  }

-- | Create a subscription for a topic, leaving offset storage to Kafka.
--
-- The first argument is the topic name, the second the handler to run for
-- each decoded message. The handler's result is discarded and the worker is
-- told not to seek ('Partition.NoSeek') after every message.
--
-- > main :: IO ()
-- > main = do
-- >   settings <- Environment.decode decoder
-- >   let topicSubscription =
-- >         subscription
-- >           "the-topic"
-- >           (\msg -> Debug.todo "Process your message here!")
-- >   process settings "the-consumer-group" topicSubscription
subscription ::
  (Aeson.FromJSON msg, Aeson.ToJSON msg) =>
  Text ->
  (msg -> Task Text ()) ->
  TopicSubscription
subscription topic callback =
  TopicSubscription
    { topic = Kafka.Topic topic,
      onMessage =
        Partition.MessageCallback
          ( \_ msg -> do
              -- The consumer record is not needed here: offsets live in Kafka.
              callback msg
              Task.succeed Partition.NoSeek
          ),
      offsetSource = InKafka
    }

-- | Create a subscription for a topic and manage offsets for that topic
-- yourself.
--
-- You'll need to tell Kafka where it can read starting offsets. When passed
-- a message you can also tell Kafka to seek to a different offset.
--
-- Arguments, in order:
--
-- * the topic name;
-- * a function that, given partition ids, loads the stored offset of each;
-- * the message handler, which receives the partition and offset of the
--   message alongside the decoded message and returns a seek command.
--
-- > main :: IO ()
-- > main = do
-- >   settings <- Environment.decode decoder
-- >   let topicSubscription =
-- >         subscriptionManageOwnOffsets
-- >           "the-topic"
-- >           (\partitions ->
-- >              sql
-- >                "SELECT partition, offset FROM offsets WHERE partition = %"
-- >                [partitions] )
-- >           (\offsetParams msg -> Debug.todo "Process your message here!")
-- >   process settings "the-consumer-group" topicSubscription
subscriptionManageOwnOffsets ::
  (Aeson.FromJSON msg, Aeson.ToJSON msg) =>
  Text ->
  ([Int] -> Task Text (List PartitionOffset)) ->
  (PartitionOffset -> msg -> Task Text Partition.SeekCmd) ->
  TopicSubscription
subscriptionManageOwnOffsets topic fetchOffsets callback =
  TopicSubscription
    { topic = Kafka.Topic topic,
      onMessage =
        Partition.MessageCallback
          ( \record msg -> do
              -- Hand the handler the position of this message so it can
              -- persist it together with its own side effects.
              let offsetParams =
                    PartitionOffset
                      { partitionId =
                          Consumer.crPartition record
                            |> partitionIdToInt,
                        offset = Consumer.unOffset (Consumer.crOffset record)
                      }
              callback offsetParams msg
          ),
      offsetSource =
        Elsewhere
          ( \partitionKeys -> do
              -- The user-supplied loader only knows about partition ids, so
              -- strip the topic name before calling it and re-attach it after.
              let partitionIds =
                    partitionKeys
                      |> List.map (partitionIdToInt << Tuple.second)
              offsets <- fetchOffsets partitionIds
              offsets
                |> List.map toPartitionKey
                |> Task.succeed
          )
    }
  where
    -- Pair a stored offset with the full key (this subscription's topic plus
    -- the partition id) of the partition it belongs to.
    toPartitionKey :: PartitionOffset -> (PartitionKey, Int)
    toPartitionKey (PartitionOffset {partitionId, offset}) =
      ( ( Consumer.TopicName topic,
          Consumer.PartitionId (Prelude.fromIntegral partitionId)
        ),
        offset
      )

    -- Unwrap a Kafka partition id into the prelude's 'Int'.
    partitionIdToInt :: Consumer.PartitionId -> Int
    partitionIdToInt (Consumer.PartitionId int) = Prelude.fromIntegral int

-- | This determines how a worker that was just assigned a partition should
-- decide at which message offset to continue processing.
data OffsetSource where
  -- | Use Kafka's own offset storage mechanism.
  InKafka :: OffsetSource
  -- | Store offsets somewhere else, in which case you need to provide a
  -- function that the worker can use to load initial offsets. Storing offsets
  -- outside Kafka can be used to implement exactly-once-delivery schemes.
  -- Using this requires the message itself to commit the offset.
  --
  -- The function receives the keys of the newly assigned partitions and
  -- returns the stored offset for each key it knows about.
  Elsewhere ::
    ([PartitionKey] -> Task Text [(PartitionKey, Int)]) ->
    OffsetSource

-- | Starts the kafka worker handling messages.
--
-- Arguments, in order: the worker settings, the consumer group id, and the
-- topic subscription to process. This call runs the worker until it shuts
-- down, then arms a fallback that force-exits the OS process.
process :: Settings.Settings -> Text -> TopicSubscription -> Prelude.IO ()
process settings groupIdText topicSubscriptions = do
  processWithoutShutdownEnsurance settings (Consumer.ConsumerGroupId groupIdText) topicSubscriptions
  -- Start an insurance policy to make sure we exit in 5 seconds. We've seen
  -- cases where our graceful shutdown seems to hang, resulting in a worker
  -- that's not doing anything. We should try to fix those failures, but for the
  -- ones that remain this is our fallback.
  --
  -- Running it using `Async.async` makes it so we won't wait for this thread to
  -- complete. If the regular shutdown completes before this thread is done we
  -- will exit early.
  _ <-
    Async.async <| do
      Control.Concurrent.threadDelay 5_000_000 {- 5 seconds -}
      Prelude.putStrLn "Something is holding up shutdown. Going to die ungracefully now."
      System.Posix.Process.exitImmediately (System.Exit.ExitFailure 1)
  Prelude.pure ()

-- | Like `process`, but doesn't exit the current process by itself. This risks
-- leaving zombie processes when used in production but is safer in tests, where
-- the worker shares the OS process with other test code and the test runner.
--
-- Arguments, in order: the worker settings, the consumer group id, and the
-- topic subscription to process.
processWithoutShutdownEnsurance :: Settings.Settings -> Consumer.ConsumerGroupId -> TopicSubscription -> Prelude.IO ()
processWithoutShutdownEnsurance settings groupId topicSubscriptions = do
  let TopicSubscription {onMessage, topic, offsetSource} = topicSubscriptions
  state <- initState
  -- Flip the shared stopping flag when a quit signal arrives so worker threads
  -- can wind down.
  onQuitSignal (Stopping.stopTakingRequests (stopping state) "Received stop signal")
  Conduit.withAcquire (Observability.handler (Settings.observability settings)) <| \observabilityHandler -> do
    -- The consumer is the bracketed resource: `cleanUp` receives it together
    -- with the exception (if any) that ended `runThreads`.
    Exception.bracketWithError
      (createConsumer settings groupId observabilityHandler offsetSource onMessage topic state)
      (cleanUp observabilityHandler (rebalanceInfo state) (stopping state))
      (runThreads settings state)

-- | Create the initial shared 'State' for a worker: a fresh stopping flag, no
-- known partitions, and no recorded rebalance information.
initState :: Prelude.IO State
initState = do
  stopping <- Stopping.init
  partitions <- TVar.newTVarIO Dict.empty
  -- Analytics is handed an action that reads the current number of partitions.
  analytics <- Analytics.init (map Dict.size (TVar.readTVarIO partitions))
  rebalanceInfo <- TVar.newTVarIO Dict.empty
  Prelude.pure
    State
      { partitions,
        stopping,
        analytics,
        rebalanceInfo
      }

-- | Goes to Kafka and registers a consumer on the Topic
-- Kafka would give the consumer some number of partitions to be responsible for
--
-- The consumer is configured from the given settings (broker addresses, log
-- level, max poll interval), joins the given consumer group with auto-commit
-- turned off, and has 'rebalanceCallback' installed as its rebalance handler.
--
-- Throws the 'Consumer.KafkaError' returned by 'Consumer.newConsumer' if the
-- consumer cannot be created.
createConsumer ::
  Settings.Settings ->
  Consumer.ConsumerGroupId ->
  Observability.Handler ->
  OffsetSource ->
  Partition.MessageCallback ->
  Kafka.Topic ->
  State ->
  Prelude.IO Consumer.KafkaConsumer
createConsumer
  Settings.Settings
    { Settings.brokerAddresses,
      Settings.logLevel,
      Settings.maxPollIntervalMs,
      Settings.onProcessMessageSkip
    }
  groupId
  observability
  offsetSource
  callback
  topic
  state = do
    -- Partially applied so only the consumer and the rebalance event remain,
    -- which is the shape the Kafka client expects for its callback.
    let rebalance =
          rebalanceCallback
            onProcessMessageSkip
            observability
            callback
            offsetSource
            state
    let properties =
          Consumer.brokersList
            brokerAddresses
            ++ Consumer.groupId groupId
            ++ Consumer.noAutoCommit
            ++ Consumer.logLevel logLevel
            ++ Consumer.setCallback (Consumer.rebalanceCallback rebalance)
            ++ Consumer.compression Consumer.Snappy
            ++ Consumer.extraProps
              ( Dict.fromList
                  [("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs))]
              )
    let subscription' =
          Consumer.topics [Consumer.TopicName (Kafka.unTopic topic)]
            ++ Consumer.offsetReset Consumer.Earliest
    eitherConsumer <- Consumer.newConsumer properties subscription'
    case eitherConsumer of
      Prelude.Left err ->
        -- We create the worker as part of starting the application. Throwing
        -- means that if there's a problem with the settings the application will
        -- fail immediately upon start. It won't result in runtime errors during
        -- operation.
        Exception.throwIO err
      Prelude.Right consumer ->
        Prelude.pure consumer

-- | Triggered when a rebalance happens, due to workers going offline or coming
-- online. This typically happens when we're scaling up or down, or when a
-- worker dies because of an unexpected exception.
--
-- The following provides reasonable documentation for these events. It's Java
-- documentation, but it should apply to what's happening here fairly well (the
-- `hw-kafka-client` library we use is a wrapper over librdkafka).
-- https://docs.confluent.io/2.0.0/clients/librdkafka/classRdKafka_1_1RebalanceCb.html#a490a91c52724382a72380af621958741
rebalanceCallback ::
  Settings.SkipOrNot ->
  Observability.Handler ->
  Partition.MessageCallback ->
  OffsetSource ->
  State ->
  Consumer.KafkaConsumer ->
  Consumer.RebalanceEvent ->
  Prelude.IO ()
rebalanceCallback :: SkipOrNot
-> Handler
-> MessageCallback
-> OffsetSource
-> State
-> KafkaConsumer
-> RebalanceEvent
-> IO ()
rebalanceCallback SkipOrNot
skipOrNot Handler
observability MessageCallback
callback OffsetSource
offsetSource State
state KafkaConsumer
consumer RebalanceEvent
rebalanceEvent = do
  Float
now <- IO Float
GHC.Clock.getMonotonicTime
  Float -> Analytics -> IO ()
Analytics.updateTimeOfLastRebalance Float
now (State -> Analytics
analytics State
state)
  case RebalanceEvent
rebalanceEvent of
    Consumer.RebalanceBeforeAssign [PartitionKey]
newPartitions -> do
      List (PartitionKey, CommitOffsets)
keysWithOffsets <-
        case OffsetSource
offsetSource of
          OffsetSource
InKafka ->
            List (PartitionKey, CommitOffsets)
-> IO (List (PartitionKey, CommitOffsets))
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure (List (PartitionKey, CommitOffsets)
 -> IO (List (PartitionKey, CommitOffsets)))
-> List (PartitionKey, CommitOffsets)
-> IO (List (PartitionKey, CommitOffsets))
forall a b. (a -> b) -> a -> b
<| (PartitionKey -> (PartitionKey, CommitOffsets))
-> [PartitionKey] -> List (PartitionKey, CommitOffsets)
forall a b. (a -> b) -> List a -> List b
List.map (\PartitionKey
partitionKey -> (PartitionKey
partitionKey, CommitOffsets
Partition.ToKafka)) [PartitionKey]
newPartitions
          Elsewhere [PartitionKey] -> Task Text [(PartitionKey, Int)]
fetch -> do
            LogHandler
log <- IO LogHandler
Platform.silentHandler
            Result Text [(PartitionKey, Int)]
fetchResult <- LogHandler
-> Task Text [(PartitionKey, Int)]
-> IO (Result Text [(PartitionKey, Int)])
forall x a. LogHandler -> Task x a -> IO (Result x a)
Task.attempt LogHandler
log ([PartitionKey] -> Task Text [(PartitionKey, Int)]
fetch [PartitionKey]
newPartitions)
            case Result Text [(PartitionKey, Int)]
fetchResult of
              Err Text
err -> String -> IO (List (PartitionKey, CommitOffsets))
forall (m :: * -> *) a.
(MonadThrow m, HasCallStack) =>
String -> m a
Exception.throwString (Text -> String
Text.toList Text
err)
              Ok [(PartitionKey, Int)]
fetched -> do
                let storedOffsets :: Dict PartitionKey CommitOffsets
storedOffsets =
                      ((PartitionKey, Int) -> (PartitionKey, CommitOffsets))
-> [(PartitionKey, Int)] -> List (PartitionKey, CommitOffsets)
forall a b. (a -> b) -> List a -> List b
List.map ((Int -> CommitOffsets)
-> (PartitionKey, Int) -> (PartitionKey, CommitOffsets)
forall b y a. (b -> y) -> (a, b) -> (a, y)
Tuple.mapSecond Int -> CommitOffsets
Partition.Elsewhere) [(PartitionKey, Int)]
fetched
                        List (PartitionKey, CommitOffsets)
-> (List (PartitionKey, CommitOffsets)
    -> Dict PartitionKey CommitOffsets)
-> Dict PartitionKey CommitOffsets
forall a b. a -> (a -> b) -> b
|> List (PartitionKey, CommitOffsets)
-> Dict PartitionKey CommitOffsets
forall comparable v.
Ord comparable =>
List (comparable, v) -> Dict comparable v
Dict.fromList
                let storedKeys :: Set PartitionKey
storedKeys =
                      [(PartitionKey, Int)]
fetched
                        [(PartitionKey, Int)]
-> ([(PartitionKey, Int)] -> [PartitionKey]) -> [PartitionKey]
forall a b. a -> (a -> b) -> b
|> ((PartitionKey, Int) -> PartitionKey)
-> [(PartitionKey, Int)] -> [PartitionKey]
forall a b. (a -> b) -> List a -> List b
List.map (PartitionKey, Int) -> PartitionKey
forall a b. (a, b) -> a
Tuple.first
                        [PartitionKey]
-> ([PartitionKey] -> Set PartitionKey) -> Set PartitionKey
forall a b. a -> (a -> b) -> b
|> [PartitionKey] -> Set PartitionKey
forall comparable.
Ord comparable =>
List comparable -> Set comparable
Set.fromList
                let missingPartitions :: Set PartitionKey
missingPartitions = Set PartitionKey -> Set PartitionKey -> Set PartitionKey
forall comparable.
Ord comparable =>
Set comparable -> Set comparable -> Set comparable
Set.diff ([PartitionKey] -> Set PartitionKey
forall comparable.
Ord comparable =>
List comparable -> Set comparable
Set.fromList [PartitionKey]
newPartitions) Set PartitionKey
storedKeys
                -- NOTE: we fallback to the end of the queue so we can reset
                -- offsets in staging.
                -- in production, we should never hit this code. Perhaps we
                -- should explicitly guard against it?
                Either KafkaError (List WatermarkOffsets)
waterMarkInfo :: Prelude.Either Consumer.KafkaError (List Kafka.Metadata.WatermarkOffsets) <-
                  Set PartitionKey
missingPartitions
                    Set PartitionKey
-> (Set PartitionKey -> [PartitionKey]) -> [PartitionKey]
forall a b. a -> (a -> b) -> b
|> Set PartitionKey -> [PartitionKey]
forall a. Set a -> List a
Set.toList
                    [PartitionKey]
-> ([PartitionKey] -> IO [Either KafkaError WatermarkOffsets])
-> IO [Either KafkaError WatermarkOffsets]
forall a b. a -> (a -> b) -> b
|> (PartitionKey -> IO (Either KafkaError WatermarkOffsets))
-> [PartitionKey] -> IO [Either KafkaError WatermarkOffsets]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse
                      ( \(TopicName
topicName, PartitionId
partitionId) ->
                          KafkaConsumer
-> Timeout
-> TopicName
-> PartitionId
-> IO (Either KafkaError WatermarkOffsets)
forall (m :: * -> *) k.
(MonadIO m, HasKafka k) =>
k
-> Timeout
-> TopicName
-> PartitionId
-> m (Either KafkaError WatermarkOffsets)
Kafka.Metadata.partitionWatermarkOffsets
                            KafkaConsumer
consumer
                            (Int -> Timeout
Consumer.Timeout Int
5000 {- 5 seconds -})
                            TopicName
topicName
                            PartitionId
partitionId
                      )
                    IO [Either KafkaError WatermarkOffsets]
-> (IO [Either KafkaError WatermarkOffsets]
    -> IO (Either KafkaError (List WatermarkOffsets)))
-> IO (Either KafkaError (List WatermarkOffsets))
forall a b. a -> (a -> b) -> b
|> ([Either KafkaError WatermarkOffsets]
 -> Either KafkaError (List WatermarkOffsets))
-> IO [Either KafkaError WatermarkOffsets]
-> IO (Either KafkaError (List WatermarkOffsets))
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map [Either KafkaError WatermarkOffsets]
-> Either KafkaError (List WatermarkOffsets)
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
Prelude.sequence
                case Either KafkaError (List WatermarkOffsets)
waterMarkInfo of
                  Prelude.Left KafkaError
err -> KafkaError -> IO (List (PartitionKey, CommitOffsets))
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
Exception.throwIO KafkaError
err
                  Prelude.Right List WatermarkOffsets
waterMarks ->
                    let fallbackOffsets :: Dict PartitionKey CommitOffsets
fallbackOffsets =
                          List WatermarkOffsets
waterMarks
                            List WatermarkOffsets
-> (List WatermarkOffsets -> List (PartitionKey, CommitOffsets))
-> List (PartitionKey, CommitOffsets)
forall a b. a -> (a -> b) -> b
|> (WatermarkOffsets -> (PartitionKey, CommitOffsets))
-> List WatermarkOffsets -> List (PartitionKey, CommitOffsets)
forall a b. (a -> b) -> List a -> List b
List.map
                              ( \Kafka.Metadata.WatermarkOffsets {TopicName
woTopicName :: WatermarkOffsets -> TopicName
woTopicName :: TopicName
Kafka.Metadata.woTopicName, PartitionId
woPartitionId :: WatermarkOffsets -> PartitionId
woPartitionId :: PartitionId
Kafka.Metadata.woPartitionId, Offset
woHighWatermark :: WatermarkOffsets -> Offset
woHighWatermark :: Offset
Kafka.Metadata.woHighWatermark} ->
                                  ((TopicName
woTopicName, PartitionId
woPartitionId), Int -> CommitOffsets
Partition.Elsewhere (Offset -> Int
Consumer.unOffset Offset
woHighWatermark))
                              )
                            List (PartitionKey, CommitOffsets)
-> (List (PartitionKey, CommitOffsets)
    -> Dict PartitionKey CommitOffsets)
-> Dict PartitionKey CommitOffsets
forall a b. a -> (a -> b) -> b
|> List (PartitionKey, CommitOffsets)
-> Dict PartitionKey CommitOffsets
forall comparable v.
Ord comparable =>
List (comparable, v) -> Dict comparable v
Dict.fromList
                     in Dict PartitionKey CommitOffsets
-> Dict PartitionKey CommitOffsets
-> Dict PartitionKey CommitOffsets
forall comparable v.
Ord comparable =>
Dict comparable v -> Dict comparable v -> Dict comparable v
Dict.union Dict PartitionKey CommitOffsets
storedOffsets Dict PartitionKey CommitOffsets
fallbackOffsets
                          Dict PartitionKey CommitOffsets
-> (Dict PartitionKey CommitOffsets
    -> List (PartitionKey, CommitOffsets))
-> List (PartitionKey, CommitOffsets)
forall a b. a -> (a -> b) -> b
|> Dict PartitionKey CommitOffsets
-> List (PartitionKey, CommitOffsets)
forall k v. Dict k v -> List (k, v)
Dict.toList
                          List (PartitionKey, CommitOffsets)
-> (List (PartitionKey, CommitOffsets)
    -> IO (List (PartitionKey, CommitOffsets)))
-> IO (List (PartitionKey, CommitOffsets))
forall a b. a -> (a -> b) -> b
|> List (PartitionKey, CommitOffsets)
-> IO (List (PartitionKey, CommitOffsets))
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure
      -- We are being assigned new partitions. Lets prepare threads for the new
      -- messages we're about to receive. Lib-rdkafka hasn't yet started to send
      -- messages, so the queues exist but should be idle
      List (PartitionKey, CommitOffsets)
keysWithOffsets
        List (PartitionKey, CommitOffsets)
-> (List (PartitionKey, CommitOffsets) -> IO [()]) -> IO [()]
forall a b. a -> (a -> b) -> b
|> ((PartitionKey, CommitOffsets) -> IO ())
-> List (PartitionKey, CommitOffsets) -> IO [()]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse
          ( \(PartitionKey
partitionKey, CommitOffsets
offset) -> do
              SkipOrNot
-> CommitOffsets
-> Handler
-> KafkaConsumer
-> MessageCallback
-> State
-> PartitionKey
-> IO ()
initPartition
                SkipOrNot
skipOrNot
                CommitOffsets
offset
                Handler
observability
                KafkaConsumer
consumer
                MessageCallback
callback
                State
state
                PartitionKey
partitionKey
              STM () -> IO ()
forall a. STM a -> IO a
STM.atomically
                (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| RebalanceInfo
-> (Dict PartitionKey (Rebalance, Float)
    -> Dict PartitionKey (Rebalance, Float))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
TVar.modifyTVar' (State -> RebalanceInfo
rebalanceInfo State
state) (PartitionKey
-> (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
forall comparable v.
Ord comparable =>
comparable -> v -> Dict comparable v -> Dict comparable v
Dict.insert PartitionKey
partitionKey (Rebalance
Assign, Float
now))
          )
        IO [()] -> (IO [()] -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
|> ([()] -> ()) -> IO [()] -> IO ()
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map (\[()]
_ -> ())
    Consumer.RebalanceAssign [PartitionKey]
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
    Consumer.RebalanceBeforeRevoke [PartitionKey]
revokedPartitions -> do
      -- This callback is intended to allow us to finish committing any
      -- ongoing work before rebalancing. This avoids processing messages twice.
      --
      -- When the callback returns, the rebalancing occurs.
      --
      -- We will tell workers to wrap up current work, and wait for them to
      -- complete before rebalancing.
      --
      -- First: mark partitions as Stopping. Worker threads will stop.
      [()]
_ <-
        [PartitionKey]
revokedPartitions
          [PartitionKey] -> ([PartitionKey] -> IO [()]) -> IO [()]
forall a b. a -> (a -> b) -> b
|> (PartitionKey -> IO ()) -> [PartitionKey] -> IO [()]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse
            ( \PartitionKey
partitionKey -> do
                STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| do
                  Dict PartitionKey Partition
partitions <- AllPartitions -> STM (Dict PartitionKey Partition)
forall a. TVar a -> STM a
TVar.readTVar (State -> AllPartitions
partitions State
state)
                  case PartitionKey -> Dict PartitionKey Partition -> Maybe Partition
forall comparable v.
Ord comparable =>
comparable -> Dict comparable v -> Maybe v
Dict.get PartitionKey
partitionKey Dict PartitionKey Partition
partitions of
                    Maybe Partition
Nothing -> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
                    Just Partition
partition -> Partition -> STM ()
Partition.revoke Partition
partition
                STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| RebalanceInfo
-> (Dict PartitionKey (Rebalance, Float)
    -> Dict PartitionKey (Rebalance, Float))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
TVar.modifyTVar' (State -> RebalanceInfo
rebalanceInfo State
state) (PartitionKey
-> (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
forall comparable v.
Ord comparable =>
comparable -> v -> Dict comparable v -> Dict comparable v
Dict.insert PartitionKey
partitionKey (Rebalance
Revoking, Float
now))
            )
      -- Second: Wait for the workers to stop working.
      -- (They will only remove themselves from the partitions when done processing
      -- any ongoing message, so checking non-existence should suffice.)
      [()]
_ <-
        [PartitionKey]
revokedPartitions
          [PartitionKey] -> ([PartitionKey] -> IO [()]) -> IO [()]
forall a b. a -> (a -> b) -> b
|> (PartitionKey -> IO ()) -> [PartitionKey] -> IO [()]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
Prelude.traverse
            ( \PartitionKey
partitionKey -> do
                STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| do
                  Dict PartitionKey Partition
partitions <- AllPartitions -> STM (Dict PartitionKey Partition)
forall a. TVar a -> STM a
TVar.readTVar (State -> AllPartitions
partitions State
state)
                  case PartitionKey -> Dict PartitionKey Partition -> Maybe Partition
forall comparable v.
Ord comparable =>
comparable -> Dict comparable v -> Maybe v
Dict.get PartitionKey
partitionKey Dict PartitionKey Partition
partitions of
                    Maybe Partition
Nothing -> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
                    Just Partition
_ -> do
                      -- we cannot block on work happening in the FETCHER
                      -- but here, we're blocking work happening in the WORKER
                      -- which is fine!
                      STM ()
forall a. STM a
STM.retry
                STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| RebalanceInfo
-> (Dict PartitionKey (Rebalance, Float)
    -> Dict PartitionKey (Rebalance, Float))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
TVar.modifyTVar' (State -> RebalanceInfo
rebalanceInfo State
state) (PartitionKey
-> (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
-> Dict PartitionKey (Rebalance, Float)
forall comparable v.
Ord comparable =>
comparable -> v -> Dict comparable v -> Dict comparable v
Dict.insert PartitionKey
partitionKey (Rebalance
Revoked, Float
now))
            )
      -- Now workers have stopped and it's safe to rebalance.
      -- Returning from this callback starts the rebalance.
      () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
    Consumer.RebalanceRevoke [PartitionKey]
_revokedPartitions -> do
      () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()

-- | Disconnects our Consumer / yields back partitions on quit / node shutdown.
--
-- Steps, in order:
--
-- 1. Close the Kafka consumer. A failure to close is printed rather than
--    thrown, so the remaining shutdown steps always run.
-- 2. Read the current stop reason (if any), then make sure we are in stopping
--    mode.
-- 3. Open a root tracing span. When we are shutting down because of an
--    exception, log it as an error together with the recorded rebalance info.
-- 4. Write the crash log and print why we shut down.
--
-- Arguments:
--
-- * @observabilityHandler@: used to report the shutdown tracing span.
-- * @rebalanceInfo@: only read when there is an exception, as log context.
-- * @stopping@: queried for a stop reason, then asked to stop taking requests.
-- * @maybeException@: the exception that caused the shutdown, if there is one.
-- * @consumer@: the consumer to close.
cleanUp :: Observability.Handler -> RebalanceInfo -> Stopping.Stopping -> Maybe Exception.SomeException -> Consumer.KafkaConsumer -> Prelude.IO ()
cleanUp observabilityHandler rebalanceInfo stopping maybeException consumer = do
  Prelude.putStrLn "Cleaning up"
  maybeCloseError <- Consumer.closeConsumer consumer
  -- Closing is best-effort: we cannot recover from a failure here, but we do
  -- not want it to disappear silently either.
  case maybeCloseError of
    Nothing -> Prelude.pure ()
    Just closeError ->
      Prelude.putStrLn ("Failed to close Kafka consumer cleanly: " ++ Prelude.show closeError)
  -- In case we're already stopping, get the reason we're doing so.
  maybeStopReason <- Stopping.stopReason stopping
  -- Ensure we enter stopping mode if we weren't already.
  Stopping.stopTakingRequests stopping "Shutting down"
  requestId <- map Data.UUID.toText Data.UUID.V4.nextRandom
  -- at some point, k8s should report system crashes. In the mean time, we'll do it.
  Platform.rootTracingSpanIO
    requestId
    (Observability.report observabilityHandler requestId)
    "Kafka consumer shutting down"
    <| \log -> do
      case maybeException of
        Nothing -> Prelude.pure ()
        Just exception -> do
          rebalanceInfo' <- TVar.readTVarIO rebalanceInfo
          Log.error
            "Kafka consumer crashed"
            [ Log.context "triage" ("The consumer should automatically restart. If we see lots of these in a short period of time we should try to figure out what's wrong" :: Text),
              Log.context "error" (Debug.toString exception),
              Log.context "rebalance info" (Debug.toString rebalanceInfo')
            ]
            |> Task.perform log
  writeCrashLogOnError maybeException
  -- An exception takes precedence over a stop reason when explaining the
  -- shutdown on stdout.
  case (maybeException, maybeStopReason) of
    (Just exception, _) -> Prelude.putStrLn ("Shut down because of exception: " ++ Exception.displayException exception)
    (_, Just stopReason) -> Prelude.putStrLn ("Shut down because of: " ++ Text.toList stopReason)
    (Nothing, Nothing) -> Prelude.putStrLn "Shut down for an unknown reason."

-- | Write a short description of why the process stopped to the crash log.
--
-- The target file is taken from the @CRASHLOG_PATH@ environment variable. When
-- that variable is unset or empty, nothing is written. The file content is the
-- displayed exception when one is given, and a fixed "exited in response to
-- signal" message otherwise.
writeCrashLogOnError :: Maybe Exception.SomeException -> Prelude.IO ()
writeCrashLogOnError maybeException = do
  -- Not using the nri-env-parser lib for this configuration option because it would
  -- require us to run code to make it available. If that code failed it
  -- wouldn't end up in the crash log! Using only `base` functionality allows us
  -- to put the crashlog reporting in the very root of the application.
  maybePath <- System.Environment.lookupEnv "CRASHLOG_PATH"
  case maybePath of
    -- Only a non-empty path is usable.
    Just path@(_ : _) -> Prelude.writeFile path message
    _ -> Prelude.pure ()
  where
    message :: Prelude.String
    message =
      case maybeException of
        Just exception -> Exception.displayException exception
        Nothing -> "System exited in response to signal"

-- Adds a partition to our partitions dict. These partitions will be idle until
-- lib-rdkafka actually starts sending us new messages
-- (after the Consumer.rebalanceassign event occurs)
--
-- This spawns the worker thread for the partition and hands it two hooks:
-- one that registers the partition under @key@ on startup, and one that
-- removes it again on cleanup.
initPartition ::
  Settings.SkipOrNot ->
  Partition.CommitOffsets ->
  Observability.Handler ->
  Consumer.KafkaConsumer ->
  Partition.MessageCallback ->
  State ->
  PartitionKey ->
  Prelude.IO ()
initPartition skipOrNot commitOffset observabilityHandler consumer callback state key =
  -- # Start worker thread for handling messages in partition.
  Partition.spawnWorkerThread
    skipOrNot
    commitOffset
    observabilityHandler
    (analytics state)
    (stopping state)
    consumer
    callback
    (Partition.OnStartup register)
    (Partition.OnCleanup unregister)
  where
    allPartitions = partitions state

    -- Startup hook: store the freshly created partition under our key.
    register newPartition =
      STM.atomically <| do
        known <- TVar.readTVar allPartitions
        case Dict.get key known of
          Nothing ->
            TVar.writeTVar allPartitions (Dict.insert key newPartition known)
          Just _ ->
            -- We never expect to be asked to create a partition that already exists.
            -- We expect that revoke has completely removed the partition before
            -- re-assign.
            --
            -- If it happens anyway, it seems safer to crash (and restart) than to
            -- try continue into the unknown.
            STM.throwSTM (AskedToInitPartitionThatAlreadyExists key)

    -- Cleanup hook: drop the partition from the dict to clean up memory, then
    -- announce on stdout that we stopped processing it.
    unregister = do
      STM.atomically <| TVar.modifyTVar' allPartitions (Dict.remove key)
      Prelude.putStrLn ("Stop processing messages for partition: " ++ Prelude.show key)

-- | Run the pause/analytics loop and the polling loop side by side.
--
-- The two loops are raced against each other, so as soon as either one
-- finishes the other is cancelled. The whole race is wrapped in
-- 'Stopping.runUnlessStopping' with @()@ as the value to fall back on.
runThreads ::
  Settings.Settings ->
  State ->
  Consumer.KafkaConsumer ->
  Prelude.IO ()
runThreads settings state consumer =
  Stopping.runUnlessStopping (stopping state) () raceLoops
  where
    -- Starts out with no partitions paused.
    pauseLoop =
      pauseAndAnalyticsLoop
        (Settings.maxMsgsPerPartitionBufferedLocally settings)
        consumer
        state
        Set.empty

    pollLoop =
      Fetcher.pollingLoop
        settings
        (enqueueRecord (partitions state))
        (analytics state)
        consumer

    -- We do not care which loop won the race.
    raceLoops = do
      _ <- Async.race pauseLoop pollLoop
      Prelude.pure ()

-- | Exceptions this module raises itself when it finds its own state to be
-- inconsistent.
data RuntimeExceptions
  = -- | Raised from the startup hook in 'initPartition' when the partition
    -- dict already holds an entry for the given topic and partition.
    AskedToInitPartitionThatAlreadyExists PartitionKey
  deriving (Show)

instance Exception.Exception RuntimeExceptions

-- | Hand a consumed record to the partition it belongs to.
--
-- The partition is looked up by the record's topic and partition id. When we
-- hold no partition for that key the record is dropped and 'Partition.NoSeek'
-- is returned; otherwise the result of 'Partition.append' is returned. Lookup
-- and append happen in a single STM transaction.
enqueueRecord ::
  AllPartitions ->
  Partition.ConsumerRecord ->
  Prelude.IO Partition.SeekCmd
enqueueRecord allPartitions record =
  STM.atomically <| do
    known <- TVar.readTVar allPartitions
    case Dict.get recordKey known of
      Just partition -> Partition.append record partition
      Nothing -> Prelude.pure Partition.NoSeek
  where
    recordKey :: PartitionKey
    recordKey = (Consumer.crTopic record, Consumer.crPartition record)

-- | Intermittently updates
-- - paused partitions to reflect desired state.
-- - analytics, so that the worker node can report up-to-date data to honeycomb
--
-- Each round computes which partitions should currently be paused, records
-- how many there are, pauses the ones that were not paused before, resumes the
-- ones that no longer need to be, sleeps for a second and starts over with the
-- desired set as the new "currently paused" set. This loop never returns by
-- itself. The results of the pause and resume calls are not inspected.
pauseAndAnalyticsLoop ::
  Settings.MaxMsgsPerPartitionBufferedLocally ->
  Consumer.KafkaConsumer ->
  State ->
  Set.Set PartitionKey ->
  Prelude.IO ()
pauseAndAnalyticsLoop maxBufferSize consumer state pausedPartitions = do
  shouldBePaused <- pausedPartitionKeys maxBufferSize (partitions state)
  Analytics.updatePaused (Set.size shouldBePaused) (analytics state)
  _ <- Consumer.pausePartitions consumer (shouldBePaused `without` pausedPartitions)
  _ <- Consumer.resumePartitions consumer (pausedPartitions `without` shouldBePaused)
  Control.Concurrent.threadDelay 1_000_000 {- 1 second -}
  pauseAndAnalyticsLoop maxBufferSize consumer state shouldBePaused
  where
    -- Keys present in the first set but not in the second, as a list.
    without :: Set.Set PartitionKey -> Set.Set PartitionKey -> [PartitionKey]
    without keep remove = Set.toList (Set.diff keep remove)

-- | Collect the keys of all partitions that should be paused.
--
-- A partition qualifies when 'Partition.length' reports a length that is
-- strictly greater than the configured maximum. Partitions for which no length
-- is reported are never included.
pausedPartitionKeys :: Settings.MaxMsgsPerPartitionBufferedLocally -> AllPartitions -> Prelude.IO (Set.Set PartitionKey)
pausedPartitionKeys (Settings.MaxMsgsPerPartitionBufferedLocally maxBufferSize) partitions = do
  known <- TVar.readTVarIO partitions
  candidates <- Prelude.traverse keyIfOverfull (Dict.toList known)
  Prelude.pure (Set.fromList (List.filterMap identity candidates))
  where
    -- 'Just' the key when the partition buffers more than allowed.
    keyIfOverfull (key, partition) = do
      maybeLen <- Partition.length partition
      Prelude.pure
        <| case maybeLen of
          Just len | len > maxBufferSize -> Just key
          _ -> Nothing

-- | The signals we treat as a request to quit, in the order their handlers
-- get installed: SIGINT (ctrl-c), SIGQUIT (ctrl-\) and SIGTERM.
quitSignals :: [Signals.Signal]
quitSignals = [Signals.sigINT, Signals.sigQUIT, Signals.sigTERM]

-- | Arrange for the given action to run whenever one of the 'quitSignals'
-- arrives.
--
-- A 'Signals.Catch' handler running @release@ is installed for every signal in
-- 'quitSignals'. The handlers that were installed before are discarded.
onQuitSignal :: Prelude.IO () -> Prelude.IO ()
onQuitSignal release =
  Prelude.mapM_ install quitSignals
  where
    -- No extra signals are blocked while the handler runs.
    install signal =
      Signals.installHandler signal (Signals.Catch release) Nothing