module Kafka.Worker.Fetcher (pollingLoop) where

import qualified Control.Concurrent
import qualified Control.Exception.Safe as Exception
import qualified Data.ByteString as ByteString
import qualified Dict
import qualified GHC.Clock
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Worker.Analytics as Analytics
import qualified Kafka.Worker.Partition as Partition
import qualified Kafka.Worker.Settings as Settings
import qualified Prelude

-- | Callback the polling loop runs once for every fetched record that belongs
-- to a currently assigned partition. The 'Partition.SeekCmd' it returns tells
-- the loop whether the consumer has to seek on that record's partition.
type EnqueueRecord = (ConsumerRecord -> Prelude.IO Partition.SeekCmd)

-- | Entry point of the fetcher: our long-running event loop.
--
-- Takes a reading of the monotonic clock, marks it as the "previous poll"
-- time, and hands control to @pollingLoop'@, which on every iteration
--
-- - polls Kafka for a new batch of messages,
-- - passes every message to the @enqueueRecord@ callback,
-- - seeks partitions for which the callback requested it,
-- - throttles and starts over.
--
-- The loop never finishes normally; it stops only when an exception is thrown.
pollingLoop ::
  Settings.Settings ->
  EnqueueRecord ->
  Analytics.Analytics ->
  Consumer.KafkaConsumer ->
  Prelude.IO ()
pollingLoop settings enqueueRecord analytics consumer =
  nextPollingTimestamp
    |> andThen
      ( \startedAt ->
          pollingLoop' settings enqueueRecord analytics consumer (pollTimeIsOld startedAt)
      )

-- | Monotonic-clock reading (seconds) taken at the end of the previous loop
-- iteration. 'throttle' subtracts it from the current reading to find out how
-- much time has passed between polls.
newtype LastPollingTimestamp = LastPollingTimestamp Float

-- | Monotonic-clock reading (seconds) produced by 'nextPollingTimestamp'. It
-- is turned into a 'LastPollingTimestamp' with 'pollTimeIsOld' before the next
-- loop iteration.
newtype NextPollingTimestamp = NextPollingTimestamp Float

-- | Re-label the timestamp of the iteration that just finished so it can serve
-- as the "previous poll" time of the next iteration. The value is unchanged.
pollTimeIsOld :: NextPollingTimestamp -> LastPollingTimestamp
pollTimeIsOld next =
  case next of
    NextPollingTimestamp seconds -> LastPollingTimestamp seconds

-- | Read the monotonic clock and wrap the reading as a 'NextPollingTimestamp'.
nextPollingTimestamp :: Prelude.IO NextPollingTimestamp
nextPollingTimestamp = do
  seconds <- GHC.Clock.getMonotonicTime
  Prelude.pure (NextPollingTimestamp seconds)

-- | The record shape this module works with: a Kafka consumer record whose key
-- and value are both optional raw byte strings.
type ConsumerRecord = Consumer.ConsumerRecord (Maybe ByteString.ByteString) (Maybe ByteString.ByteString)

-- | One iteration of the polling loop, followed by a recursive call for the
-- next iteration. Never returns normally: it only ends when an exception is
-- thrown (for instance by 'handleKafkaError' or 'seek').
--
-- Each iteration:
--
-- 1. polls a batch of messages, throwing on any Kafka error in the batch;
-- 2. asks the consumer for its current partition assignment and drops the
--    messages that fall outside of it;
-- 3. hands the remaining messages to @enqueueRecord@;
-- 4. seeks every partition whose last enqueue result asked for it;
-- 5. throttles based on the time elapsed since @lastPollTimestamp@.
pollingLoop' ::
  Settings.Settings ->
  EnqueueRecord ->
  Analytics.Analytics ->
  Consumer.KafkaConsumer ->
  LastPollingTimestamp ->
  Prelude.IO ()
pollingLoop'
  settings@Settings.Settings
    { Settings.pollingTimeout,
      Settings.pollBatchSize,
      Settings.maxMsgsPerSecondPerPartition,
      Settings.maxPollIntervalMs
    }
  enqueueRecord
  analytics
  consumer
  lastPollTimestamp = do
    -- Long-poll for new messages.
    eitherMsgs <- Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize
    msgs <- Prelude.traverse handleKafkaError eitherMsgs
    assignment <-
      Consumer.assignment consumer
        |> andThen handleKafkaError
    -- We occasionally get a message here for a partition that based on
    -- internal state we believed to be revoked. We feel uneasy just
    -- dropping those messages, for what if our internal state is wrong? We
    -- might be dropping messages we really should be processing.
    -- So instead we ask librdkafka to tell us what our current assignment
    -- is. If we receive messages for partitions outside of that
    -- assignment, then we can confidently drop them.
    let assignedMsgs = List.filter (msgIsForAssignedPartition assignment) msgs
    -- Enqueue messages in per-partition queues.
    appendResults <- Prelude.traverse enqueueRecord assignedMsgs
    -- The partition keys must come from the same (filtered) list that produced
    -- `appendResults`. Pairing against the unfiltered `msgs` would shift every
    -- result after a dropped message onto the wrong partition.
    List.map2 (,) (List.map getPartitionKey assignedMsgs) appendResults
      |> groupDictAndMap identity
      |> Dict.toList
      |> List.filterMap toSeekPartition
      |> seek consumer
    now <- nextPollingTimestamp
    throttle
      maxMsgsPerSecondPerPartition
      maxPollIntervalMs
      (List.length appendResults)
      analytics
      now
      lastPollTimestamp
    pollingLoop' settings enqueueRecord analytics consumer (pollTimeIsOld now)

-- | The (topic, partition) pair a record was read from.
getPartitionKey :: Consumer.ConsumerRecord k v -> (Consumer.TopicName, Consumer.PartitionId)
getPartitionKey record =
  let topic = Consumer.crTopic record
      partition = Consumer.crPartition record
   in (topic, partition)

-- | Decide whether a partition needs a seek after a batch was enqueued.
--
-- Takes the partition's key together with the enqueue results of all messages
-- of the batch for that partition, in order. Returns the 'TopicPartition' to
-- seek to, or 'Nothing' when no seek is needed (or there were no results).
toSeekPartition ::
  ( (Consumer.TopicName, Consumer.PartitionId),
    List Partition.SeekCmd
  ) ->
  Maybe Consumer.TopicPartition
toSeekPartition ((topicName, partitionId), appendResults) =
  -- The last batch of fetched messages might have contained multiple messages
  -- for this partition, all of which we tried to enqueue. The first of them
  -- might have had an offset smaller than the one we were looking for, while
  -- somewhere in the middle of the series we caught up. That's why only the
  -- last message appended to the partition in this batch matters: if that one
  -- was successful there's nothing to do, if it had an unexpected offset we
  -- should seek.
  last appendResults
    |> andThen seekTarget
  where
    seekTarget cmd =
      case cmd of
        Partition.NoSeek -> Nothing
        Partition.SeekToOffset offset ->
          Just
            Consumer.TopicPartition
              { Consumer.tpTopicName = topicName,
                Consumer.tpPartition = partitionId,
                Consumer.tpOffset = Consumer.PartitionOffset offset
              }

-- | Final element of a list, or 'Nothing' for the empty list.
last :: List a -> Maybe a
last list =
  case list of
    [] -> Nothing
    [only] -> Just only
    _ : rest -> last rest

-- | Seek the consumer to the given partition offsets, with a 5 second timeout
-- per attempt. A failed first attempt is retried once after a 5 second pause;
-- if the retry fails as well its error is thrown.
seek :: Consumer.KafkaConsumer -> List Consumer.TopicPartition -> Prelude.IO ()
seek consumer partitions = do
  firstAttempt <- attemptSeek
  case firstAttempt of
    Nothing -> Prelude.pure ()
    Just _ -> do
      -- Retry once, after a delay, because we're seeing reports
      -- that attempting to `seek` after just having called
      -- `assign` (which hw-kafka-client does for us before running
      -- this callback) might result in seek failing. See:
      -- https://github.com/confluentinc/confluent-kafka-dotnet/issues/1303
      Control.Concurrent.threadDelay 5_000_000 {- 5 seconds -}
      secondAttempt <- attemptSeek
      -- The error of the first attempt is discarded; only the retry's error
      -- is reported.
      Prelude.maybe (Prelude.pure ()) Exception.throwIO secondAttempt
  where
    attemptSeek :: Prelude.IO (Maybe Consumer.KafkaError)
    attemptSeek =
      Consumer.seek consumer (Consumer.Timeout 5000 {- 5 seconds -}) partitions

-- | Whether a record's partition is part of the given assignment, i.e. the
-- record's topic is a key of the assignment and the record's partition id is
-- listed under that topic.
msgIsForAssignedPartition ::
  Dict.Dict Consumer.TopicName [Consumer.PartitionId] ->
  ConsumerRecord ->
  Bool
msgIsForAssignedPartition assignment msg =
  Dict.get (Consumer.crTopic msg) assignment
    |> Prelude.maybe False (List.member (Consumer.crPartition msg))

-- | Unwrap a Kafka result, throwing the error as an exception when there is
-- one.
--
-- Throwing kills the worker process if polling for messages results in an
-- error. Every individual message of a batch can contain an error, and it's
-- unclear what the implications of that are. Specifically: could such errors
-- result in holes in the stream of messages for a partition? That would be
-- bad, it could result in some messages being ignored.
--
-- Crashing the worker seems a "safe" option at least. If such crashes are
-- very rare this solution might be good enough. If it happens regularly we
-- should figure out a better solution.
handleKafkaError :: Prelude.Either Consumer.KafkaError a -> Prelude.IO a
handleKafkaError eitherMsg =
  Prelude.either Exception.throwIO Prelude.pure eitherMsg

-- | Call on the poll thread after fetching a new batch of messages. If we're
-- ahead of our quota this function will sleep for a bit, delaying the fetch of
-- the next batch.
--
-- With 'Settings.DontThrottle' this does nothing. With 'Settings.ThrottleAt'
-- the quota for the interval since the previous poll is
--
-- > wholeSecondsElapsed * maxMsgsPerSecondPerPartition * assignedPartitions
--
-- and the sleep lasts as long as it takes, at the per-second quota, to "earn"
-- the messages polled beyond that quota.
--
-- The sleep is capped at 100ms below the max poll interval so the loop gets
-- back to polling in time. No sleep happens when the per-second quota is zero
-- or negative (for example when no partitions are assigned), because there is
-- no meaningful rate to throttle against.
throttle ::
  Settings.MaxMsgsPerSecondPerPartition ->
  Settings.MaxPollIntervalMs ->
  Int ->
  Analytics.Analytics ->
  NextPollingTimestamp ->
  LastPollingTimestamp ->
  Prelude.IO ()
throttle Settings.DontThrottle _ _ _ _ _ = Prelude.pure ()
throttle
  (Settings.ThrottleAt maxMsgsPerSecondPerPartition)
  maxPollIntervalMs
  newPolledMessages
  analytics
  (NextPollingTimestamp now)
  (LastPollingTimestamp lastPollTimestamp) = do
    (_, Analytics.AssignedPartitions numPartitions, _) <- Analytics.read analytics
    let quotumPerSecond :: Int
        quotumPerSecond = maxMsgsPerSecondPerPartition * numPartitions
    if quotumPerSecond <= 0
      then -- Dividing by a zero quota would yield Infinity/NaN, and converting
      -- that to an Int is unspecified. Skip throttling instead.
        Prelude.pure ()
      else do
        -- Elapsed time is rounded down to whole seconds, so the quota is
        -- never overestimated.
        let timeDiff :: Int
            timeDiff = Prelude.floor (now - lastPollTimestamp)
        let quotum :: Int
            quotum = timeDiff * quotumPerSecond
        let overQuotum :: Int
            overQuotum = newPolledMessages - quotum
        let secondsToSleep :: Float
            secondsToSleep =
              Prelude.fromIntegral overQuotum / Prelude.fromIntegral quotumPerSecond
        -- `threadDelay` takes microseconds while the poll interval setting is
        -- in milliseconds: subtract the 100ms of headroom first, then convert
        -- to microseconds.
        let maxMicroSecondsToSleep :: Int
            maxMicroSecondsToSleep =
              (Prelude.fromIntegral (Settings.unMaxPollIntervalMs maxPollIntervalMs) - 100) * 1000
        let microSecondsToSleep :: Int
            microSecondsToSleep =
              Prelude.floor (secondsToSleep * 1e6)
                |> min maxMicroSecondsToSleep
        if microSecondsToSleep > 0
          then Control.Concurrent.threadDelay microSecondsToSleep
          else Prelude.pure ()

-- | Group a list into a dictionary. The given function turns every element
-- into a (key, value) pair; values sharing a key are collected in a list,
-- in the same relative order as in the input (the fold runs from the right
-- and prepends).
groupDictAndMap :: Ord b => (a -> (b, c)) -> List a -> Dict.Dict b (List c)
groupDictAndMap f =
  List.foldr addToGroup Dict.empty
  where
    addToGroup item groups =
      let (key, value) = f item
          prepend existing =
            case existing of
              Nothing -> Just [value]
              Just others -> Just (value : others)
       in Dict.update key prepend groups