{-# LANGUAGE DeriveDataTypeable #-}
module Kafka.Consumer.Types
where
import Data.Bifoldable
import Data.Bifunctor
import Data.Bitraversable
import Data.Int
import Data.Typeable
import Kafka.Internal.Setup
import Kafka.Types
-- | Consumer value that bundles a 'Kafka' with a 'KafkaConf'.
-- Both fields are strict.
data KafkaConsumer =
  KafkaConsumer
    { kcKafkaPtr  :: !Kafka      -- ^ the 'Kafka' value carried by this consumer
    , kcKafkaConf :: !KafkaConf  -- ^ the 'KafkaConf' carried by this consumer
    }
-- | A 'KafkaConsumer' gives access to the 'Kafka' value stored in it.
instance HasKafka KafkaConsumer where
  getKafka (KafkaConsumer kafka _) = kafka
  {-# INLINE getKafka #-}
-- | A 'KafkaConsumer' gives access to the 'KafkaConf' stored in it.
instance HasKafkaConf KafkaConsumer where
  getKafkaConf (KafkaConsumer _ conf) = conf
  {-# INLINE getKafkaConf #-}
-- | Consumer group identifier, a wrapper around a plain 'String'.
newtype ConsumerGroupId =
  ConsumerGroupId
    { unConsumerGroupId :: String  -- ^ unwrap to the underlying 'String'
    }
  deriving (Eq, Ord, Show)
-- | An offset, represented as a wrapped 'Int64'.
newtype Offset =
  Offset
    { unOffset :: Int64  -- ^ unwrap to the underlying 'Int64'
    }
  deriving (Eq, Ord, Show, Read)
-- | Offset reset choice: either 'Earliest' or 'Latest'.
--
-- NOTE(review): how each choice is applied is decided by code outside this
-- module.
data OffsetReset
  = Earliest
  | Latest
  deriving (Eq, Show)
-- | Rebalance-related events. Every constructor carries a list of
-- (topic, partition) pairs.
--
-- NOTE(review): the exact moment each event is emitted is determined by code
-- outside this module; the constructor names suggest before/after assignment
-- and before/after revocation — confirm against the emitting code.
data RebalanceEvent
  = RebalanceBeforeAssign [(TopicName, PartitionId)]
  | RebalanceAssign       [(TopicName, PartitionId)]
  | RebalanceBeforeRevoke [(TopicName, PartitionId)]
  | RebalanceRevoke       [(TopicName, PartitionId)]
  deriving (Show, Eq)
-- | A position within a partition: either one of several symbolic positions
-- or an explicit 'Int64' offset.
data PartitionOffset
  = PartitionOffsetBeginning
  | PartitionOffsetEnd
  | PartitionOffset Int64  -- ^ an explicit numeric offset
  | PartitionOffsetStored
  | PartitionOffsetInvalid
  deriving (Show, Eq)
-- | Selection of partitions: either an explicit list of 'PartitionId's or
-- all of them.
data SubscribedPartitions
  = SubscribedPartitions [PartitionId]  -- ^ only the listed partitions
  | SubscribedPartitionsAll             -- ^ no explicit list; stands for all partitions
  deriving (Eq, Show)
-- | A timestamp tagged with its kind, or the absence of one.
-- The 'Millis' payloads are strict.
data Timestamp
  = CreateTime !Millis
  | LogAppendTime !Millis
  | NoTimestamp
  deriving (Eq, Show, Read)
-- | Offset commit mode.
--
-- NOTE(review): 'OffsetCommitAsync' is asynchronous by name; 'OffsetCommit'
-- is presumably the synchronous counterpart — confirm against the code that
-- consumes this type.
data OffsetCommit
  = OffsetCommit
  | OffsetCommitAsync
  deriving (Eq, Show)
-- | Sync policy carried by 'OffsetStoreFile'.
--
-- Derives 'Show' and 'Eq' like the other plain data types in this module so
-- values can be logged and compared.
--
-- NOTE(review): how each policy is translated into client configuration, and
-- the unit of the 'OffsetSyncInterval' payload, are decided by code outside
-- this module — confirm there.
data OffsetStoreSync
  = OffsetSyncDisable
  | OffsetSyncImmediate
  | OffsetSyncInterval Int  -- ^ sync on an interval given as an 'Int'
  deriving (Show, Eq)

-- | Where offsets are stored: on the broker, or in a file together with the
-- 'OffsetStoreSync' policy to use for that file.
--
-- Derives 'Show' and 'Eq' like the other plain data types in this module.
data OffsetStoreMethod
  = OffsetStoreBroker
  | OffsetStoreFile FilePath OffsetStoreSync
  deriving (Show, Eq)
-- | A topic name, a partition id and a 'PartitionOffset' grouped together.
data TopicPartition =
  TopicPartition
    { tpTopicName :: TopicName        -- ^ the topic
    , tpPartition :: PartitionId      -- ^ the partition within that topic
    , tpOffset    :: PartitionOffset  -- ^ the position within that partition
    }
  deriving (Eq, Show)
-- | A record, parameterised over its key type @k@ and its value type @v@.
-- Every field is strict.
data ConsumerRecord k v =
  ConsumerRecord
    { crTopic     :: !TopicName    -- ^ topic of the record
    , crPartition :: !PartitionId  -- ^ partition of the record
    , crOffset    :: !Offset       -- ^ offset of the record
    , crTimestamp :: !Timestamp    -- ^ timestamp of the record
    , crKey       :: !k            -- ^ key of the record
    , crValue     :: !v            -- ^ value of the record
    }
  deriving (Show, Eq, Read, Typeable)
-- | Maps the key with the first function and the value with the second;
-- all other fields are carried over unchanged.
instance Bifunctor ConsumerRecord where
  bimap onKey onValue record =
    record
      { crKey   = onKey (crKey record)
      , crValue = onValue (crValue record)
      }
  {-# INLINE bimap #-}
-- | Maps over the value of the record, leaving the key untouched.
instance Functor (ConsumerRecord k) where
  fmap onValue = bimap id onValue
  {-# INLINE fmap #-}
-- | Folds over the single value held by the record.
instance Foldable (ConsumerRecord k) where
  -- Composed with the selector (rather than pattern matching) so the record
  -- is only inspected if the supplied function demands the value.
  foldMap summarise = summarise . crValue
  {-# INLINE foldMap #-}
-- | Runs an effectful function on the value and puts the result back into
-- the same record.
instance Traversable (ConsumerRecord k) where
  traverse visit record = fmap withValue (visit (crValue record))
    where
      -- Same record with only the value replaced.
      withValue newValue = record { crValue = newValue }
  {-# INLINE traverse #-}
-- | Folds the key first and the value second, combining the two results
-- with 'mappend'.
instance Bifoldable ConsumerRecord where
  bifoldMap onKey onValue record = mappend keyPart valuePart
    where
      keyPart   = onKey (crKey record)
      valuePart = onValue (crValue record)
  {-# INLINE bifoldMap #-}
-- | Runs one effectful function on the key, then one on the value, and puts
-- both results back into the same record.
instance Bitraversable ConsumerRecord where
  bitraverse visitKey visitValue record =
    rebuild <$> visitKey (crKey record) <*> visitValue (crValue record)
    where
      -- Same record with the key and the value replaced.
      rebuild newKey newValue = record { crKey = newKey, crValue = newValue }
  {-# INLINE bitraverse #-}
-- | Applies a function to the key of a 'ConsumerRecord', leaving the value
-- as it is.
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
crMapKey onKey = bimap onKey id
{-# INLINE crMapKey #-}
-- | Applies a function to the value of a 'ConsumerRecord', leaving the key
-- as it is.
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
crMapValue onValue = bimap id onValue
{-# INLINE crMapValue #-}
-- | Applies one function to the key and another to the value of a
-- 'ConsumerRecord'.
crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
crMapKV onKey onValue record = bimap onKey onValue record
{-# INLINE crMapKV #-}
-- | Pulls an effect out of the first slot of a bitraversable structure.
-- The second slot is carried along unchanged.
--
-- For example, on a pair: @sequenceFirst (Just 1, \'a\') == Just (1, \'a\')@.
sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v)
sequenceFirst = bitraverse runFirst keepSecond
  where
    -- The first slot already holds the effect, so it is used as is.
    runFirst effect = effect
    -- The second slot is embedded without running any effect.
    keepSecond v = pure v
{-# INLINE sequenceFirst #-}
-- | Runs an effectful function on the first slot of a bitraversable
-- structure. The second slot is carried along unchanged.
traverseFirst
  :: (Bitraversable t, Applicative f)
  => (k -> f k')
  -> t k v
  -> f (t k' v)
traverseFirst onFirst container = bitraverse onFirst pure container
{-# INLINE traverseFirst #-}
-- | Like a traversal of the first slot, but for functions that return an
-- applicative value inside a monad: the monadic layer is run over the first
-- slot, then the inner applicative is pulled out of the structure.
traverseFirstM
  :: (Bitraversable t, Applicative f, Monad m)
  => (k -> m (f k'))
  -> t k v
  -> m (f (t k' v))
traverseFirstM onFirst container =
  fmap pullOutInner (bitraverse onFirst pure container)
  where
    -- After the monadic step the first slot holds an @f k'@; sequence it out.
    pullOutInner inner = bitraverse id pure inner
{-# INLINE traverseFirstM #-}
-- | Traverses a structure with a function that returns an applicative value
-- inside a monad: the monadic layer is run over every element, then the
-- inner applicative is pulled out of the structure with 'sequenceA'.
traverseM
  :: (Traversable t, Applicative f, Monad m)
  => (v -> m (f v'))
  -> t v
  -> m (f (t v'))
traverseM visit = fmap sequenceA . traverse visit
{-# INLINE traverseM #-}
-- | Traverses both slots of a bitraversable structure with functions that
-- return an applicative value inside a monad: the monadic layer is run over
-- both slots, then the inner applicatives are pulled out of the structure
-- with 'bisequenceA'.
bitraverseM
  :: (Bitraversable t, Applicative f, Monad m)
  => (k -> m (f k'))
  -> (v -> m (f v'))
  -> t k v
  -> m (f (t k' v'))
bitraverseM visitFirst visitSecond =
  fmap bisequenceA . bimapM visitFirst visitSecond
{-# INLINE bitraverseM #-}