{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

-----------------------------------------------------------------------------
-- |
-- Module holding consumer types.
-----------------------------------------------------------------------------
module Kafka.Consumer.Types
( KafkaConsumer(..)
, ConsumerGroupId(..)
, Offset(..)
, OffsetReset(..)
, RebalanceEvent(..)
, PartitionOffset(..)
, SubscribedPartitions(..)
, Timestamp(..)
, OffsetCommit(..)
, OffsetStoreSync(..)
, OffsetStoreMethod(..)
, TopicPartition(..)
, ConsumerRecord(..)
, crMapKey
, crMapValue
, crMapKV
-- why are these here?

-- * Deprecated
, sequenceFirst
, traverseFirst
, traverseFirstM
, traverseM
, bitraverseM
)
where

import Data.Bifoldable      (Bifoldable (..))
import Data.Bifunctor       (Bifunctor (..))
import Data.Bitraversable   (Bitraversable (..), bimapM, bisequence)
import Data.Int             (Int64)
import Data.String          (IsString)
import Data.Text            (Text)
import Data.Typeable        (Typeable)
import GHC.Generics         (Generic)
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..))
import Kafka.Types          (Millis (..), PartitionId (..), TopicName (..))

-- | The main type for Kafka consumption, used e.g. to poll and commit messages.
-- 
-- Its constructor is intentionally not exposed, instead, one should use 'Kafka.Consumer.newConsumer' to acquire such a value.
data KafkaConsumer = KafkaConsumer
  { KafkaConsumer -> Kafka
kcKafkaPtr  :: !Kafka
  , KafkaConsumer -> KafkaConf
kcKafkaConf :: !KafkaConf
  }

instance HasKafka KafkaConsumer where
  getKafka :: KafkaConsumer -> Kafka
getKafka = KafkaConsumer -> Kafka
kcKafkaPtr
  {-# INLINE getKafka #-}

instance HasKafkaConf KafkaConsumer where
  getKafkaConf :: KafkaConsumer -> KafkaConf
getKafkaConf = KafkaConsumer -> KafkaConf
kcKafkaConf
  {-# INLINE getKafkaConf #-}

-- | Consumer group ID. Different consumers with the same consumer group ID will get assigned different partitions of each subscribed topic. 
-- 
-- See <https://kafka.apache.org/documentation/#group.id Kafka documentation on consumer group>
newtype ConsumerGroupId = ConsumerGroupId
  { ConsumerGroupId -> Text
unConsumerGroupId :: Text
  } deriving (Int -> ConsumerGroupId -> ShowS
[ConsumerGroupId] -> ShowS
ConsumerGroupId -> String
(Int -> ConsumerGroupId -> ShowS)
-> (ConsumerGroupId -> String)
-> ([ConsumerGroupId] -> ShowS)
-> Show ConsumerGroupId
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConsumerGroupId] -> ShowS
$cshowList :: [ConsumerGroupId] -> ShowS
show :: ConsumerGroupId -> String
$cshow :: ConsumerGroupId -> String
showsPrec :: Int -> ConsumerGroupId -> ShowS
$cshowsPrec :: Int -> ConsumerGroupId -> ShowS
Show, Eq ConsumerGroupId
Eq ConsumerGroupId =>
(ConsumerGroupId -> ConsumerGroupId -> Ordering)
-> (ConsumerGroupId -> ConsumerGroupId -> Bool)
-> (ConsumerGroupId -> ConsumerGroupId -> Bool)
-> (ConsumerGroupId -> ConsumerGroupId -> Bool)
-> (ConsumerGroupId -> ConsumerGroupId -> Bool)
-> (ConsumerGroupId -> ConsumerGroupId -> ConsumerGroupId)
-> (ConsumerGroupId -> ConsumerGroupId -> ConsumerGroupId)
-> Ord ConsumerGroupId
ConsumerGroupId -> ConsumerGroupId -> Bool
ConsumerGroupId -> ConsumerGroupId -> Ordering
ConsumerGroupId -> ConsumerGroupId -> ConsumerGroupId
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: ConsumerGroupId -> ConsumerGroupId -> ConsumerGroupId
$cmin :: ConsumerGroupId -> ConsumerGroupId -> ConsumerGroupId
max :: ConsumerGroupId -> ConsumerGroupId -> ConsumerGroupId
$cmax :: ConsumerGroupId -> ConsumerGroupId -> ConsumerGroupId
>= :: ConsumerGroupId -> ConsumerGroupId -> Bool
$c>= :: ConsumerGroupId -> ConsumerGroupId -> Bool
> :: ConsumerGroupId -> ConsumerGroupId -> Bool
$c> :: ConsumerGroupId -> ConsumerGroupId -> Bool
<= :: ConsumerGroupId -> ConsumerGroupId -> Bool
$c<= :: ConsumerGroupId -> ConsumerGroupId -> Bool
< :: ConsumerGroupId -> ConsumerGroupId -> Bool
$c< :: ConsumerGroupId -> ConsumerGroupId -> Bool
compare :: ConsumerGroupId -> ConsumerGroupId -> Ordering
$ccompare :: ConsumerGroupId -> ConsumerGroupId -> Ordering
$cp1Ord :: Eq ConsumerGroupId
Ord, ConsumerGroupId -> ConsumerGroupId -> Bool
(ConsumerGroupId -> ConsumerGroupId -> Bool)
-> (ConsumerGroupId -> ConsumerGroupId -> Bool)
-> Eq ConsumerGroupId
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConsumerGroupId -> ConsumerGroupId -> Bool
$c/= :: ConsumerGroupId -> ConsumerGroupId -> Bool
== :: ConsumerGroupId -> ConsumerGroupId -> Bool
$c== :: ConsumerGroupId -> ConsumerGroupId -> Bool
Eq, String -> ConsumerGroupId
(String -> ConsumerGroupId) -> IsString ConsumerGroupId
forall a. (String -> a) -> IsString a
fromString :: String -> ConsumerGroupId
$cfromString :: String -> ConsumerGroupId
IsString, (forall x. ConsumerGroupId -> Rep ConsumerGroupId x)
-> (forall x. Rep ConsumerGroupId x -> ConsumerGroupId)
-> Generic ConsumerGroupId
forall x. Rep ConsumerGroupId x -> ConsumerGroupId
forall x. ConsumerGroupId -> Rep ConsumerGroupId x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ConsumerGroupId x -> ConsumerGroupId
$cfrom :: forall x. ConsumerGroupId -> Rep ConsumerGroupId x
Generic)

-- | A message offset in a partition
newtype Offset          = Offset { Offset -> Int64
unOffset :: Int64 } deriving (Int -> Offset -> ShowS
[Offset] -> ShowS
Offset -> String
(Int -> Offset -> ShowS)
-> (Offset -> String) -> ([Offset] -> ShowS) -> Show Offset
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Offset] -> ShowS
$cshowList :: [Offset] -> ShowS
show :: Offset -> String
$cshow :: Offset -> String
showsPrec :: Int -> Offset -> ShowS
$cshowsPrec :: Int -> Offset -> ShowS
Show, Offset -> Offset -> Bool
(Offset -> Offset -> Bool)
-> (Offset -> Offset -> Bool) -> Eq Offset
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Offset -> Offset -> Bool
$c/= :: Offset -> Offset -> Bool
== :: Offset -> Offset -> Bool
$c== :: Offset -> Offset -> Bool
Eq, Eq Offset
Eq Offset =>
(Offset -> Offset -> Ordering)
-> (Offset -> Offset -> Bool)
-> (Offset -> Offset -> Bool)
-> (Offset -> Offset -> Bool)
-> (Offset -> Offset -> Bool)
-> (Offset -> Offset -> Offset)
-> (Offset -> Offset -> Offset)
-> Ord Offset
Offset -> Offset -> Bool
Offset -> Offset -> Ordering
Offset -> Offset -> Offset
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Offset -> Offset -> Offset
$cmin :: Offset -> Offset -> Offset
max :: Offset -> Offset -> Offset
$cmax :: Offset -> Offset -> Offset
>= :: Offset -> Offset -> Bool
$c>= :: Offset -> Offset -> Bool
> :: Offset -> Offset -> Bool
$c> :: Offset -> Offset -> Bool
<= :: Offset -> Offset -> Bool
$c<= :: Offset -> Offset -> Bool
< :: Offset -> Offset -> Bool
$c< :: Offset -> Offset -> Bool
compare :: Offset -> Offset -> Ordering
$ccompare :: Offset -> Offset -> Ordering
$cp1Ord :: Eq Offset
Ord, ReadPrec [Offset]
ReadPrec Offset
Int -> ReadS Offset
ReadS [Offset]
(Int -> ReadS Offset)
-> ReadS [Offset]
-> ReadPrec Offset
-> ReadPrec [Offset]
-> Read Offset
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [Offset]
$creadListPrec :: ReadPrec [Offset]
readPrec :: ReadPrec Offset
$creadPrec :: ReadPrec Offset
readList :: ReadS [Offset]
$creadList :: ReadS [Offset]
readsPrec :: Int -> ReadS Offset
$creadsPrec :: Int -> ReadS Offset
Read, (forall x. Offset -> Rep Offset x)
-> (forall x. Rep Offset x -> Offset) -> Generic Offset
forall x. Rep Offset x -> Offset
forall x. Offset -> Rep Offset x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Offset x -> Offset
$cfrom :: forall x. Offset -> Rep Offset x
Generic)

-- | Where to reset the offset when there is no initial offset in Kafka
-- 
-- See <https://kafka.apache.org/documentation/#auto.offset.reset Kafka documentation on offset reset>
data OffsetReset        = Earliest | Latest deriving (Int -> OffsetReset -> ShowS
[OffsetReset] -> ShowS
OffsetReset -> String
(Int -> OffsetReset -> ShowS)
-> (OffsetReset -> String)
-> ([OffsetReset] -> ShowS)
-> Show OffsetReset
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [OffsetReset] -> ShowS
$cshowList :: [OffsetReset] -> ShowS
show :: OffsetReset -> String
$cshow :: OffsetReset -> String
showsPrec :: Int -> OffsetReset -> ShowS
$cshowsPrec :: Int -> OffsetReset -> ShowS
Show, OffsetReset -> OffsetReset -> Bool
(OffsetReset -> OffsetReset -> Bool)
-> (OffsetReset -> OffsetReset -> Bool) -> Eq OffsetReset
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: OffsetReset -> OffsetReset -> Bool
$c/= :: OffsetReset -> OffsetReset -> Bool
== :: OffsetReset -> OffsetReset -> Bool
$c== :: OffsetReset -> OffsetReset -> Bool
Eq, (forall x. OffsetReset -> Rep OffsetReset x)
-> (forall x. Rep OffsetReset x -> OffsetReset)
-> Generic OffsetReset
forall x. Rep OffsetReset x -> OffsetReset
forall x. OffsetReset -> Rep OffsetReset x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep OffsetReset x -> OffsetReset
$cfrom :: forall x. OffsetReset -> Rep OffsetReset x
Generic)

-- | A set of events which happen during the rebalancing process
data RebalanceEvent =
    -- | Happens before Kafka Client confirms new assignment
    RebalanceBeforeAssign [(TopicName, PartitionId)]
    -- | Happens after the new assignment is confirmed
  | RebalanceAssign [(TopicName, PartitionId)]
    -- | Happens before Kafka Client confirms partitions rejection
  | RebalanceBeforeRevoke [(TopicName, PartitionId)]
    -- | Happens after the rejection is confirmed
  | RebalanceRevoke [(TopicName, PartitionId)]
  deriving (RebalanceEvent -> RebalanceEvent -> Bool
(RebalanceEvent -> RebalanceEvent -> Bool)
-> (RebalanceEvent -> RebalanceEvent -> Bool) -> Eq RebalanceEvent
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: RebalanceEvent -> RebalanceEvent -> Bool
$c/= :: RebalanceEvent -> RebalanceEvent -> Bool
== :: RebalanceEvent -> RebalanceEvent -> Bool
$c== :: RebalanceEvent -> RebalanceEvent -> Bool
Eq, Int -> RebalanceEvent -> ShowS
[RebalanceEvent] -> ShowS
RebalanceEvent -> String
(Int -> RebalanceEvent -> ShowS)
-> (RebalanceEvent -> String)
-> ([RebalanceEvent] -> ShowS)
-> Show RebalanceEvent
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [RebalanceEvent] -> ShowS
$cshowList :: [RebalanceEvent] -> ShowS
show :: RebalanceEvent -> String
$cshow :: RebalanceEvent -> String
showsPrec :: Int -> RebalanceEvent -> ShowS
$cshowsPrec :: Int -> RebalanceEvent -> ShowS
Show, (forall x. RebalanceEvent -> Rep RebalanceEvent x)
-> (forall x. Rep RebalanceEvent x -> RebalanceEvent)
-> Generic RebalanceEvent
forall x. Rep RebalanceEvent x -> RebalanceEvent
forall x. RebalanceEvent -> Rep RebalanceEvent x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep RebalanceEvent x -> RebalanceEvent
$cfrom :: forall x. RebalanceEvent -> Rep RebalanceEvent x
Generic)

-- | The partition offset
data PartitionOffset =
    PartitionOffsetBeginning
  | PartitionOffsetEnd
  | PartitionOffset Int64
  | PartitionOffsetStored
  | PartitionOffsetInvalid
  deriving (PartitionOffset -> PartitionOffset -> Bool
(PartitionOffset -> PartitionOffset -> Bool)
-> (PartitionOffset -> PartitionOffset -> Bool)
-> Eq PartitionOffset
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: PartitionOffset -> PartitionOffset -> Bool
$c/= :: PartitionOffset -> PartitionOffset -> Bool
== :: PartitionOffset -> PartitionOffset -> Bool
$c== :: PartitionOffset -> PartitionOffset -> Bool
Eq, Int -> PartitionOffset -> ShowS
[PartitionOffset] -> ShowS
PartitionOffset -> String
(Int -> PartitionOffset -> ShowS)
-> (PartitionOffset -> String)
-> ([PartitionOffset] -> ShowS)
-> Show PartitionOffset
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PartitionOffset] -> ShowS
$cshowList :: [PartitionOffset] -> ShowS
show :: PartitionOffset -> String
$cshow :: PartitionOffset -> String
showsPrec :: Int -> PartitionOffset -> ShowS
$cshowsPrec :: Int -> PartitionOffset -> ShowS
Show, (forall x. PartitionOffset -> Rep PartitionOffset x)
-> (forall x. Rep PartitionOffset x -> PartitionOffset)
-> Generic PartitionOffset
forall x. Rep PartitionOffset x -> PartitionOffset
forall x. PartitionOffset -> Rep PartitionOffset x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep PartitionOffset x -> PartitionOffset
$cfrom :: forall x. PartitionOffset -> Rep PartitionOffset x
Generic)

-- | Partitions subscribed by a consumer
data SubscribedPartitions
  = SubscribedPartitions [PartitionId] -- ^ Subscribe only to those partitions
  | SubscribedPartitionsAll            -- ^ Subscribe to all partitions
  deriving (Int -> SubscribedPartitions -> ShowS
[SubscribedPartitions] -> ShowS
SubscribedPartitions -> String
(Int -> SubscribedPartitions -> ShowS)
-> (SubscribedPartitions -> String)
-> ([SubscribedPartitions] -> ShowS)
-> Show SubscribedPartitions
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SubscribedPartitions] -> ShowS
$cshowList :: [SubscribedPartitions] -> ShowS
show :: SubscribedPartitions -> String
$cshow :: SubscribedPartitions -> String
showsPrec :: Int -> SubscribedPartitions -> ShowS
$cshowsPrec :: Int -> SubscribedPartitions -> ShowS
Show, SubscribedPartitions -> SubscribedPartitions -> Bool
(SubscribedPartitions -> SubscribedPartitions -> Bool)
-> (SubscribedPartitions -> SubscribedPartitions -> Bool)
-> Eq SubscribedPartitions
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: SubscribedPartitions -> SubscribedPartitions -> Bool
$c/= :: SubscribedPartitions -> SubscribedPartitions -> Bool
== :: SubscribedPartitions -> SubscribedPartitions -> Bool
$c== :: SubscribedPartitions -> SubscribedPartitions -> Bool
Eq, (forall x. SubscribedPartitions -> Rep SubscribedPartitions x)
-> (forall x. Rep SubscribedPartitions x -> SubscribedPartitions)
-> Generic SubscribedPartitions
forall x. Rep SubscribedPartitions x -> SubscribedPartitions
forall x. SubscribedPartitions -> Rep SubscribedPartitions x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep SubscribedPartitions x -> SubscribedPartitions
$cfrom :: forall x. SubscribedPartitions -> Rep SubscribedPartitions x
Generic)

-- | Consumer record timestamp 
data Timestamp =
    CreateTime !Millis
  | LogAppendTime !Millis
  | NoTimestamp
  deriving (Int -> Timestamp -> ShowS
[Timestamp] -> ShowS
Timestamp -> String
(Int -> Timestamp -> ShowS)
-> (Timestamp -> String)
-> ([Timestamp] -> ShowS)
-> Show Timestamp
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Timestamp] -> ShowS
$cshowList :: [Timestamp] -> ShowS
show :: Timestamp -> String
$cshow :: Timestamp -> String
showsPrec :: Int -> Timestamp -> ShowS
$cshowsPrec :: Int -> Timestamp -> ShowS
Show, Timestamp -> Timestamp -> Bool
(Timestamp -> Timestamp -> Bool)
-> (Timestamp -> Timestamp -> Bool) -> Eq Timestamp
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Timestamp -> Timestamp -> Bool
$c/= :: Timestamp -> Timestamp -> Bool
== :: Timestamp -> Timestamp -> Bool
$c== :: Timestamp -> Timestamp -> Bool
Eq, ReadPrec [Timestamp]
ReadPrec Timestamp
Int -> ReadS Timestamp
ReadS [Timestamp]
(Int -> ReadS Timestamp)
-> ReadS [Timestamp]
-> ReadPrec Timestamp
-> ReadPrec [Timestamp]
-> Read Timestamp
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [Timestamp]
$creadListPrec :: ReadPrec [Timestamp]
readPrec :: ReadPrec Timestamp
$creadPrec :: ReadPrec Timestamp
readList :: ReadS [Timestamp]
$creadList :: ReadS [Timestamp]
readsPrec :: Int -> ReadS Timestamp
$creadsPrec :: Int -> ReadS Timestamp
Read, (forall x. Timestamp -> Rep Timestamp x)
-> (forall x. Rep Timestamp x -> Timestamp) -> Generic Timestamp
forall x. Rep Timestamp x -> Timestamp
forall x. Timestamp -> Rep Timestamp x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Timestamp x -> Timestamp
$cfrom :: forall x. Timestamp -> Rep Timestamp x
Generic)

-- | Offsets commit mode
data OffsetCommit =
      OffsetCommit       -- ^ Forces consumer to block until the broker offsets commit is done
    | OffsetCommitAsync  -- ^ Offsets will be committed in a non-blocking way
    deriving (Int -> OffsetCommit -> ShowS
[OffsetCommit] -> ShowS
OffsetCommit -> String
(Int -> OffsetCommit -> ShowS)
-> (OffsetCommit -> String)
-> ([OffsetCommit] -> ShowS)
-> Show OffsetCommit
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [OffsetCommit] -> ShowS
$cshowList :: [OffsetCommit] -> ShowS
show :: OffsetCommit -> String
$cshow :: OffsetCommit -> String
showsPrec :: Int -> OffsetCommit -> ShowS
$cshowsPrec :: Int -> OffsetCommit -> ShowS
Show, OffsetCommit -> OffsetCommit -> Bool
(OffsetCommit -> OffsetCommit -> Bool)
-> (OffsetCommit -> OffsetCommit -> Bool) -> Eq OffsetCommit
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: OffsetCommit -> OffsetCommit -> Bool
$c/= :: OffsetCommit -> OffsetCommit -> Bool
== :: OffsetCommit -> OffsetCommit -> Bool
$c== :: OffsetCommit -> OffsetCommit -> Bool
Eq, (forall x. OffsetCommit -> Rep OffsetCommit x)
-> (forall x. Rep OffsetCommit x -> OffsetCommit)
-> Generic OffsetCommit
forall x. Rep OffsetCommit x -> OffsetCommit
forall x. OffsetCommit -> Rep OffsetCommit x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep OffsetCommit x -> OffsetCommit
$cfrom :: forall x. OffsetCommit -> Rep OffsetCommit x
Generic)


-- | Indicates how offsets are to be synced to disk
data OffsetStoreSync =
      OffsetSyncDisable       -- ^ Do not sync offsets (in Kafka: -1)
    | OffsetSyncImmediate     -- ^ Sync immediately after each offset commit (in Kafka: 0)
    | OffsetSyncInterval Int  -- ^ Sync after specified interval in millis
    deriving (Int -> OffsetStoreSync -> ShowS
[OffsetStoreSync] -> ShowS
OffsetStoreSync -> String
(Int -> OffsetStoreSync -> ShowS)
-> (OffsetStoreSync -> String)
-> ([OffsetStoreSync] -> ShowS)
-> Show OffsetStoreSync
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [OffsetStoreSync] -> ShowS
$cshowList :: [OffsetStoreSync] -> ShowS
show :: OffsetStoreSync -> String
$cshow :: OffsetStoreSync -> String
showsPrec :: Int -> OffsetStoreSync -> ShowS
$cshowsPrec :: Int -> OffsetStoreSync -> ShowS
Show, OffsetStoreSync -> OffsetStoreSync -> Bool
(OffsetStoreSync -> OffsetStoreSync -> Bool)
-> (OffsetStoreSync -> OffsetStoreSync -> Bool)
-> Eq OffsetStoreSync
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: OffsetStoreSync -> OffsetStoreSync -> Bool
$c/= :: OffsetStoreSync -> OffsetStoreSync -> Bool
== :: OffsetStoreSync -> OffsetStoreSync -> Bool
$c== :: OffsetStoreSync -> OffsetStoreSync -> Bool
Eq, (forall x. OffsetStoreSync -> Rep OffsetStoreSync x)
-> (forall x. Rep OffsetStoreSync x -> OffsetStoreSync)
-> Generic OffsetStoreSync
forall x. Rep OffsetStoreSync x -> OffsetStoreSync
forall x. OffsetStoreSync -> Rep OffsetStoreSync x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep OffsetStoreSync x -> OffsetStoreSync
$cfrom :: forall x. OffsetStoreSync -> Rep OffsetStoreSync x
Generic)

-- | Indicates the method of storing the offsets
data OffsetStoreMethod =
      OffsetStoreBroker                         -- ^ Offsets are stored in Kafka broker (preferred)
    | OffsetStoreFile FilePath OffsetStoreSync  -- ^ Offsets are stored in a file (and synced to disk according to the sync policy)
    deriving (Int -> OffsetStoreMethod -> ShowS
[OffsetStoreMethod] -> ShowS
OffsetStoreMethod -> String
(Int -> OffsetStoreMethod -> ShowS)
-> (OffsetStoreMethod -> String)
-> ([OffsetStoreMethod] -> ShowS)
-> Show OffsetStoreMethod
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [OffsetStoreMethod] -> ShowS
$cshowList :: [OffsetStoreMethod] -> ShowS
show :: OffsetStoreMethod -> String
$cshow :: OffsetStoreMethod -> String
showsPrec :: Int -> OffsetStoreMethod -> ShowS
$cshowsPrec :: Int -> OffsetStoreMethod -> ShowS
Show, OffsetStoreMethod -> OffsetStoreMethod -> Bool
(OffsetStoreMethod -> OffsetStoreMethod -> Bool)
-> (OffsetStoreMethod -> OffsetStoreMethod -> Bool)
-> Eq OffsetStoreMethod
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: OffsetStoreMethod -> OffsetStoreMethod -> Bool
$c/= :: OffsetStoreMethod -> OffsetStoreMethod -> Bool
== :: OffsetStoreMethod -> OffsetStoreMethod -> Bool
$c== :: OffsetStoreMethod -> OffsetStoreMethod -> Bool
Eq, (forall x. OffsetStoreMethod -> Rep OffsetStoreMethod x)
-> (forall x. Rep OffsetStoreMethod x -> OffsetStoreMethod)
-> Generic OffsetStoreMethod
forall x. Rep OffsetStoreMethod x -> OffsetStoreMethod
forall x. OffsetStoreMethod -> Rep OffsetStoreMethod x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep OffsetStoreMethod x -> OffsetStoreMethod
$cfrom :: forall x. OffsetStoreMethod -> Rep OffsetStoreMethod x
Generic)

-- | Kafka topic partition structure
data TopicPartition = TopicPartition
  { TopicPartition -> TopicName
tpTopicName :: TopicName
  , TopicPartition -> PartitionId
tpPartition :: PartitionId
  , TopicPartition -> PartitionOffset
tpOffset    :: PartitionOffset
  } deriving (Int -> TopicPartition -> ShowS
[TopicPartition] -> ShowS
TopicPartition -> String
(Int -> TopicPartition -> ShowS)
-> (TopicPartition -> String)
-> ([TopicPartition] -> ShowS)
-> Show TopicPartition
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TopicPartition] -> ShowS
$cshowList :: [TopicPartition] -> ShowS
show :: TopicPartition -> String
$cshow :: TopicPartition -> String
showsPrec :: Int -> TopicPartition -> ShowS
$cshowsPrec :: Int -> TopicPartition -> ShowS
Show, TopicPartition -> TopicPartition -> Bool
(TopicPartition -> TopicPartition -> Bool)
-> (TopicPartition -> TopicPartition -> Bool) -> Eq TopicPartition
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: TopicPartition -> TopicPartition -> Bool
$c/= :: TopicPartition -> TopicPartition -> Bool
== :: TopicPartition -> TopicPartition -> Bool
$c== :: TopicPartition -> TopicPartition -> Bool
Eq, (forall x. TopicPartition -> Rep TopicPartition x)
-> (forall x. Rep TopicPartition x -> TopicPartition)
-> Generic TopicPartition
forall x. Rep TopicPartition x -> TopicPartition
forall x. TopicPartition -> Rep TopicPartition x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep TopicPartition x -> TopicPartition
$cfrom :: forall x. TopicPartition -> Rep TopicPartition x
Generic)

-- | Represents a /received/ message from Kafka (i.e. used in a consumer)
data ConsumerRecord k v = ConsumerRecord
  { ConsumerRecord k v -> TopicName
crTopic     :: !TopicName    -- ^ Kafka topic this message was received from
  , ConsumerRecord k v -> PartitionId
crPartition :: !PartitionId  -- ^ Kafka partition this message was received from
  , ConsumerRecord k v -> Offset
crOffset    :: !Offset       -- ^ Offset within the 'crPartition' Kafka partition
  , ConsumerRecord k v -> Timestamp
crTimestamp :: !Timestamp    -- ^ Message timestamp
  , ConsumerRecord k v -> k
crKey       :: !k            -- ^ Message key
  , ConsumerRecord k v -> v
crValue     :: !v            -- ^ Message value
  }
  deriving (ConsumerRecord k v -> ConsumerRecord k v -> Bool
(ConsumerRecord k v -> ConsumerRecord k v -> Bool)
-> (ConsumerRecord k v -> ConsumerRecord k v -> Bool)
-> Eq (ConsumerRecord k v)
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
forall k v.
(Eq k, Eq v) =>
ConsumerRecord k v -> ConsumerRecord k v -> Bool
/= :: ConsumerRecord k v -> ConsumerRecord k v -> Bool
$c/= :: forall k v.
(Eq k, Eq v) =>
ConsumerRecord k v -> ConsumerRecord k v -> Bool
== :: ConsumerRecord k v -> ConsumerRecord k v -> Bool
$c== :: forall k v.
(Eq k, Eq v) =>
ConsumerRecord k v -> ConsumerRecord k v -> Bool
Eq, Int -> ConsumerRecord k v -> ShowS
[ConsumerRecord k v] -> ShowS
ConsumerRecord k v -> String
(Int -> ConsumerRecord k v -> ShowS)
-> (ConsumerRecord k v -> String)
-> ([ConsumerRecord k v] -> ShowS)
-> Show (ConsumerRecord k v)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall k v. (Show k, Show v) => Int -> ConsumerRecord k v -> ShowS
forall k v. (Show k, Show v) => [ConsumerRecord k v] -> ShowS
forall k v. (Show k, Show v) => ConsumerRecord k v -> String
showList :: [ConsumerRecord k v] -> ShowS
$cshowList :: forall k v. (Show k, Show v) => [ConsumerRecord k v] -> ShowS
show :: ConsumerRecord k v -> String
$cshow :: forall k v. (Show k, Show v) => ConsumerRecord k v -> String
showsPrec :: Int -> ConsumerRecord k v -> ShowS
$cshowsPrec :: forall k v. (Show k, Show v) => Int -> ConsumerRecord k v -> ShowS
Show, ReadPrec [ConsumerRecord k v]
ReadPrec (ConsumerRecord k v)
Int -> ReadS (ConsumerRecord k v)
ReadS [ConsumerRecord k v]
(Int -> ReadS (ConsumerRecord k v))
-> ReadS [ConsumerRecord k v]
-> ReadPrec (ConsumerRecord k v)
-> ReadPrec [ConsumerRecord k v]
-> Read (ConsumerRecord k v)
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
forall k v. (Read k, Read v) => ReadPrec [ConsumerRecord k v]
forall k v. (Read k, Read v) => ReadPrec (ConsumerRecord k v)
forall k v. (Read k, Read v) => Int -> ReadS (ConsumerRecord k v)
forall k v. (Read k, Read v) => ReadS [ConsumerRecord k v]
readListPrec :: ReadPrec [ConsumerRecord k v]
$creadListPrec :: forall k v. (Read k, Read v) => ReadPrec [ConsumerRecord k v]
readPrec :: ReadPrec (ConsumerRecord k v)
$creadPrec :: forall k v. (Read k, Read v) => ReadPrec (ConsumerRecord k v)
readList :: ReadS [ConsumerRecord k v]
$creadList :: forall k v. (Read k, Read v) => ReadS [ConsumerRecord k v]
readsPrec :: Int -> ReadS (ConsumerRecord k v)
$creadsPrec :: forall k v. (Read k, Read v) => Int -> ReadS (ConsumerRecord k v)
Read, Typeable, (forall x. ConsumerRecord k v -> Rep (ConsumerRecord k v) x)
-> (forall x. Rep (ConsumerRecord k v) x -> ConsumerRecord k v)
-> Generic (ConsumerRecord k v)
forall x. Rep (ConsumerRecord k v) x -> ConsumerRecord k v
forall x. ConsumerRecord k v -> Rep (ConsumerRecord k v) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall k v x. Rep (ConsumerRecord k v) x -> ConsumerRecord k v
forall k v x. ConsumerRecord k v -> Rep (ConsumerRecord k v) x
$cto :: forall k v x. Rep (ConsumerRecord k v) x -> ConsumerRecord k v
$cfrom :: forall k v x. ConsumerRecord k v -> Rep (ConsumerRecord k v) x
Generic)

instance Bifunctor ConsumerRecord where
  bimap :: (a -> b) -> (c -> d) -> ConsumerRecord a c -> ConsumerRecord b d
bimap f :: a -> b
f g :: c -> d
g (ConsumerRecord t :: TopicName
t p :: PartitionId
p o :: Offset
o ts :: Timestamp
ts k :: a
k v :: c
v) =  TopicName
-> PartitionId
-> Offset
-> Timestamp
-> b
-> d
-> ConsumerRecord b d
forall k v.
TopicName
-> PartitionId
-> Offset
-> Timestamp
-> k
-> v
-> ConsumerRecord k v
ConsumerRecord TopicName
t PartitionId
p Offset
o Timestamp
ts (a -> b
f a
k) (c -> d
g c
v)
  {-# INLINE bimap #-}

instance Functor (ConsumerRecord k) where
  fmap :: (a -> b) -> ConsumerRecord k a -> ConsumerRecord k b
fmap = (a -> b) -> ConsumerRecord k a -> ConsumerRecord k b
forall (p :: * -> * -> *) b c a.
Bifunctor p =>
(b -> c) -> p a b -> p a c
second
  {-# INLINE fmap #-}

instance Foldable (ConsumerRecord k) where
  foldMap :: (a -> m) -> ConsumerRecord k a -> m
foldMap f :: a -> m
f r :: ConsumerRecord k a
r = a -> m
f (ConsumerRecord k a -> a
forall k v. ConsumerRecord k v -> v
crValue ConsumerRecord k a
r)
  {-# INLINE foldMap #-}

instance Traversable (ConsumerRecord k) where
  traverse :: (a -> f b) -> ConsumerRecord k a -> f (ConsumerRecord k b)
traverse f :: a -> f b
f r :: ConsumerRecord k a
r = (\v :: b
v -> (a -> b) -> ConsumerRecord k a -> ConsumerRecord k b
forall b c a. (b -> c) -> ConsumerRecord a b -> ConsumerRecord a c
crMapValue (b -> a -> b
forall a b. a -> b -> a
const b
v) ConsumerRecord k a
r) (b -> ConsumerRecord k b) -> f b -> f (ConsumerRecord k b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> a -> f b
f (ConsumerRecord k a -> a
forall k v. ConsumerRecord k v -> v
crValue ConsumerRecord k a
r)
  {-# INLINE traverse #-}

instance Bifoldable ConsumerRecord where
  bifoldMap :: (a -> m) -> (b -> m) -> ConsumerRecord a b -> m
bifoldMap f :: a -> m
f g :: b -> m
g r :: ConsumerRecord a b
r = a -> m
f (ConsumerRecord a b -> a
forall k v. ConsumerRecord k v -> k
crKey ConsumerRecord a b
r) m -> m -> m
forall a. Monoid a => a -> a -> a
`mappend` b -> m
g (ConsumerRecord a b -> b
forall k v. ConsumerRecord k v -> v
crValue ConsumerRecord a b
r)
  {-# INLINE bifoldMap #-}

instance Bitraversable ConsumerRecord where
  bitraverse :: (a -> f c)
-> (b -> f d) -> ConsumerRecord a b -> f (ConsumerRecord c d)
bitraverse f :: a -> f c
f g :: b -> f d
g r :: ConsumerRecord a b
r = (\k :: c
k v :: d
v -> (a -> c) -> (b -> d) -> ConsumerRecord a b -> ConsumerRecord c d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap (c -> a -> c
forall a b. a -> b -> a
const c
k) (d -> b -> d
forall a b. a -> b -> a
const d
v) ConsumerRecord a b
r) (c -> d -> ConsumerRecord c d)
-> f c -> f (d -> ConsumerRecord c d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> a -> f c
f (ConsumerRecord a b -> a
forall k v. ConsumerRecord k v -> k
crKey ConsumerRecord a b
r) f (d -> ConsumerRecord c d) -> f d -> f (ConsumerRecord c d)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> b -> f d
g (ConsumerRecord a b -> b
forall k v. ConsumerRecord k v -> v
crValue ConsumerRecord a b
r)
  {-# INLINE bitraverse #-}

{-# DEPRECATED crMapKey "Isn't concern of this library. Use 'first'" #-}
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
crMapKey = (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first
{-# INLINE crMapKey #-}

{-# DEPRECATED crMapValue "Isn't concern of this library. Use 'second'" #-}
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
crMapValue = (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
forall (p :: * -> * -> *) b c a.
Bifunctor p =>
(b -> c) -> p a b -> p a c
second
{-# INLINE crMapValue #-}

{-# DEPRECATED crMapKV "Isn't concern of this library. Use 'bimap'" #-}
crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
crMapKV :: (k -> k')
-> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
crMapKV = (k -> k')
-> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap
{-# INLINE crMapKV #-}

{-# DEPRECATED sequenceFirst "Isn't concern of this library. Use @'bitraverse' 'id' 'pure'@" #-}
sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v)
sequenceFirst :: t (f k) v -> f (t k v)
sequenceFirst = (f k -> f k) -> (v -> f v) -> t (f k) v -> f (t k v)
forall (t :: * -> * -> *) (f :: * -> *) a c b d.
(Bitraversable t, Applicative f) =>
(a -> f c) -> (b -> f d) -> t a b -> f (t c d)
bitraverse f k -> f k
forall a. a -> a
id v -> f v
forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE sequenceFirst #-}

{-# DEPRECATED traverseFirst "Isn't concern of this library. Use @'bitraverse' f 'pure'@"  #-}
traverseFirst :: (Bitraversable t, Applicative f)
              => (k -> f k')
              -> t k v
              -> f (t k' v)
traverseFirst :: (k -> f k') -> t k v -> f (t k' v)
traverseFirst f :: k -> f k'
f = (k -> f k') -> (v -> f v) -> t k v -> f (t k' v)
forall (t :: * -> * -> *) (f :: * -> *) a c b d.
(Bitraversable t, Applicative f) =>
(a -> f c) -> (b -> f d) -> t a b -> f (t c d)
bitraverse k -> f k'
f v -> f v
forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE traverseFirst #-}

{-# DEPRECATED traverseFirstM "Isn't concern of this library. Use @'bitraverse' 'id' 'pure' '<$>' 'bitraverse' f 'pure' r@"  #-}
traverseFirstM :: (Bitraversable t, Applicative f, Monad m)
               => (k -> m (f k'))
               -> t k v
               -> m (f (t k' v))
traverseFirstM :: (k -> m (f k')) -> t k v -> m (f (t k' v))
traverseFirstM f :: k -> m (f k')
f r :: t k v
r = (f k' -> f k') -> (v -> f v) -> t (f k') v -> f (t k' v)
forall (t :: * -> * -> *) (f :: * -> *) a c b d.
(Bitraversable t, Applicative f) =>
(a -> f c) -> (b -> f d) -> t a b -> f (t c d)
bitraverse f k' -> f k'
forall a. a -> a
id v -> f v
forall (f :: * -> *) a. Applicative f => a -> f a
pure (t (f k') v -> f (t k' v)) -> m (t (f k') v) -> m (f (t k' v))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (k -> m (f k')) -> (v -> m v) -> t k v -> m (t (f k') v)
forall (t :: * -> * -> *) (f :: * -> *) a c b d.
(Bitraversable t, Applicative f) =>
(a -> f c) -> (b -> f d) -> t a b -> f (t c d)
bitraverse k -> m (f k')
f v -> m v
forall (f :: * -> *) a. Applicative f => a -> f a
pure t k v
r
{-# INLINE traverseFirstM #-}

{-# DEPRECATED traverseM "Isn't concern of this library. Use @'sequenceA' '<$>' 'traverse' f r@"  #-}
traverseM :: (Traversable t, Applicative f, Monad m)
          => (v -> m (f v'))
          -> t v
          -> m (f (t v'))
traverseM :: (v -> m (f v')) -> t v -> m (f (t v'))
traverseM f :: v -> m (f v')
f r :: t v
r = t (f v') -> f (t v')
forall (t :: * -> *) (f :: * -> *) a.
(Traversable t, Applicative f) =>
t (f a) -> f (t a)
sequenceA (t (f v') -> f (t v')) -> m (t (f v')) -> m (f (t v'))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (v -> m (f v')) -> t v -> m (t (f v'))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse v -> m (f v')
f t v
r
{-# INLINE traverseM #-}

{-# DEPRECATED bitraverseM "Isn't concern of this library. Use @'Data.Bitraversable.bisequenceA' '<$>' 'bimapM' f g r@"  #-}
bitraverseM :: (Bitraversable t, Applicative f, Monad m)
            => (k -> m (f k'))
            -> (v -> m (f v'))
            -> t k v
            -> m (f (t k' v'))
bitraverseM :: (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (f (t k' v'))
bitraverseM f :: k -> m (f k')
f g :: v -> m (f v')
g r :: t k v
r = t (f k') (f v') -> f (t k' v')
forall (t :: * -> * -> *) (f :: * -> *) a b.
(Bitraversable t, Applicative f) =>
t (f a) (f b) -> f (t a b)
bisequence (t (f k') (f v') -> f (t k' v'))
-> m (t (f k') (f v')) -> m (f (t k' v'))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (k -> m (f k')) -> (v -> m (f v')) -> t k v -> m (t (f k') (f v'))
forall (t :: * -> * -> *) (f :: * -> *) a c b d.
(Bitraversable t, Applicative f) =>
(a -> f c) -> (b -> f d) -> t a b -> f (t c d)
bimapM k -> m (f k')
f v -> m (f v')
g t k v
r
{-# INLINE bitraverseM #-}