{-# LANGUAGE GADTs #-}
module Kafka.Worker.Partition
( spawnWorkerThread,
append,
length,
revoke,
ConsumerRecord,
Partition,
MessageCallback (..),
SeekCmd (..),
CommitOffsets (..),
microSecondsDelayForAttempt,
OnStartup (OnStartup),
OnCleanup (OnCleanup),
)
where
import qualified Control.Concurrent
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TVar as TVar
import qualified Control.Exception.Safe as Exception
import qualified Data.Aeson as Aeson
import qualified Data.ByteString as ByteString
import qualified Data.Sequence as Seq
import qualified Data.Text.Encoding
import qualified Data.Time.Clock as Clock
import qualified Data.Time.Clock.POSIX as Clock.POSIX
import qualified Data.UUID
import qualified Data.UUID.V4
import qualified GHC.Clock
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Internal as Internal
import qualified Kafka.Worker.Analytics as Analytics
import qualified Kafka.Worker.Settings as Settings
import qualified Kafka.Worker.Stopping as Stopping
import qualified Log.Kafka
import qualified Observability
import qualified Platform
import qualified Prelude
data WorkerError e
= WorkerCallbackFailed e
| WorkerCallbackThrew Exception.SomeException
| MsgDecodingFailed Text
| SeekFailed Consumer.KafkaError
deriving (Int -> WorkerError e -> ShowS
[WorkerError e] -> ShowS
WorkerError e -> String
(Int -> WorkerError e -> ShowS)
-> (WorkerError e -> String)
-> ([WorkerError e] -> ShowS)
-> Show (WorkerError e)
forall e. Show e => Int -> WorkerError e -> ShowS
forall e. Show e => [WorkerError e] -> ShowS
forall e. Show e => WorkerError e -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerError e] -> ShowS
$cshowList :: forall e. Show e => [WorkerError e] -> ShowS
show :: WorkerError e -> String
$cshow :: forall e. Show e => WorkerError e -> String
showsPrec :: Int -> WorkerError e -> ShowS
$cshowsPrec :: forall e. Show e => Int -> WorkerError e -> ShowS
Show)
data State = State
{ State -> Analytics
analytics :: Analytics.Analytics,
State -> Stopping
stopping :: Stopping.Stopping,
State -> Partition
partition :: Partition
}
type ConsumerRecord = Consumer.ConsumerRecord (Maybe ByteString.ByteString) (Maybe ByteString.ByteString)
newtype ProcessAttemptsCount = ProcessAttemptsCount Int
deriving (Integer -> ProcessAttemptsCount
ProcessAttemptsCount -> ProcessAttemptsCount
ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount
(ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount)
-> (ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount)
-> (ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount)
-> (ProcessAttemptsCount -> ProcessAttemptsCount)
-> (ProcessAttemptsCount -> ProcessAttemptsCount)
-> (ProcessAttemptsCount -> ProcessAttemptsCount)
-> (Integer -> ProcessAttemptsCount)
-> Num ProcessAttemptsCount
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
fromInteger :: Integer -> ProcessAttemptsCount
$cfromInteger :: Integer -> ProcessAttemptsCount
signum :: ProcessAttemptsCount -> ProcessAttemptsCount
$csignum :: ProcessAttemptsCount -> ProcessAttemptsCount
abs :: ProcessAttemptsCount -> ProcessAttemptsCount
$cabs :: ProcessAttemptsCount -> ProcessAttemptsCount
negate :: ProcessAttemptsCount -> ProcessAttemptsCount
$cnegate :: ProcessAttemptsCount -> ProcessAttemptsCount
* :: ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount
$c* :: ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount
- :: ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount
$c- :: ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount
+ :: ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount
$c+ :: ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount
Num)
newtype Partition = Partition (TVar.TVar Backlog)
data SeekCmd
= NoSeek
| SeekToOffset Int
data Backlog
= AwaitingSeekTo Int
|
Assigned (Seq.Seq (ProcessAttemptsCount, ConsumerRecord))
|
Stopping
newtype OnStartup = OnStartup (Partition -> Prelude.IO ())
newtype OnCleanup = OnCleanup (Prelude.IO ())
data MessageCallback where
MessageCallback ::
(Show e, Aeson.ToJSON msg, Aeson.FromJSON msg) =>
(Consumer.ConsumerRecord () () -> msg -> Task e SeekCmd) ->
MessageCallback
data CommitOffsets
= ToKafka
|
Elsewhere Int
spawnWorkerThread ::
Settings.SkipOrNot ->
CommitOffsets ->
Observability.Handler ->
Analytics.Analytics ->
Stopping.Stopping ->
Consumer.KafkaConsumer ->
MessageCallback ->
OnStartup ->
OnCleanup ->
Prelude.IO ()
spawnWorkerThread :: SkipOrNot
-> CommitOffsets
-> Handler
-> Analytics
-> Stopping
-> KafkaConsumer
-> MessageCallback
-> OnStartup
-> OnCleanup
-> IO ()
spawnWorkerThread SkipOrNot
skipOrNot CommitOffsets
commitOffsets Handler
observabilityHandler Analytics
analytics Stopping
stopping KafkaConsumer
consumer MessageCallback
callback (OnStartup Partition -> IO ()
onStartup) (OnCleanup IO ()
onCleanup) = do
Partition
partition <-
(TVar Backlog -> Partition) -> IO (TVar Backlog) -> IO Partition
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map TVar Backlog -> Partition
Partition (IO (TVar Backlog) -> IO Partition)
-> IO (TVar Backlog) -> IO Partition
forall a b. (a -> b) -> a -> b
<| Backlog -> IO (TVar Backlog)
forall a. a -> IO (TVar a)
TVar.newTVarIO
(Backlog -> IO (TVar Backlog)) -> Backlog -> IO (TVar Backlog)
forall a b. (a -> b) -> a -> b
<| case CommitOffsets
commitOffsets of
CommitOffsets
ToKafka -> Seq (ProcessAttemptsCount, ConsumerRecord) -> Backlog
Assigned Seq (ProcessAttemptsCount, ConsumerRecord)
forall a. Seq a
Seq.empty
Elsewhere Int
offset -> Int -> Backlog
AwaitingSeekTo Int
offset
Partition -> IO ()
onStartup Partition
partition
IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
Exception.finally
(SkipOrNot
-> CommitOffsets
-> Handler
-> State
-> KafkaConsumer
-> MessageCallback
-> IO ()
processMsgLoop SkipOrNot
skipOrNot CommitOffsets
commitOffsets Handler
observabilityHandler State :: Analytics -> Stopping -> Partition -> State
State {Analytics
analytics :: Analytics
analytics :: Analytics
analytics, Stopping
stopping :: Stopping
stopping :: Stopping
stopping, Partition
partition :: Partition
partition :: Partition
partition} KafkaConsumer
consumer MessageCallback
callback)
IO ()
onCleanup
IO () -> (IO () -> IO (Async ())) -> IO (Async ())
forall a b. a -> (a -> b) -> b
|> IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
Async.async
IO (Async ()) -> (IO (Async ()) -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
|> (Async () -> IO ()) -> IO (Async ()) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
andThen Async () -> IO ()
forall a. Async a -> IO ()
Async.link
processMsgLoop ::
Settings.SkipOrNot ->
CommitOffsets ->
Observability.Handler ->
State ->
Consumer.KafkaConsumer ->
MessageCallback ->
Prelude.IO ()
processMsgLoop :: SkipOrNot
-> CommitOffsets
-> Handler
-> State
-> KafkaConsumer
-> MessageCallback
-> IO ()
processMsgLoop SkipOrNot
skipOrNot CommitOffsets
commitOffsets Handler
observabilityHandler State
state KafkaConsumer
consumer callback :: MessageCallback
callback@(MessageCallback ConsumerRecord () () -> msg -> Task e SeekCmd
runCallback) = do
PollResponse
peekResponse <- State -> IO PollResponse
peekRecord State
state
case PollResponse
peekResponse of
PollResponse
StopThread ->
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
(NextMsg ProcessAttemptsCount
processAttempts ConsumerRecord
record) -> do
case ProcessAttemptsCount
processAttempts of
(ProcessAttemptsCount Int
0) -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
(ProcessAttemptsCount Int
attempts) ->
Int -> Int
microSecondsDelayForAttempt Int
attempts
Int -> (Int -> Int) -> Int
forall a b. a -> (a -> b) -> b
|> Int -> Int
forall a b. (Integral a, Num b) => a -> b
Prelude.fromIntegral
Int -> (Int -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
|> Int -> IO ()
Control.Concurrent.threadDelay
Handler
doAnything <- IO Handler
Platform.doAnythingHandler
let commit :: SeekCmd -> Task (WorkerError e) ()
commit SeekCmd
processResult =
case SeekCmd
processResult of
SeekToOffset Int
offset ->
Partition -> Int -> IO ()
awaitingSeekTo (State -> Partition
partition State
state) Int
offset
IO ()
-> (IO () -> IO (Result (WorkerError e) ()))
-> IO (Result (WorkerError e) ())
forall a b. a -> (a -> b) -> b
|> (() -> Result (WorkerError e) ())
-> IO () -> IO (Result (WorkerError e) ())
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map () -> Result (WorkerError e) ()
forall error value. value -> Result error value
Ok
IO (Result (WorkerError e) ())
-> (IO (Result (WorkerError e) ()) -> Task (WorkerError e) ())
-> Task (WorkerError e) ()
forall a b. a -> (a -> b) -> b
|> Handler
-> IO (Result (WorkerError e) ()) -> Task (WorkerError e) ()
forall e a. Handler -> IO (Result e a) -> Task e a
Platform.doAnything Handler
doAnything
SeekCmd
NoSeek -> do
case CommitOffsets
commitOffsets of
CommitOffsets
ToKafka ->
Handler
-> KafkaConsumer -> ConsumerRecord -> Task (WorkerError e) ()
forall e. Handler -> KafkaConsumer -> ConsumerRecord -> Task e ()
commitRecord Handler
doAnything KafkaConsumer
consumer ConsumerRecord
record
Elsewhere Int
_ ->
() -> Task (WorkerError e) ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Partition -> ConsumerRecord -> IO ()
dequeueRecord (State -> Partition
partition State
state) ConsumerRecord
record
IO ()
-> (IO () -> IO (Result (WorkerError e) ()))
-> IO (Result (WorkerError e) ())
forall a b. a -> (a -> b) -> b
|> (() -> Result (WorkerError e) ())
-> IO () -> IO (Result (WorkerError e) ())
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map () -> Result (WorkerError e) ()
forall error value. value -> Result error value
Ok
IO (Result (WorkerError e) ())
-> (IO (Result (WorkerError e) ()) -> Task (WorkerError e) ())
-> Task (WorkerError e) ()
forall a b. a -> (a -> b) -> b
|> Handler
-> IO (Result (WorkerError e) ()) -> Task (WorkerError e) ()
forall e a. Handler -> IO (Result e a) -> Task e a
Platform.doAnything Handler
doAnything
(RequestId Text
requestId, Details
details) <- Analytics
-> ProcessAttemptsCount
-> ConsumerRecord
-> IO (RequestId, Details)
getTracingDetails (State -> Analytics
analytics State
state) ProcessAttemptsCount
processAttempts ConsumerRecord
record
Text
-> (TracingSpan -> IO ()) -> Text -> (LogHandler -> IO ()) -> IO ()
forall a.
HasCallStack =>
Text
-> (TracingSpan -> IO ()) -> Text -> (LogHandler -> IO a) -> IO a
Platform.rootTracingSpanIO
Text
requestId
(Handler -> Text -> TracingSpan -> IO ()
Observability.report Handler
observabilityHandler Text
requestId)
Text
"Assigned Kafka message"
( \LogHandler
log -> do
LogHandler -> Details -> IO ()
LogHandler -> forall d. TracingSpanDetails d => d -> IO ()
Platform.setTracingSpanDetailsIO LogHandler
log Details
details
LogHandler -> Task (WorkerError e) () -> IO ()
forall e a. Show e => LogHandler -> Task (WorkerError e) a -> IO ()
handleFailures LogHandler
log (Task (WorkerError e) () -> IO ())
-> Task (WorkerError e) () -> IO ()
forall a b. (a -> b) -> a -> b
<| do
msg
msg <- ConsumerRecord -> Task (WorkerError e) msg
forall msg e.
FromJSON msg =>
ConsumerRecord -> Task (WorkerError e) msg
decodeMessage ConsumerRecord
record
ConsumerRecord () () -> msg -> Task e SeekCmd
runCallback ConsumerRecord
record {crKey :: ()
Consumer.crKey = (), crValue :: ()
Consumer.crValue = ()} msg
msg
Task e SeekCmd
-> (Task e SeekCmd -> Task (WorkerError e) SeekCmd)
-> Task (WorkerError e) SeekCmd
forall a b. a -> (a -> b) -> b
|> (e -> WorkerError e)
-> Task e SeekCmd -> Task (WorkerError e) SeekCmd
forall x y a. (x -> y) -> Task x a -> Task y a
Task.mapError e -> WorkerError e
forall e. e -> WorkerError e
WorkerCallbackFailed
Task (WorkerError e) SeekCmd
-> (Task (WorkerError e) SeekCmd -> Task (WorkerError e) SeekCmd)
-> Task (WorkerError e) SeekCmd
forall a b. a -> (a -> b) -> b
|> (WorkerError e -> Task (WorkerError e) SeekCmd)
-> Task (WorkerError e) SeekCmd -> Task (WorkerError e) SeekCmd
forall x y a. (x -> Task y a) -> Task x a -> Task y a
Task.onError
( \WorkerError e
err -> do
case SkipOrNot
skipOrNot of
SkipOrNot
Settings.Skip -> SeekCmd -> Task (WorkerError e) SeekCmd
forall a x. a -> Task x a
Task.succeed SeekCmd
NoSeek
SkipOrNot
Settings.DoNotSkip -> WorkerError e -> Task (WorkerError e) SeekCmd
forall x a. x -> Task x a
Task.fail WorkerError e
err
)
Task (WorkerError e) SeekCmd
-> (Task (WorkerError e) SeekCmd -> Task (WorkerError e) ())
-> Task (WorkerError e) ()
forall a b. a -> (a -> b) -> b
|> (SeekCmd -> Task (WorkerError e) ())
-> Task (WorkerError e) SeekCmd -> Task (WorkerError e) ()
forall a x b. (a -> Task x b) -> Task x a -> Task x b
Task.andThen SeekCmd -> Task (WorkerError e) ()
commit
)
SkipOrNot
-> CommitOffsets
-> Handler
-> State
-> KafkaConsumer
-> MessageCallback
-> IO ()
processMsgLoop
SkipOrNot
skipOrNot
CommitOffsets
commitOffsets
Handler
observabilityHandler
State
state
KafkaConsumer
consumer
MessageCallback
callback
microSecondsDelayForAttempt :: Int -> Int
microSecondsDelayForAttempt :: Int -> Int
microSecondsDelayForAttempt Int
attempts =
Int -> Int -> Int
forall comparable.
Ord comparable =>
comparable -> comparable -> comparable
min
Int
3_600_000_000
((Int
10 Int -> Int -> Int
forall a b. (Num a, Integral b) => a -> b -> a
Prelude.^ Int
attempts) Int -> Int -> Int
forall number. Num number => number -> number -> number
* Int
1000_000 )
handleFailures ::
Show e =>
Platform.LogHandler ->
Task (WorkerError e) a ->
Prelude.IO ()
handleFailures :: LogHandler -> Task (WorkerError e) a -> IO ()
handleFailures LogHandler
logHandler Task (WorkerError e) a
task = do
Result (WorkerError e) a
result <-
(SomeException -> IO (Result (WorkerError e) a))
-> IO (Result (WorkerError e) a) -> IO (Result (WorkerError e) a)
forall (m :: * -> *) a.
MonadCatch m =>
(SomeException -> m a) -> m a -> m a
Exception.handleAny
(Result (WorkerError e) a -> IO (Result (WorkerError e) a)
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure (Result (WorkerError e) a -> IO (Result (WorkerError e) a))
-> (WorkerError e -> Result (WorkerError e) a)
-> WorkerError e
-> IO (Result (WorkerError e) a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
<< WorkerError e -> Result (WorkerError e) a
forall error value. error -> Result error value
Err (WorkerError e -> IO (Result (WorkerError e) a))
-> (SomeException -> WorkerError e)
-> SomeException
-> IO (Result (WorkerError e) a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
<< SomeException -> WorkerError e
forall e. SomeException -> WorkerError e
WorkerCallbackThrew)
(LogHandler
-> Task (WorkerError e) a -> IO (Result (WorkerError e) a)
forall x a. LogHandler -> Task x a -> IO (Result x a)
Task.attempt LogHandler
logHandler Task (WorkerError e) a
task)
case Result (WorkerError e) a
result of
Ok a
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Err WorkerError e
err -> do
Text -> [Context] -> Task Never ()
forall e. HasCallStack => Text -> [Context] -> Task e ()
Log.error
Text
"Assigned Kafka message failed"
[ Text -> Text -> Context
forall a. (Show a, ToJSON a) => Text -> a -> Context
Log.context Text
"triage" (Text
"We'll automatically attempt to retry processing of the message. Until a retry succeeds no messages for the same topic partition as the failing message can be processed." :: Text),
Text -> Text -> Context
forall a. (Show a, ToJSON a) => Text -> a -> Context
Log.context Text
"error" (WorkerError e -> Text
forall a. Show a => a -> Text
Debug.toString WorkerError e
err)
]
Task Never () -> (Task Never () -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
|> LogHandler -> Task Never () -> IO ()
forall a. LogHandler -> Task Never a -> IO a
Task.perform LogHandler
logHandler
newtype RequestId = RequestId Text
getTracingDetails ::
Analytics.Analytics ->
ProcessAttemptsCount ->
ConsumerRecord ->
Prelude.IO (RequestId, Log.Kafka.Details)
getTracingDetails :: Analytics
-> ProcessAttemptsCount
-> ConsumerRecord
-> IO (RequestId, Details)
getTracingDetails Analytics
analytics (ProcessAttemptsCount Int
processAttempt) ConsumerRecord
record = do
let (Maybe UTCTime
createTime, Maybe UTCTime
logAppendTime) =
case ConsumerRecord -> Timestamp
forall k v. ConsumerRecord k v -> Timestamp
Consumer.crTimestamp ConsumerRecord
record of
Consumer.CreateTime Millis
millis -> (UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just (Millis -> UTCTime
millisToSecs Millis
millis), Maybe UTCTime
forall a. Maybe a
Nothing)
Consumer.LogAppendTime Millis
millis -> (Maybe UTCTime
forall a. Maybe a
Nothing, UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just (Millis -> UTCTime
millisToSecs Millis
millis))
Timestamp
Consumer.NoTimestamp -> (Maybe UTCTime
forall a. Maybe a
Nothing, Maybe UTCTime
forall a. Maybe a
Nothing)
let eitherMsg :: Either String MsgWithMetaData
eitherMsg =
ConsumerRecord -> Maybe ByteString
forall k v. ConsumerRecord k v -> v
Consumer.crValue ConsumerRecord
record
Maybe ByteString -> (Maybe ByteString -> ByteString) -> ByteString
forall a b. a -> (a -> b) -> b
|> ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
Maybe.withDefault ByteString
"[]"
ByteString
-> (ByteString -> Either String MsgWithMetaData)
-> Either String MsgWithMetaData
forall a b. a -> (a -> b) -> b
|> ByteString -> Either String MsgWithMetaData
forall a. FromJSON a => ByteString -> Either String a
Aeson.eitherDecodeStrict
let (Maybe Text
requestId, Contents
contents) = case Either String MsgWithMetaData
eitherMsg of
Prelude.Right Internal.MsgWithMetaData {MetaData
metaData :: MsgWithMetaData -> MetaData
metaData :: MetaData
Internal.metaData, Encodable
value :: MsgWithMetaData -> Encodable
value :: Encodable
Internal.value} ->
(Text -> Maybe Text
forall a. a -> Maybe a
Just (MetaData -> Text
Internal.requestId MetaData
metaData), Encodable -> Contents
forall a. ToJSON a => a -> Contents
Log.Kafka.mkContents Encodable
value)
Prelude.Left String
_ ->
( Maybe Text
forall a. Maybe a
Nothing,
ConsumerRecord -> Maybe ByteString
forall k v. ConsumerRecord k v -> v
Consumer.crValue ConsumerRecord
record
Maybe ByteString -> (Maybe ByteString -> Maybe Text) -> Maybe Text
forall a b. a -> (a -> b) -> b
|> (ByteString -> Maybe Text) -> Maybe ByteString -> Maybe Text
forall a b. (a -> Maybe b) -> Maybe a -> Maybe b
Maybe.andThen (ByteString -> Either UnicodeException Text
Data.Text.Encoding.decodeUtf8' (ByteString -> Either UnicodeException Text)
-> (Either UnicodeException Text -> Maybe Text)
-> ByteString
-> Maybe Text
forall a b c. (a -> b) -> (b -> c) -> a -> c
>> (UnicodeException -> Maybe Text)
-> (Text -> Maybe Text)
-> Either UnicodeException Text
-> Maybe Text
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
Prelude.either (\UnicodeException
_ -> Maybe Text
forall a. Maybe a
Nothing) Text -> Maybe Text
forall a. a -> Maybe a
Just)
Maybe Text -> (Maybe Text -> Contents) -> Contents
forall a b. a -> (a -> b) -> b
|> Maybe Text -> Contents
forall a. ToJSON a => a -> Contents
Log.Kafka.mkContents
)
( Analytics.PausedPartitions Int
pausedPartitions,
Analytics.AssignedPartitions Int
assignedPartitions,
Analytics.TimeOfLastRebalance Float
timeOfLastRebalance
) <-
Analytics
-> IO (PausedPartitions, AssignedPartitions, TimeOfLastRebalance)
Analytics.read Analytics
analytics
Float
now <- IO Float
GHC.Clock.getMonotonicTime
let timeSinceLastRebalance :: Float
timeSinceLastRebalance = Float
now Float -> Float -> Float
forall number. Num number => number -> number -> number
- Float
timeOfLastRebalance
RequestId
requestIdForReturn <-
case Maybe Text
requestId of
Maybe Text
Nothing ->
(UUID -> Text) -> IO UUID -> IO Text
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map UUID -> Text
Data.UUID.toText IO UUID
Data.UUID.V4.nextRandom
Just Text
requestId' -> Text -> IO Text
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure Text
requestId'
IO Text -> (IO Text -> IO RequestId) -> IO RequestId
forall a b. a -> (a -> b) -> b
|> (Text -> RequestId) -> IO Text -> IO RequestId
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map Text -> RequestId
RequestId
(RequestId, Details) -> IO (RequestId, Details)
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure
( RequestId
requestIdForReturn,
Details
Log.Kafka.emptyDetails
{ topic :: Maybe Text
Log.Kafka.topic = Text -> Maybe Text
forall a. a -> Maybe a
Just (TopicName -> Text
Consumer.unTopicName (ConsumerRecord -> TopicName
forall k v. ConsumerRecord k v -> TopicName
Consumer.crTopic ConsumerRecord
record)),
partitionId :: Maybe Int
Log.Kafka.partitionId = Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Int
forall a b. (Integral a, Num b) => a -> b
Prelude.fromIntegral (PartitionId -> Int
Consumer.unPartitionId (ConsumerRecord -> PartitionId
forall k v. ConsumerRecord k v -> PartitionId
Consumer.crPartition ConsumerRecord
record))),
key :: Maybe Text
Log.Kafka.key =
ConsumerRecord -> Maybe ByteString
forall k v. ConsumerRecord k v -> k
Consumer.crKey ConsumerRecord
record
Maybe ByteString -> (Maybe ByteString -> Maybe Text) -> Maybe Text
forall a b. a -> (a -> b) -> b
|> (ByteString -> Maybe Text) -> Maybe ByteString -> Maybe Text
forall a b. (a -> Maybe b) -> Maybe a -> Maybe b
Maybe.andThen
( \ByteString
keyBytes ->
case ByteString -> Either UnicodeException Text
Data.Text.Encoding.decodeUtf8' ByteString
keyBytes of
Prelude.Left UnicodeException
_ -> Maybe Text
forall a. Maybe a
Nothing
Prelude.Right Text
keyText -> Text -> Maybe Text
forall a. a -> Maybe a
Just Text
keyText
),
contents :: Maybe Contents
Log.Kafka.contents = Contents -> Maybe Contents
forall a. a -> Maybe a
Just Contents
contents,
processAttempt :: Maybe Int
Log.Kafka.processAttempt = Int -> Maybe Int
forall a. a -> Maybe a
Just Int
processAttempt,
Maybe UTCTime
createTime :: Maybe UTCTime
createTime :: Maybe UTCTime
Log.Kafka.createTime,
assignedPartitions :: Maybe Int
Log.Kafka.assignedPartitions = Int -> Maybe Int
forall a. a -> Maybe a
Just Int
assignedPartitions,
pausedPartitions :: Maybe Int
Log.Kafka.pausedPartitions = Int -> Maybe Int
forall a. a -> Maybe a
Just Int
pausedPartitions,
timeSinceLastRebalance :: Maybe Float
Log.Kafka.timeSinceLastRebalance = Float -> Maybe Float
forall a. a -> Maybe a
Just Float
timeSinceLastRebalance,
Maybe UTCTime
logAppendTime :: Maybe UTCTime
logAppendTime :: Maybe UTCTime
Log.Kafka.logAppendTime,
Maybe Text
requestId :: Maybe Text
requestId :: Maybe Text
Log.Kafka.requestId
}
)
millisToSecs :: Consumer.Millis -> Clock.UTCTime
millisToSecs :: Millis -> UTCTime
millisToSecs (Consumer.Millis Int
millis) = Int -> UTCTime
fromPosix (Int
millis Int -> Int -> Int
// Int
1000)
decodeMessage :: (Aeson.FromJSON msg) => ConsumerRecord -> Task (WorkerError e) msg
decodeMessage :: ConsumerRecord -> Task (WorkerError e) msg
decodeMessage ConsumerRecord
record = do
let eitherMsg :: Either String MsgWithMetaData
eitherMsg =
ConsumerRecord -> Maybe ByteString
forall k v. ConsumerRecord k v -> v
Consumer.crValue ConsumerRecord
record
Maybe ByteString -> (Maybe ByteString -> ByteString) -> ByteString
forall a b. a -> (a -> b) -> b
|> ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
Maybe.withDefault ByteString
"[]"
ByteString
-> (ByteString -> Either String MsgWithMetaData)
-> Either String MsgWithMetaData
forall a b. a -> (a -> b) -> b
|> ByteString -> Either String MsgWithMetaData
forall a. FromJSON a => ByteString -> Either String a
Aeson.eitherDecodeStrict
case Either String MsgWithMetaData
eitherMsg of
Prelude.Left String
err ->
WorkerError e -> Task (WorkerError e) msg
forall x a. x -> Task x a
Task.fail (Text -> WorkerError e
forall e. Text -> WorkerError e
MsgDecodingFailed (String -> Text
Text.fromList String
err))
Prelude.Right MsgWithMetaData
msgWithMetaData ->
case MsgWithMetaData -> Encodable
Internal.value MsgWithMetaData
msgWithMetaData of
(Internal.Encodable a
value) ->
case Value -> Result msg
forall a. FromJSON a => Value -> Result a
Aeson.fromJSON (a -> Value
forall a. ToJSON a => a -> Value
Aeson.toJSON a
value) of
Aeson.Error String
err ->
WorkerError e -> Task (WorkerError e) msg
forall x a. x -> Task x a
Task.fail (Text -> WorkerError e
forall e. Text -> WorkerError e
MsgDecodingFailed (String -> Text
Text.fromList String
err))
Aeson.Success msg
msg ->
msg -> Task (WorkerError e) msg
forall a x. a -> Task x a
Task.succeed msg
msg
commitRecord ::
Platform.DoAnythingHandler ->
Consumer.KafkaConsumer ->
ConsumerRecord ->
Task e ()
commitRecord :: Handler -> KafkaConsumer -> ConsumerRecord -> Task e ()
commitRecord Handler
doAnything KafkaConsumer
consumer ConsumerRecord
record = do
Maybe KafkaError
commitResult <-
OffsetCommit
-> KafkaConsumer -> ConsumerRecord -> IO (Maybe KafkaError)
forall (m :: * -> *) k v.
MonadIO m =>
OffsetCommit
-> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
Consumer.commitOffsetMessage OffsetCommit
Consumer.OffsetCommit KafkaConsumer
consumer ConsumerRecord
record
IO (Maybe KafkaError)
-> (IO (Maybe KafkaError) -> IO (Result e (Maybe KafkaError)))
-> IO (Result e (Maybe KafkaError))
forall a b. a -> (a -> b) -> b
|> (Maybe KafkaError -> Result e (Maybe KafkaError))
-> IO (Maybe KafkaError) -> IO (Result e (Maybe KafkaError))
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map Maybe KafkaError -> Result e (Maybe KafkaError)
forall error value. value -> Result error value
Ok
IO (Result e (Maybe KafkaError))
-> (IO (Result e (Maybe KafkaError)) -> Task e (Maybe KafkaError))
-> Task e (Maybe KafkaError)
forall a b. a -> (a -> b) -> b
|> Handler
-> IO (Result e (Maybe KafkaError)) -> Task e (Maybe KafkaError)
forall e a. Handler -> IO (Result e a) -> Task e a
Platform.doAnything Handler
doAnything
case Maybe KafkaError
commitResult of
Just KafkaError
err ->
Text -> [Context] -> Task e ()
forall e. HasCallStack => Text -> [Context] -> Task e ()
Log.error
Text
"Failed to commit offset to Kafka after succesfully processing message."
[ Text -> Text -> Context
forall a. (Show a, ToJSON a) => Text -> a -> Context
Log.context Text
"err" (KafkaError -> Text
forall a. Show a => a -> Text
Debug.toString KafkaError
err),
Text -> Text -> Context
forall a. (Show a, ToJSON a) => Text -> a -> Context
Log.context Text
"context" (Text
"We failed to commit progress on the message, which means there is a risk of us processing it again. If the message is not idempotent this will be a problem. If we see a lot of these errors it might mean no commits are happening at all, in which cases our queues are not making forward progress." :: Text)
]
Maybe KafkaError
Nothing -> () -> Task e ()
forall a x. a -> Task x a
Task.succeed ()
data PollResponse
= NextMsg ProcessAttemptsCount ConsumerRecord
| StopThread
peekRecord ::
State ->
Prelude.IO PollResponse
peekRecord :: State -> IO PollResponse
peekRecord State
state =
Stopping -> PollResponse -> IO PollResponse -> IO PollResponse
forall a. Stopping -> a -> IO a -> IO a
Stopping.runUnlessStopping
(State -> Stopping
stopping State
state)
PollResponse
StopThread
( STM PollResponse -> IO PollResponse
forall a. STM a -> IO a
STM.atomically
(STM PollResponse -> IO PollResponse)
-> STM PollResponse -> IO PollResponse
forall a b. (a -> b) -> a -> b
<| do
let (Partition TVar Backlog
partition') = State -> Partition
partition State
state
Backlog
backlog' <- TVar Backlog -> STM Backlog
forall a. TVar a -> STM a
TVar.readTVar TVar Backlog
partition'
case Backlog
backlog' of
AwaitingSeekTo Int
_ ->
STM PollResponse
forall a. STM a
STM.retry
Backlog
Stopping -> do
PollResponse -> STM PollResponse
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure PollResponse
StopThread
Assigned Seq (ProcessAttemptsCount, ConsumerRecord)
Seq.Empty ->
STM PollResponse
forall a. STM a
STM.retry
Assigned ((ProcessAttemptsCount
processAttemptsCount, ConsumerRecord
first) Seq.:<| Seq (ProcessAttemptsCount, ConsumerRecord)
rest) -> do
TVar Backlog -> Backlog -> STM ()
forall a. TVar a -> a -> STM ()
TVar.writeTVar
TVar Backlog
partition'
(Seq (ProcessAttemptsCount, ConsumerRecord) -> Backlog
Assigned ((ProcessAttemptsCount
processAttemptsCount ProcessAttemptsCount
-> ProcessAttemptsCount -> ProcessAttemptsCount
forall number. Num number => number -> number -> number
+ ProcessAttemptsCount
1, ConsumerRecord
first) (ProcessAttemptsCount, ConsumerRecord)
-> Seq (ProcessAttemptsCount, ConsumerRecord)
-> Seq (ProcessAttemptsCount, ConsumerRecord)
forall a. a -> Seq a -> Seq a
Seq.:<| Seq (ProcessAttemptsCount, ConsumerRecord)
rest))
PollResponse -> STM PollResponse
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure (ProcessAttemptsCount -> ConsumerRecord -> PollResponse
NextMsg ProcessAttemptsCount
processAttemptsCount ConsumerRecord
first)
)
awaitingSeekTo :: Partition -> Int -> Prelude.IO ()
awaitingSeekTo :: Partition -> Int -> IO ()
awaitingSeekTo (Partition TVar Backlog
partition) Int
offset =
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (TVar Backlog -> Backlog -> STM ()
forall a. TVar a -> a -> STM ()
TVar.writeTVar TVar Backlog
partition (Int -> Backlog
AwaitingSeekTo Int
offset))
dequeueRecord ::
Partition ->
ConsumerRecord ->
Prelude.IO ()
dequeueRecord :: Partition -> ConsumerRecord -> IO ()
dequeueRecord (Partition TVar Backlog
partition) ConsumerRecord
record =
STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| do
Backlog
backlog' <- TVar Backlog -> STM Backlog
forall a. TVar a -> STM a
TVar.readTVar TVar Backlog
partition
case Backlog
backlog' of
AwaitingSeekTo Int
_ ->
() -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Backlog
Stopping ->
() -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Assigned Seq (ProcessAttemptsCount, ConsumerRecord)
Seq.Empty ->
() -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
Assigned ((ProcessAttemptsCount
_, ConsumerRecord
first) Seq.:<| Seq (ProcessAttemptsCount, ConsumerRecord)
rest) -> do
if ConsumerRecord -> Offset
forall k v. ConsumerRecord k v -> Offset
Consumer.crOffset ConsumerRecord
first Offset -> Offset -> Bool
forall a. Eq a => a -> a -> Bool
== ConsumerRecord -> Offset
forall k v. ConsumerRecord k v -> Offset
Consumer.crOffset ConsumerRecord
record
then TVar Backlog -> Backlog -> STM ()
forall a. TVar a -> a -> STM ()
TVar.writeTVar TVar Backlog
partition (Seq (ProcessAttemptsCount, ConsumerRecord) -> Backlog
Assigned Seq (ProcessAttemptsCount, ConsumerRecord)
rest)
else
() -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure ()
append :: ConsumerRecord -> Partition -> STM.STM SeekCmd
append :: ConsumerRecord -> Partition -> STM SeekCmd
append ConsumerRecord
item (Partition TVar Backlog
partition) =
TVar Backlog -> (Backlog -> (SeekCmd, Backlog)) -> STM SeekCmd
forall s a. TVar s -> (s -> (a, s)) -> STM a
TVar.stateTVar
TVar Backlog
partition
( \Backlog
queue' ->
case Backlog
queue' of
AwaitingSeekTo Int
offset ->
if Int
offset Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Offset -> Int
Consumer.unOffset (ConsumerRecord -> Offset
forall k v. ConsumerRecord k v -> Offset
Consumer.crOffset ConsumerRecord
item)
then
( SeekCmd
NoSeek,
Seq (ProcessAttemptsCount, ConsumerRecord) -> Backlog
Assigned ((ProcessAttemptsCount, ConsumerRecord)
-> Seq (ProcessAttemptsCount, ConsumerRecord)
forall a. a -> Seq a
Seq.singleton (Int -> ProcessAttemptsCount
ProcessAttemptsCount Int
0, ConsumerRecord
item))
)
else
( Int -> SeekCmd
SeekToOffset Int
offset,
Int -> Backlog
AwaitingSeekTo Int
offset
)
Backlog
Stopping -> (SeekCmd
NoSeek, Backlog
Stopping)
Assigned Seq (ProcessAttemptsCount, ConsumerRecord)
queue ->
( SeekCmd
NoSeek,
Seq (ProcessAttemptsCount, ConsumerRecord) -> Backlog
Assigned (Seq (ProcessAttemptsCount, ConsumerRecord)
queue Seq (ProcessAttemptsCount, ConsumerRecord)
-> (ProcessAttemptsCount, ConsumerRecord)
-> Seq (ProcessAttemptsCount, ConsumerRecord)
forall a. Seq a -> a -> Seq a
Seq.:|> (Int -> ProcessAttemptsCount
ProcessAttemptsCount Int
0, ConsumerRecord
item))
)
)
length :: Partition -> Prelude.IO (Maybe Int)
length :: Partition -> IO (Maybe Int)
length (Partition TVar Backlog
partition) = do
Backlog
backlog <- TVar Backlog -> IO Backlog
forall a. TVar a -> IO a
TVar.readTVarIO TVar Backlog
partition
Maybe Int -> IO (Maybe Int)
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure
(Maybe Int -> IO (Maybe Int)) -> Maybe Int -> IO (Maybe Int)
forall a b. (a -> b) -> a -> b
<| case Backlog
backlog of
AwaitingSeekTo Int
_ -> Maybe Int
forall a. Maybe a
Nothing
Backlog
Stopping -> Maybe Int
forall a. Maybe a
Nothing
Assigned Seq (ProcessAttemptsCount, ConsumerRecord)
queue ->
Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Int
forall a b. (Integral a, Num b) => a -> b
Prelude.fromIntegral (Seq (ProcessAttemptsCount, ConsumerRecord) -> Int
forall a. Seq a -> Int
Seq.length Seq (ProcessAttemptsCount, ConsumerRecord)
queue))
revoke :: Partition -> STM.STM ()
revoke :: Partition -> STM ()
revoke (Partition TVar Backlog
partition) = TVar Backlog -> Backlog -> STM ()
forall a. TVar a -> a -> STM ()
TVar.writeTVar TVar Backlog
partition Backlog
Stopping
fromPosix :: Int -> Clock.UTCTime
fromPosix :: Int -> UTCTime
fromPosix Int
secondsSinceEpoch =
Int
secondsSinceEpoch
Int -> (Int -> POSIXTime) -> POSIXTime
forall a b. a -> (a -> b) -> b
|> Int -> POSIXTime
forall a b. (Integral a, Num b) => a -> b
Prelude.fromIntegral
POSIXTime -> (POSIXTime -> UTCTime) -> UTCTime
forall a b. a -> (a -> b) -> b
|> POSIXTime -> UTCTime
Clock.POSIX.posixSecondsToUTCTime