{-# language FlexibleContexts #-}
{-# language TypeFamilies     #-}
{-# language TypeOperators    #-}
{-|
Description : Kafka consumers as streams of Mu terms

This module allows you to receive values from
a Kafka topic, and treat then as Mu terms, or
your Haskell types if a conversion exists.

This module is a wrapper over 'Kafka.Conduit.Source'
from the (awesome) package @hw-kafka-client@.
-}
module Mu.Kafka.Consumer (
  kafkaSource
, kafkaSourceNoClose
, kafkaSourceAutoClose
, module X
) where

import           Conduit                      (mapC)
import           Control.Monad.IO.Class
import           Control.Monad.Trans.Resource
import qualified Data.Avro                    as A
import           Data.ByteString
import           Data.Conduit
import           Mu.Schema

import qualified Kafka.Conduit.Source         as S

import           Kafka.Conduit.Combinators    as X
import           Kafka.Consumer               as X

import           Mu.Kafka.Internal

fromCR
  :: ( FromSchema sch sty t
     , A.FromAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch
  -> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
  -> ConsumerRecord (Maybe ByteString) (Maybe t)
fromCR :: Proxy sch
-> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> ConsumerRecord (Maybe ByteString) (Maybe t)
fromCR proxy :: Proxy sch
proxy (ConsumerRecord t :: TopicName
t p :: PartitionId
p o :: Offset
o ts :: Timestamp
ts k :: Maybe ByteString
k v :: Maybe ByteString
v)
  = TopicName
-> PartitionId
-> Offset
-> Timestamp
-> Maybe ByteString
-> Maybe t
-> ConsumerRecord (Maybe ByteString) (Maybe t)
forall k v.
TopicName
-> PartitionId
-> Offset
-> Timestamp
-> k
-> v
-> ConsumerRecord k v
ConsumerRecord TopicName
t PartitionId
p Offset
o Timestamp
ts Maybe ByteString
k (Maybe ByteString
v Maybe ByteString -> (ByteString -> Maybe t) -> Maybe t
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Proxy sch -> ByteString -> Maybe t
forall (sch :: Schema * *) sty t.
(FromSchema sch sty t, FromAvro (WithSchema sch sty t),
 HasAvroSchema (WithSchema sch sty t)) =>
Proxy sch -> ByteString -> Maybe t
fromBS Proxy sch
proxy)

-- | Creates a kafka producer for given properties and returns a `Source`.
--
-- This method of creating a `Source` represents a simple case
-- and does not provide access to `KafkaProducer`. For more complex scenarious
-- 'kafkaSinkNoClose' or 'kafkaSinkAutoClose' can be used.
kafkaSource
  :: ( MonadResource m
     , FromSchema sch sty t
     , A.FromAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch
  -> ConsumerProperties -> Subscription -> Timeout
  -> ConduitT () (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t))) m ()
kafkaSource :: Proxy sch
-> ConsumerProperties
-> Subscription
-> Timeout
-> ConduitT
     ()
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
kafkaSource proxy :: Proxy sch
proxy props :: ConsumerProperties
props sub :: Subscription
sub ts :: Timeout
ts =
  ConsumerProperties
-> Subscription
-> Timeout
-> ConduitT
     ()
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     m
     ()
forall (m :: * -> *).
MonadResource m =>
ConsumerProperties
-> Subscription
-> Timeout
-> ConduitT
     ()
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     m
     ()
S.kafkaSource ConsumerProperties
props Subscription
sub Timeout
ts ConduitT
  ()
  (Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
  m
  ()
-> ConduitM
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
-> ConduitT
     ()
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (Either
   KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
 -> Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
-> ConduitM
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC ((ConsumerRecord (Maybe ByteString) (Maybe ByteString)
 -> ConsumerRecord (Maybe ByteString) (Maybe t))
-> Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy sch
-> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> ConsumerRecord (Maybe ByteString) (Maybe t)
forall (sch :: Schema * *) sty t.
(FromSchema sch sty t, FromAvro (WithSchema sch sty t),
 HasAvroSchema (WithSchema sch sty t)) =>
Proxy sch
-> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> ConsumerRecord (Maybe ByteString) (Maybe t)
fromCR Proxy sch
proxy))

-- | Create a `Source` for a given `KafkaConsumer`.
-- The consumer will NOT be closed automatically when the `Source` is closed.
kafkaSourceNoClose
  :: ( MonadIO m
     , FromSchema sch sty t
     , A.FromAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch
  -> KafkaConsumer -> Timeout
  -> ConduitT () (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t))) m ()
kafkaSourceNoClose :: Proxy sch
-> KafkaConsumer
-> Timeout
-> ConduitT
     ()
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
kafkaSourceNoClose proxy :: Proxy sch
proxy c :: KafkaConsumer
c t :: Timeout
t
  = KafkaConsumer
-> Timeout
-> ConduitT
     ()
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     m
     ()
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer
-> Timeout
-> ConduitT
     ()
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     m
     ()
S.kafkaSourceNoClose KafkaConsumer
c Timeout
t ConduitT
  ()
  (Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
  m
  ()
-> ConduitM
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
-> ConduitT
     ()
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (Either
   KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
 -> Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
-> ConduitM
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC ((ConsumerRecord (Maybe ByteString) (Maybe ByteString)
 -> ConsumerRecord (Maybe ByteString) (Maybe t))
-> Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy sch
-> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> ConsumerRecord (Maybe ByteString) (Maybe t)
forall (sch :: Schema * *) sty t.
(FromSchema sch sty t, FromAvro (WithSchema sch sty t),
 HasAvroSchema (WithSchema sch sty t)) =>
Proxy sch
-> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> ConsumerRecord (Maybe ByteString) (Maybe t)
fromCR Proxy sch
proxy))


-- | Create a `Source` for a given `KafkaConsumer`.
-- The consumer will be closed automatically when the `Source` is closed.
kafkaSourceAutoClose
  :: ( MonadResource m
     , FromSchema sch sty t
     , A.FromAvro (WithSchema sch sty t)
     , A.HasAvroSchema (WithSchema sch sty t) )
  => Proxy sch
  -> KafkaConsumer -> Timeout
  -> ConduitT () (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t))) m ()
kafkaSourceAutoClose :: Proxy sch
-> KafkaConsumer
-> Timeout
-> ConduitT
     ()
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
kafkaSourceAutoClose proxy :: Proxy sch
proxy c :: KafkaConsumer
c t :: Timeout
t
  = KafkaConsumer
-> Timeout
-> ConduitT
     ()
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     m
     ()
forall (m :: * -> *).
MonadResource m =>
KafkaConsumer
-> Timeout
-> ConduitT
     ()
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     m
     ()
S.kafkaSourceAutoClose KafkaConsumer
c Timeout
t ConduitT
  ()
  (Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
  m
  ()
-> ConduitM
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
-> ConduitT
     ()
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (Either
   KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
 -> Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
-> ConduitM
     (Either
        KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
     (Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t)))
     m
     ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC ((ConsumerRecord (Maybe ByteString) (Maybe ByteString)
 -> ConsumerRecord (Maybe ByteString) (Maybe t))
-> Either
     KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> Either KafkaError (ConsumerRecord (Maybe ByteString) (Maybe t))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy sch
-> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> ConsumerRecord (Maybe ByteString) (Maybe t)
forall (sch :: Schema * *) sty t.
(FromSchema sch sty t, FromAvro (WithSchema sch sty t),
 HasAvroSchema (WithSchema sch sty t)) =>
Proxy sch
-> ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> ConsumerRecord (Maybe ByteString) (Maybe t)
fromCR Proxy sch
proxy))