{-# language FlexibleContexts #-}
{-# language TypeFamilies #-}
{-# language TypeOperators #-}
module Mu.Kafka.Consumer (
kafkaSource
, kafkaSourceNoClose
, kafkaSourceAutoClose
, module X
) where
import Conduit (mapC)
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource
import qualified Data.Avro as A
import Data.ByteString
import Data.Conduit
import Mu.Schema
import qualified Kafka.Conduit.Source as S
import Kafka.Conduit.Combinators as X
import Kafka.Consumer as X
import Mu.Kafka.Internal
-- | Decode the value carried by a raw Kafka record using the given schema.
--
-- The topic, partition, offset, timestamp and key are copied over unchanged.
-- When a payload is present it is passed to 'fromBS'; if 'fromBS' yields
-- 'Nothing' the resulting record has a 'Nothing' value, so that case cannot
-- be distinguished from a record that had no payload in the first place.
fromCR
  :: ( FromSchema sch sty t
     , A.FromAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch
  -> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
  -> ConsumerRecord (Maybe ByteString) (Maybe t)
fromCR proxy (ConsumerRecord t p o ts k v)
  = ConsumerRecord t p o ts k (v >>= fromBS proxy)
-- | Schema-aware counterpart of 'S.kafkaSource'.
--
-- Runs the underlying source with the given consumer properties,
-- subscription and timeout, then maps every element downstream:
-- 'Left' errors are passed through untouched, while the value of each
-- 'Right' record is decoded with 'fromCR' for the schema named by the proxy.
-- Consumer creation and clean-up are entirely delegated to 'S.kafkaSource'.
kafkaSource
  :: ( MonadResource m
     , FromSchema sch sty t
     , A.FromAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch
  -> ConsumerProperties -> Subscription -> Timeout
  -> ConduitT () (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t))) m ()
kafkaSource proxy props sub ts =
  S.kafkaSource props sub ts .| mapC (fmap (fromCR proxy))
-- | Schema-aware counterpart of 'S.kafkaSourceNoClose'.
--
-- Streams from an already-built 'KafkaConsumer' with the given timeout and
-- decodes the value of every successfully received record via 'fromCR';
-- 'Left' errors are forwarded unchanged. How the consumer's lifetime is
-- handled is whatever 'S.kafkaSourceNoClose' does -- this wrapper adds
-- nothing beyond the decoding step.
kafkaSourceNoClose
  :: ( MonadIO m
     , FromSchema sch sty t
     , A.FromAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch
  -> KafkaConsumer -> Timeout
  -> ConduitT () (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t))) m ()
kafkaSourceNoClose proxy c t
  = S.kafkaSourceNoClose c t .| mapC (fmap (fromCR proxy))
-- | Schema-aware counterpart of 'S.kafkaSourceAutoClose'.
--
-- Streams from an already-built 'KafkaConsumer' with the given timeout and
-- decodes the value of every successfully received record via 'fromCR';
-- 'Left' errors are forwarded unchanged. Resource handling for the consumer
-- is left to 'S.kafkaSourceAutoClose' (hence the 'MonadResource'
-- constraint); this wrapper only adds the decoding step.
kafkaSourceAutoClose
  :: ( MonadResource m
     , FromSchema sch sty t
     , A.FromAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch
  -> KafkaConsumer -> Timeout
  -> ConduitT () (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t))) m ()
kafkaSourceAutoClose proxy c t
  = S.kafkaSourceAutoClose c t .| mapC (fmap (fromCR proxy))