{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric      #-}

-- | Data types used on the consumer side: the consumer handle, offsets,
-- rebalance events, offset storage settings and the 'ConsumerRecord' type
-- together with its mapping helpers.
module Kafka.Consumer.Types
  ( KafkaConsumer(..)
  , ConsumerGroupId(..)
  , Offset(..)
  , OffsetReset(..)
  , RebalanceEvent(..)
  , PartitionOffset(..)
  , SubscribedPartitions(..)
  , Timestamp(..)
  , OffsetCommit(..)
  , OffsetStoreSync(..)
  , OffsetStoreMethod(..)
  , TopicPartition(..)
  , ConsumerRecord(..)
  , crMapKey
  , crMapValue
  , crMapKV
  -- why are these here?

  -- * Deprecated
  , sequenceFirst
  , traverseFirst
  , traverseFirstM
  , traverseM
  , bitraverseM
  ) where

import Data.Bifoldable      (Bifoldable (..))
import Data.Bifunctor       (Bifunctor (..))
import Data.Bitraversable   (Bitraversable (..), bimapM, bisequenceA)
import Data.Int             (Int64)
import Data.Text            (Text)
import Data.Typeable        (Typeable)
import GHC.Generics         (Generic)
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..))
import Kafka.Types          (Millis (..), PartitionId (..), TopicName (..))

-- | Consumer handle: bundles a 'Kafka' value with a 'KafkaConf' and exposes
-- them through the 'HasKafka' and 'HasKafkaConf' instances below.
data KafkaConsumer = KafkaConsumer
  { kcKafkaPtr  :: !Kafka      -- ^ Value returned by 'getKafka'
  , kcKafkaConf :: !KafkaConf  -- ^ Value returned by 'getKafkaConf'
  }

instance HasKafka KafkaConsumer where
  getKafka = kcKafkaPtr
  {-# INLINE getKafka #-}

instance HasKafkaConf KafkaConsumer where
  getKafkaConf = kcKafkaConf
  {-# INLINE getKafkaConf #-}

-- | Consumer group identifier (a wrapper around 'Text').
newtype ConsumerGroupId = ConsumerGroupId
  { unConsumerGroupId :: Text
  } deriving (Show, Ord, Eq, Generic)

-- | Message offset (a wrapper around 'Int64').
newtype Offset = Offset
  { unOffset :: Int64
  } deriving (Show, Eq, Ord, Read, Generic)

-- | Offset reset choice. How each option is applied is decided by code
-- outside this module.
data OffsetReset
  = Earliest
  | Latest
  deriving (Show, Eq, Generic)

-- | A set of events which happen during the rebalancing process
data RebalanceEvent
  = RebalanceBeforeAssign [(TopicName, PartitionId)]
    -- ^ Happens before Kafka Client confirms new assignment
  | RebalanceAssign [(TopicName, PartitionId)]
    -- ^ Happens after the new assignment is confirmed
  | RebalanceBeforeRevoke [(TopicName, PartitionId)]
    -- ^ Happens before Kafka Client confirms that partitions are revoked
  | RebalanceRevoke [(TopicName, PartitionId)]
    -- ^ Happens after the revocation is confirmed
  deriving (Eq, Show, Generic)

-- | A position within a partition: either one of the symbolic positions or
-- an explicit numeric offset ('PartitionOffset').
data PartitionOffset
  = PartitionOffsetBeginning
  | PartitionOffsetEnd
  | PartitionOffset Int64
  | PartitionOffsetStored
  | PartitionOffsetInvalid
  deriving (Eq, Show, Generic)

-- | Either an explicit list of partitions or all of them.
data SubscribedPartitions
  = SubscribedPartitions [PartitionId]
  | SubscribedPartitionsAll
  deriving (Show, Eq, Generic)

-- | Timestamp carried by a 'ConsumerRecord' (see 'crTimestamp'); the two
-- time-bearing constructors hold a 'Millis' value.
data Timestamp
  = CreateTime !Millis
  | LogAppendTime !Millis
  | NoTimestamp
  deriving (Show, Eq, Read, Generic)

-- | Offsets commit mode
data OffsetCommit
  = OffsetCommit
    -- ^ Forces consumer to block until the broker offsets commit is done
  | OffsetCommitAsync
    -- ^ Offsets will be committed in a non-blocking way
  deriving (Show, Eq, Generic)

-- | Indicates how offsets are to be synced to disk
data OffsetStoreSync
  = OffsetSyncDisable
    -- ^ Do not sync offsets (in Kafka: -1)
  | OffsetSyncImmediate
    -- ^ Sync immediately after each offset commit (in Kafka: 0)
  | OffsetSyncInterval Int
    -- ^ Sync after specified interval in millis
  deriving (Show, Eq, Generic)

-- | Indicates the method of storing the offsets
data OffsetStoreMethod
  = OffsetStoreBroker
    -- ^ Offsets are stored in Kafka broker (preferred)
  | OffsetStoreFile FilePath OffsetStoreSync
    -- ^ Offsets are stored in a file (and synced to disk according to the
    -- sync policy)
  deriving (Show, Eq, Generic)

-- | Kafka topic partition structure
data TopicPartition = TopicPartition
  { tpTopicName :: TopicName
  , tpPartition :: PartitionId
  , tpOffset    :: PartitionOffset
  } deriving (Show, Eq, Generic)

-- | Represents a /received/ message from Kafka (i.e. used in a consumer)
data ConsumerRecord k v = ConsumerRecord
  { crTopic     :: !TopicName
    -- ^ Kafka topic this message was received from
  , crPartition :: !PartitionId
    -- ^ Kafka partition this message was received from
  , crOffset    :: !Offset
    -- ^ Offset within the 'crPartition' Kafka partition
  , crTimestamp :: !Timestamp
    -- ^ Message timestamp
  , crKey       :: !k
    -- ^ Message key
  , crValue     :: !v
    -- ^ Message value
  } deriving (Eq, Show, Read, Typeable, Generic)

-- Maps the key and the value; every other field is carried over unchanged.
instance Bifunctor ConsumerRecord where
  bimap onKey onValue record =
    record { crKey = onKey (crKey record), crValue = onValue (crValue record) }
  {-# INLINE bimap #-}

-- Maps the value only.
instance Functor (ConsumerRecord k) where
  fmap = bimap id
  {-# INLINE fmap #-}

-- Folds over the value only; the key and the metadata are not visited.
instance Foldable (ConsumerRecord k) where
  foldMap toMonoid = toMonoid . crValue
  {-# INLINE foldMap #-}

-- Runs the effect on the value and puts the result back into the record.
instance Traversable (ConsumerRecord k) where
  traverse visit record = withValue <$> visit (crValue record)
    where
      withValue newValue = record { crValue = newValue }
  {-# INLINE traverse #-}

-- Folds the key first, then the value.
instance Bifoldable ConsumerRecord where
  bifoldMap onKey onValue record =
    mappend (onKey (crKey record)) (onValue (crValue record))
  {-# INLINE bifoldMap #-}

-- Runs the key effect first, then the value effect, and rebuilds the record.
instance Bitraversable ConsumerRecord where
  bitraverse onKey onValue record =
    rebuild <$> onKey (crKey record) <*> onValue (crValue record)
    where
      rebuild newKey newValue = record { crKey = newKey, crValue = newValue }
  {-# INLINE bitraverse #-}

-- | Applies a function to the key of a 'ConsumerRecord'.
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
crMapKey onKey = bimap onKey id
{-# INLINE crMapKey #-}

-- | Applies a function to the value of a 'ConsumerRecord'.
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
crMapValue onValue = bimap id onValue
{-# INLINE crMapValue #-}

-- | Applies one function to the key and another to the value of a
-- 'ConsumerRecord'.
crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
crMapKV onKey onValue = bimap onKey onValue
{-# INLINE crMapKV #-}

{-# DEPRECATED sequenceFirst "Isn't concern of this library. Use 'bitraverse id pure'" #-}
-- | Sequences the effect held in the first component of a 'Bitraversable'.
sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v)
sequenceFirst container = bitraverse id pure container
{-# INLINE sequenceFirst #-}

{-# DEPRECATED traverseFirst "Isn't concern of this library. Use 'bitraverse f pure'" #-}
-- | Traverses only the first component of a 'Bitraversable'.
traverseFirst :: (Bitraversable t, Applicative f)
              => (k -> f k')
              -> t k v
              -> f (t k' v)
traverseFirst = (`bitraverse` pure)
{-# INLINE traverseFirst #-}

{-# DEPRECATED traverseFirstM "Isn't concern of this library. Use 'bitraverse id pure <$> bitraverse f pure r'" #-}
-- | Like 'traverseFirst', for a function whose applicative result is
-- produced inside a monad.
traverseFirstM :: (Bitraversable t, Applicative f, Monad m)
               => (k -> m (f k'))
               -> t k v
               -> m (f (t k' v))
traverseFirstM step = fmap (bitraverse id pure) . bitraverse step pure
{-# INLINE traverseFirstM #-}

{-# DEPRECATED traverseM "Isn't concern of this library. Use 'sequenceA <$> traverse f r'" #-}
-- | Like 'traverse', for a function whose applicative result is produced
-- inside a monad.
traverseM :: (Traversable t, Applicative f, Monad m)
          => (v -> m (f v'))
          -> t v
          -> m (f (t v'))
traverseM step = fmap sequenceA . traverse step
{-# INLINE traverseM #-}

{-# DEPRECATED bitraverseM "Isn't concern of this library. Use 'bisequenceA <$> bimapM f g r'" #-}
-- | Like 'bitraverse', for functions whose applicative results are produced
-- inside a monad.
bitraverseM :: (Bitraversable t, Applicative f, Monad m)
            => (k -> m (f k'))
            -> (v -> m (f v'))
            -> t k v
            -> m (f (t k' v'))
bitraverseM onFirst onSecond = fmap bisequenceA . bimapM onFirst onSecond
{-# INLINE bitraverseM #-}