{-# LANGUAGE GADTs #-}

module Kafka.Worker.Partition
  ( spawnWorkerThread,
    append,
    length,
    revoke,
    ConsumerRecord,
    Partition,
    MessageCallback (..),
    SeekCmd (..),
    CommitOffsets (..),
    -- just exported for tests
    microSecondsDelayForAttempt,
    OnStartup (OnStartup),
    OnCleanup (OnCleanup),
  )
where

import qualified Control.Concurrent
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TVar as TVar
import qualified Control.Exception.Safe as Exception
import qualified Data.Aeson as Aeson
import qualified Data.ByteString as ByteString
import qualified Data.Sequence as Seq
import qualified Data.Text.Encoding
import qualified Data.Time.Clock as Clock
import qualified Data.Time.Clock.POSIX as Clock.POSIX
import qualified Data.UUID
import qualified Data.UUID.V4
import qualified GHC.Clock
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Internal as Internal
import qualified Kafka.Worker.Analytics as Analytics
import qualified Kafka.Worker.Settings as Settings
import qualified Kafka.Worker.Stopping as Stopping
import qualified Log.Kafka
import qualified Observability
import qualified Platform
import qualified Prelude

-- | Why processing a single message failed. Values of this type are logged by
-- 'handleFailures'.
data WorkerError e
  = -- | The user-provided 'MessageCallback' returned a failed 'Task'.
    WorkerCallbackFailed e
  | -- | A synchronous exception was thrown while running the processing
    -- 'Task'; caught in 'handleFailures'.
    WorkerCallbackThrew Exception.SomeException
  | -- | The record's value could not be decoded into the message type the
    -- callback expects (see 'decodeMessage').
    MsgDecodingFailed Text
  | -- | NOTE(review): presumably reports a failed consumer seek; confirm at
    -- the construction site.
    SeekFailed Consumer.KafkaError
  deriving (Show)

-- | Per-partition state threaded through 'processMsgLoop'.
data State = State
  { -- | Partition statistics (paused/assigned partition counts, time of the
    -- last rebalance), read by 'getTracingDetails'.
    analytics :: Analytics.Analytics,
    stopping :: Stopping.Stopping,
    -- | The local backlog of records for this partition.
    partition :: Partition
  }

-- | A Kafka consumer record whose key and value are both optional raw bytes.
-- Turning the value into a message happens later, in 'decodeMessage'.
type ConsumerRecord = Consumer.ConsumerRecord (Maybe ByteString.ByteString) (Maybe ByteString.ByteString)

-- | How many times processing of a record has been attempted before. A count
-- of zero means this is the first attempt, so 'processMsgLoop' does not wait
-- before running it; any other count triggers a back-off delay first.
newtype ProcessAttemptsCount = ProcessAttemptsCount Int
  deriving (Num)

-- | The local backlog of records for one partition, held in a 'TVar.TVar'.
-- 'spawnWorkerThread' creates it synchronously, before starting the
-- processing thread, so messages for the partition can be received as soon
-- as that function returns.
newtype Partition = Partition (TVar.TVar Backlog)

-- | SeekCmd is the expected response of the MessageCallback.
-- Return NoSeek if processing succeeded and the offset was correct.
-- Return SeekToOffset with the expected offset if the offset of the message
-- processed was wrong (this only makes sense when Kafka isn't managing the
-- offset).
-- The MessageCallback will throw if processing throws.
--
-- What the worker does with each answer:
--
-- * 'NoSeek': the record is committed to Kafka (only when offsets are
--   committed 'ToKafka') and then removed from the partition's backlog.
-- * 'SeekToOffset': the partition is handed to @awaitingSeekTo@ together
--   with the given offset.
data SeekCmd
  = NoSeek
  | SeekToOffset Int

-- | For each partition, we keep a local backlog of messages we've read from
-- Kafka that are not yet processed.
--
-- 'spawnWorkerThread' starts a partition as an empty 'Assigned' backlog when
-- offsets are committed 'ToKafka', and as 'AwaitingSeekTo' the offset of the
-- last committed message when offsets are committed 'Elsewhere'.
data Backlog
  = AwaitingSeekTo Int
  | -- | Use a `Sequence` type to store records. It's a bit like a list, but unlike
    -- lists it has O(1) appends to both ends, and an O(1) length function as well.
    -- We use this a lot considering we're appending new messages on one end, and
    -- processing messages from the other. Each record is paired with the number
    -- of times we've attempted to process it.
    Assigned (Seq.Seq (ProcessAttemptsCount, ConsumerRecord))
  | -- | We set the Backlog to this to tell the partition's thread to stop.
    Stopping

-- | Action run with a freshly created partition by 'spawnWorkerThread': after
-- the partition's backlog exists, but before its processing thread is
-- started.
--
-- We use these newtypes as a proxy for named parameters.
newtype OnStartup = OnStartup (Partition -> Prelude.IO ())

-- | Action run when a partition's processing loop ends, whether the loop
-- returns normally or throws ('spawnWorkerThread' attaches it with
-- 'Exception.finally').
--
-- We use these newtypes as a proxy for named parameters.
newtype OnCleanup = OnCleanup (Prelude.IO ())

-- | MessageCallback is used by your worker to process messages peeled off the queue.
--
-- The callback receives the record with its key and value replaced by @()@,
-- plus the message decoded from the record's value. If the returned 'Task'
-- fails, its error is wrapped in 'WorkerCallbackFailed'. Otherwise the
-- returned 'SeekCmd' tells the worker what to do with the record next.
data MessageCallback where
  MessageCallback ::
    (Show e, Aeson.ToJSON msg, Aeson.FromJSON msg) =>
    (Consumer.ConsumerRecord () () -> msg -> Task e SeekCmd) ->
    MessageCallback

-- | Who commits the offsets of processed records.
--
-- With 'ToKafka' the worker commits each record to Kafka itself once the
-- callback answers 'NoSeek'. With 'Elsewhere' the worker commits nothing: the
-- user commits offsets themselves, and the partition starts out awaiting a
-- seek to the given offset.
data CommitOffsets
  = ToKafka
  | -- | Commit offsets elsewhere. Takes the offset of the last committed
    -- message so we can skip old messages.
    Elsewhere Int

-- | A thread that processes messages for a particular partition. When its
-- processing loop ends, whether normally or because of an exception, the
-- 'OnCleanup' action runs.
--
-- When processing a message fails we have a couple of non-appealing options:
--
-- - We could throw an error. That would kill the worker process, preventing it
--   from working on unrelated partitions that might be fine.
-- - We can skip the message. This might be okay for some domains, but we cannot
--   generally assume ignoring messages is fine.
-- - We can park the message in a separate topic for processing later (a dead
--   letter queue). This might be fine for some domains, but we cannot generally
--   assume handling messages out of order is fine.
-- - We can retry until it works. If we're lucky this will get the message
--   unstuck, but some logic errors (such as JSON decoding errors) won't go
--   away by themselves and will require new code to be deployed. Until a retry
--   succeeds we'll be blocked from handling any messages on the same partition.
--
-- The retrying solution is unsatisfying, but at least it is simple to
-- implement, and will not attempt to fix matters in a way that might also make
-- them worse.
--
-- In domains where skipping messages or resubmitting them out of order is
-- okay we already have the option of passing in a callback function that
-- catches and handles errors itself and always succeeds.
--
-- In case message order needs to be maintained, there might be strategies we
-- can devise where being blocked on one key would allow processing on other
-- keys within the same partition to continue. It's not clear however how often
-- a logic error will affect only some keys in a partition and not others, and
-- so whether it's worth it to expend the effort and complexity to implement
-- such a scheme. As we run this code we'll gather data that can help us decide.
--
-- This function returns once the partition's backlog exists and the
-- processing thread has been started. It does not wait for any processing.
spawnWorkerThread ::
  Settings.SkipOrNot ->
  CommitOffsets ->
  Observability.Handler ->
  Analytics.Analytics ->
  Stopping.Stopping ->
  Consumer.KafkaConsumer ->
  MessageCallback ->
  OnStartup ->
  OnCleanup ->
  Prelude.IO ()
spawnWorkerThread skipOrNot commitOffsets observabilityHandler analytics stopping consumer callback (OnStartup onStartup) (OnCleanup onCleanup) = do
  -- Synchronously create the queue that will come to contain messages for the
  -- partition. This way we'll be able to start receiving messages for this
  -- partition as soon as this function returns, even if the processing thread
  -- we start below still needs to boot.
  let initialBacklog =
        case commitOffsets of
          ToKafka -> Assigned Seq.empty
          -- Offsets are committed elsewhere: wait for a seek to the offset of
          -- the last committed message, so old messages get skipped.
          Elsewhere offset -> AwaitingSeekTo offset
  partition <- map Partition (TVar.newTVarIO initialBacklog)
  onStartup partition
  let state = State {analytics, stopping, partition}
      processing = processMsgLoop skipOrNot commitOffsets observabilityHandler state consumer callback
  worker <- Async.async (Exception.finally processing onCleanup)
  -- If the async process spawned here throws an exception, rethrow it
  -- in the main thread so we bring down the worker.
  --
  -- We take care to prevent exceptions from the user-provided callback
  -- to bring down the processing thread. Should an exception slip
  -- through somehow, or should the code in this module produce one,
  -- that could result in a partition thread being killed without us
  -- knowing, and without it being restarted. This would result in no
  -- messages for this partition being processed.
  --
  -- Linking the partition processing threads to the main one will
  -- ensure that if one thread goes down, they all go down. We'll get a
  -- loud crash, which isn't nice, but at least we'll know something bad
  -- happened.
  Async.link worker

-- | Process records from the partition's backlog one at a time, until
-- 'peekRecord' answers 'StopThread'.
--
-- For every record we:
--
-- 1. wait first if the record has been attempted before,
-- 2. decode its value and run the user's callback inside a root tracing span,
-- 3. act on the returned 'SeekCmd' (commit and dequeue, or seek).
--
-- Failures are logged by 'handleFailures' and prevent step 3 from running, so a
-- failing record is neither committed nor dequeued. With 'Settings.Skip',
-- failures returned by the callback (but not decoding failures or thrown
-- exceptions) are treated as 'NoSeek' instead.
processMsgLoop ::
  Settings.SkipOrNot ->
  CommitOffsets ->
  Observability.Handler ->
  State ->
  Consumer.KafkaConsumer ->
  MessageCallback ->
  Prelude.IO ()
processMsgLoop skipOrNot commitOffsets observabilityHandler state consumer callback@(MessageCallback runCallback) = do
  -- # Get the next message from the queue.
  peekResponse <- peekRecord state
  case peekResponse of
    StopThread ->
      Prelude.pure ()
    NextMsg processAttempts record -> do
      case processAttempts of
        ProcessAttemptsCount 0 ->
          Prelude.pure ()
        ProcessAttemptsCount attempts ->
          -- Wait a bit if this is a retry, to prevent putting a lot of retry
          -- stress on downstream systems or generating huge numbers of error
          -- messages.
          microSecondsDelayForAttempt attempts
            |> Prelude.fromIntegral
            |> Control.Concurrent.threadDelay

      doAnything <- Platform.doAnythingHandler
      -- Act on the callback's answer. Only reached when processing succeeded,
      -- or when its failure was skipped.
      let commit processResult =
            case processResult of
              SeekToOffset offset ->
                awaitingSeekTo (partition state) offset
                  |> map Ok
                  |> Platform.doAnything doAnything
              NoSeek -> do
                -- Still around? Then processing went well (or its failure was
                -- skipped). Let's mark this message as successfully processed.
                case commitOffsets of
                  ToKafka ->
                    commitRecord doAnything consumer record
                  Elsewhere _ ->
                    -- The user of the Kafka module is responsible for
                    -- committing the offsets. To help them do so we pass
                    -- them the record in the callback function.
                    --
                    -- It's important the module user can determine when to
                    -- commit, for example to allow them to commit as part
                    -- of a larger database transaction as part of an
                    -- exactly-once-delivery scheme.
                    Prelude.pure ()

                -- Finally, let's remove it from the queue.
                dequeueRecord (partition state) record
                  |> map Ok
                  |> Platform.doAnything doAnything

      -- # Process message.
      (RequestId requestId, details) <-
        getTracingDetails (analytics state) processAttempts record
      Platform.rootTracingSpanIO
        requestId
        (Observability.report observabilityHandler requestId)
        "Assigned Kafka message"
        ( \log -> do
            -- Setting the tracing details first. If anything goes wrong below
            -- at least we'll have nice context in logs!
            Platform.setTracingSpanDetailsIO log details
            handleFailures log <| do
              msg <- decodeMessage record
              runCallback record {Consumer.crKey = (), Consumer.crValue = ()} msg
                |> Task.mapError WorkerCallbackFailed
                |> Task.onError
                  ( \err ->
                      case skipOrNot of
                        Settings.Skip -> Task.succeed NoSeek
                        Settings.DoNotSkip -> Task.fail err
                  )
                |> Task.andThen commit
        )

      -- # Loop for the next message
      processMsgLoop skipOrNot commitOffsets observabilityHandler state consumer callback

-- | How long to wait, in microseconds, before retrying a message we have
-- already attempted to process @attempts@ times.
--
-- The delay is @10 ^ attempts@ seconds, capped at one hour:
--
-- > attempts   1     2      3       4 and up
-- > delay      10s   100s   1000s   1h
--
-- The exponent is clamped to @0..10@ before exponentiating. 10 ^ 10 seconds is
-- already far beyond the cap, and 10 ^ 16 microseconds still fits in a 64-bit
-- 'Int'. Without the clamp the product silently overflows once a message has
-- been retried a dozen or so times. That yields negative or zero delays, which
-- turn the back-off into a busy retry loop. Negative counts are treated like
-- zero instead of making '^' throw.
microSecondsDelayForAttempt :: Int -> Int
microSecondsDelayForAttempt attempts =
  min oneHour (10 Prelude.^ cappedAttempts * oneSecond)
  where
    oneSecond :: Int
    oneSecond = 1000000
    oneHour :: Int
    oneHour = 60 * 60 * oneSecond
    cappedAttempts :: Int
    cappedAttempts = max 0 (min 10 attempts)

-- | Run the processing 'Task' for one message and log an error if it fails.
--
-- A failed 'Task' and any synchronous exception thrown while running it (the
-- latter wrapped in 'WorkerCallbackThrew') both produce a single
-- "Assigned Kafka message failed" log entry instead of propagating.
handleFailures ::
  Show e =>
  Platform.LogHandler ->
  Task (WorkerError e) a ->
  Prelude.IO ()
handleFailures logHandler task = do
  result <-
    -- Catch any synchronous exceptions the callback might have thrown, to
    -- prevent them from propagating further and killing the entire worker
    -- process.
    Exception.handleAny
      (Prelude.pure << Err << WorkerCallbackThrew)
      (Task.attempt logHandler task)
  case result of
    Ok _ ->
      Prelude.pure ()
    Err err ->
      Log.error
        "Assigned Kafka message failed"
        [ Log.context "triage" ("We'll automatically attempt to retry processing of the message. Until a retry succeeds no messages for the same topic partition as the failing message can be processed." :: Text),
          Log.context "error" (Debug.toString err)
        ]
        |> Task.perform logHandler

-- | Identifier for the root tracing span of one processed message. It is taken
-- from the message's metadata when available, and otherwise generated as a
-- fresh random UUID (see 'getTracingDetails').
newtype RequestId = RequestId Text

-- | Gather the request id and the logging details for processing one record.
--
-- If the record's value decodes as an 'Internal.MsgWithMetaData', the request
-- id comes from its metadata and the logged contents are its @value@.
-- Otherwise a fresh random UUID is used as the request id, and the raw value
-- is logged as text when it is valid UTF-8.
getTracingDetails ::
  Analytics.Analytics ->
  ProcessAttemptsCount ->
  ConsumerRecord ->
  Prelude.IO (RequestId, Log.Kafka.Details)
getTracingDetails analytics (ProcessAttemptsCount processAttempt) record = do
  let (createTime, logAppendTime) =
        case Consumer.crTimestamp record of
          Consumer.CreateTime millis -> (Just (millisToSecs millis), Nothing)
          Consumer.LogAppendTime millis -> (Nothing, Just (millisToSecs millis))
          Consumer.NoTimestamp -> (Nothing, Nothing)
  let eitherMsg =
        Consumer.crValue record
          -- We'll accept the absence of a message if the worker expects a message
          -- of type `()`. The default JSON encoding for `()` is "[]".
          |> Maybe.withDefault "[]"
          |> Aeson.eitherDecodeStrict
  let (requestId, contents) =
        case eitherMsg of
          Prelude.Right Internal.MsgWithMetaData {Internal.metaData, Internal.value} ->
            (Just (Internal.requestId metaData), Log.Kafka.mkContents value)
          Prelude.Left _ ->
            -- Not a message we know how to decode: log the raw value instead,
            -- provided it is valid UTF-8.
            ( Nothing,
              Consumer.crValue record
                |> Maybe.andThen utf8ToText
                |> Log.Kafka.mkContents
            )
  ( Analytics.PausedPartitions pausedPartitions,
    Analytics.AssignedPartitions assignedPartitions,
    Analytics.TimeOfLastRebalance timeOfLastRebalance
    ) <-
    Analytics.read analytics
  now <- GHC.Clock.getMonotonicTime
  let timeSinceLastRebalance = now - timeOfLastRebalance
  requestIdForReturn <-
    case requestId of
      Nothing ->
        -- If the message doesn't contain a request id, create a new one.
        map (RequestId << Data.UUID.toText) Data.UUID.V4.nextRandom
      Just existingRequestId ->
        Prelude.pure (RequestId existingRequestId)
  Prelude.pure
    ( requestIdForReturn,
      Log.Kafka.emptyDetails
        { Log.Kafka.topic = Just (Consumer.unTopicName (Consumer.crTopic record)),
          Log.Kafka.partitionId = Just (Prelude.fromIntegral (Consumer.unPartitionId (Consumer.crPartition record))),
          Log.Kafka.key = Consumer.crKey record |> Maybe.andThen utf8ToText,
          Log.Kafka.contents = Just contents,
          Log.Kafka.processAttempt = Just processAttempt,
          Log.Kafka.createTime,
          Log.Kafka.assignedPartitions = Just assignedPartitions,
          Log.Kafka.pausedPartitions = Just pausedPartitions,
          Log.Kafka.timeSinceLastRebalance = Just timeSinceLastRebalance,
          Log.Kafka.logAppendTime,
          Log.Kafka.requestId
        }
    )
  where
    -- Decode bytes as UTF-8 text, giving 'Nothing' (rather than throwing) when
    -- they are not valid UTF-8. Shared by the key and the raw-contents paths.
    utf8ToText :: ByteString.ByteString -> Maybe Text
    utf8ToText bytes =
      case Data.Text.Encoding.decodeUtf8' bytes of
        Prelude.Left _ -> Nothing
        Prelude.Right text -> Just text

-- | Convert a Kafka timestamp given in milliseconds into a 'Clock.UTCTime'.
--
-- The milliseconds are integer-divided by 1000 before being passed to
-- 'fromPosix', so any sub-second part is dropped. This assumes 'fromPosix'
-- expects whole seconds since the epoch; confirm at its definition.
millisToSecs :: Consumer.Millis -> Clock.UTCTime
millisToSecs (Consumer.Millis millis) =
  fromPosix (millis // 1000)

-- | Decode a consumed record's payload into the worker's message type.
--
-- The raw bytes are first parsed as an 'Internal.MsgWithMetaData' envelope.
-- The wrapped 'Internal.Encodable' value is then converted to an
-- 'Aeson.Value' and parsed back as @msg@. A failure at either step fails the
-- task with 'MsgDecodingFailed' carrying Aeson's error text.
decodeMessage :: (Aeson.FromJSON msg) => ConsumerRecord -> Task (WorkerError e) msg
decodeMessage record =
  case Aeson.eitherDecodeStrict payload of
    Prelude.Left err -> failDecoding err
    Prelude.Right msgWithMetaData ->
      case Internal.value msgWithMetaData of
        Internal.Encodable value ->
          case Aeson.fromJSON (Aeson.toJSON value) of
            Aeson.Error err -> failDecoding err
            Aeson.Success msg -> Task.succeed msg
  where
    -- A missing payload is replaced by "[]", the default JSON encoding of
    -- `()`, so that workers expecting `()` can accept message-less records.
    -- NOTE(review): the fallback is decoded as MsgWithMetaData first; confirm
    -- that instance accepts a bare "[]".
    payload = Maybe.withDefault "[]" (Consumer.crValue record)
    failDecoding err = Task.fail (MsgDecodingFailed (Text.fromList err))

-- | Commit the offset of a record to Kafka after it has been processed, using
-- the 'Consumer.OffsetCommit' commit mode.
--
-- If Kafka reports a commit error, it is logged with 'Log.error' rather than
-- surfaced as a task failure.
commitRecord ::
  Platform.DoAnythingHandler ->
  Consumer.KafkaConsumer ->
  ConsumerRecord ->
  Task e ()
commitRecord doAnything consumer record = do
  maybeCommitError <-
    Platform.doAnything doAnything <|
      map Ok (Consumer.commitOffsetMessage Consumer.OffsetCommit consumer record)
  case maybeCommitError of
    Nothing ->
      Task.succeed ()
    Just err ->
      Log.error
        "Failed to commit offset to Kafka after succesfully processing message."
        [ Log.context "err" (Debug.toString err),
          Log.context
            "context"
            ("We failed to commit progress on the message, which means there is a risk of us processing it again. If the message is not idempotent this will be a problem. If we see a lot of these errors it might mean no commits are happening at all, in which cases our queues are not making forward progress." :: Text)
        ]

-- | Result of polling a partition's local backlog (see 'peekRecord').
data PollResponse
  = -- | The record at the head of the backlog, paired with the attempt count
    -- it had before this poll bumped it.
    NextMsg ProcessAttemptsCount ConsumerRecord
  | -- | The partition is stopping; signals the worker thread to stop.
    StopThread

-- | Read the next message for a particular partition, but keep it on the local
-- queue. We should only remove the message if we finish processing it.
--
-- The STM transaction blocks (via 'STM.retry') while the backlog is empty or
-- awaiting a seek. It yields 'StopThread' once the backlog has been revoked.
-- The whole read is wrapped in 'Stopping.runUnlessStopping', with 'StopThread'
-- as its fallback value.
--
-- Every peek increments the head record's attempt count in place. The
-- returned count is the value from before that increment.
peekRecord :: State -> Prelude.IO PollResponse
peekRecord state =
  Stopping.runUnlessStopping (stopping state) StopThread <|
    STM.atomically (peekBacklog (partition state))
  where
    peekBacklog (Partition backlogVar) = do
      backlog <- TVar.readTVar backlogVar
      case backlog of
        Stopping ->
          Prelude.pure StopThread
        AwaitingSeekTo _ ->
          STM.retry
        Assigned Seq.Empty ->
          STM.retry
        Assigned ((attempts, headRecord) Seq.:<| remaining) -> do
          -- Bump the retry count so that the next time we read this message,
          -- we know we've read it before.
          TVar.writeTVar
            backlogVar
            (Assigned ((attempts + 1, headRecord) Seq.:<| remaining))
          Prelude.pure (NextMsg attempts headRecord)

-- | Replace the partition's backlog with a wait for the given offset.
--
-- Any queued records are discarded. Until a record at this offset is
-- appended, 'append' keeps answering with 'SeekToOffset'.
awaitingSeekTo :: Partition -> Int -> Prelude.IO ()
awaitingSeekTo (Partition backlogVar) offset =
  AwaitingSeekTo offset
    |> TVar.writeTVar backlogVar
    |> STM.atomically

-- | Remove the given record from the head of the partition's backlog, if it is
-- still there.
--
-- The head is matched by offset only. In every other situation this is a
-- no-op, and the TVar is not written:
--
-- * the partition is no longer assigned (awaiting a seek, or stopping); the
--   current worker thread will notice on its next loop;
-- * the backlog is empty;
-- * the head is a different record, e.g. because 'awaitingSeekTo' replaced
--   the backlog between 'peekRecord' and this call.
--   NOTE(review): consider logging this case.
dequeueRecord :: Partition -> ConsumerRecord -> Prelude.IO ()
dequeueRecord (Partition backlogVar) record =
  STM.atomically <| do
    backlog <- TVar.readTVar backlogVar
    case backlog of
      Assigned ((_, headRecord) Seq.:<| remaining)
        | Consumer.crOffset headRecord == Consumer.crOffset record ->
            TVar.writeTVar backlogVar (Assigned remaining)
      _ ->
        Prelude.pure ()

-- | Queue a freshly consumed record (with attempt count 0) on the partition's
-- backlog, and report whether the consumer has to seek.
--
-- * Assigned: the record is added to the end of the queue; no seek.
-- * Stopping: the record is dropped; no seek.
-- * Awaiting a seek to offset N: a record at offset N starts a new
--   single-element backlog. Any other record is dropped, and
--   @SeekToOffset N@ is returned so the caller can re-issue the seek.
append :: ConsumerRecord -> Partition -> STM.STM SeekCmd
append item (Partition backlogVar) =
  TVar.stateTVar backlogVar transition
  where
    freshEntry = (ProcessAttemptsCount 0, item)
    itemOffset = Consumer.unOffset (Consumer.crOffset item)
    transition backlog =
      case backlog of
        Stopping ->
          (NoSeek, Stopping)
        Assigned queue ->
          (NoSeek, Assigned (queue Seq.:|> freshEntry))
        AwaitingSeekTo offset
          | offset == itemOffset ->
              (NoSeek, Assigned (Seq.singleton freshEntry))
          | Prelude.otherwise ->
              (SeekToOffset offset, AwaitingSeekTo offset)

-- | Number of records currently queued on the partition's local backlog.
--
-- Returns 'Nothing' when the partition is not in the 'Assigned' state, i.e.
-- while it is awaiting a seek or stopping.
length :: Partition -> Prelude.IO (Maybe Int)
length (Partition backlogVar) =
  TVar.readTVarIO backlogVar
    |> map queuedCount
  where
    queuedCount backlog =
      case backlog of
        Assigned queue -> Just (Prelude.fromIntegral (Seq.length queue))
        AwaitingSeekTo _ -> Nothing
        Stopping -> Nothing

-- | Mark the partition as 'Stopping', discarding its backlog.
--
-- Afterwards 'peekRecord' answers 'StopThread', 'append' drops new records,
-- and 'dequeueRecord' is a no-op.
revoke :: Partition -> STM.STM ()
revoke (Partition backlogVar) =
  Stopping |> TVar.writeTVar backlogVar

-- | Create a time from a POSIX timestamp: a whole number of seconds since the
-- Unix epoch. This lets tests build constant timestamps.
fromPosix :: Int -> Clock.UTCTime
fromPosix secondsSinceEpoch =
  Clock.POSIX.posixSecondsToUTCTime (Prelude.fromIntegral secondsSinceEpoch)