{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
{-# HLINT ignore "Use newtype instead of data" #-}
module Kafka.Avro.Encode
( EncodeError(..)
, encodeKey
, encodeValue
, encode
, encodeKeyWithSchema
, encodeValueWithSchema
, encodeWithSchema
, keySubject, valueSubject
) where
import Control.Monad.IO.Class (MonadIO)
import Data.Avro (HasAvroSchema, Schema, ToAvro, schemaOf)
import qualified Data.Avro as A
import qualified Data.Binary as B
import Data.Bits (shiftL)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL hiding (zipWith)
import Data.Monoid
import Kafka.Avro.SchemaRegistry
data EncodeError = EncodeRegistryError SchemaRegistryError
deriving (Int -> EncodeError -> ShowS
[EncodeError] -> ShowS
EncodeError -> String
(Int -> EncodeError -> ShowS)
-> (EncodeError -> String)
-> ([EncodeError] -> ShowS)
-> Show EncodeError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [EncodeError] -> ShowS
$cshowList :: [EncodeError] -> ShowS
show :: EncodeError -> String
$cshow :: EncodeError -> String
showsPrec :: Int -> EncodeError -> ShowS
$cshowsPrec :: Int -> EncodeError -> ShowS
Show, EncodeError -> EncodeError -> Bool
(EncodeError -> EncodeError -> Bool)
-> (EncodeError -> EncodeError -> Bool) -> Eq EncodeError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: EncodeError -> EncodeError -> Bool
$c/= :: EncodeError -> EncodeError -> Bool
== :: EncodeError -> EncodeError -> Bool
$c== :: EncodeError -> EncodeError -> Bool
Eq)
keySubject :: Subject -> Subject
keySubject :: Subject -> Subject
keySubject (Subject Text
subj) = Text -> Subject
Subject (Text
subj Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"-key")
{-# INLINE keySubject #-}
valueSubject :: Subject -> Subject
valueSubject :: Subject -> Subject
valueSubject (Subject Text
subj) = Text -> Subject
Subject (Text
subj Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"-value")
{-# INLINE valueSubject #-}
encodeKey :: (MonadIO m, HasAvroSchema a, ToAvro a)
=> SchemaRegistry
-> Subject
-> a
-> m (Either EncodeError ByteString)
encodeKey :: SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encodeKey SchemaRegistry
sr Subject
subj = SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
forall (m :: * -> *) a.
(MonadIO m, HasAvroSchema a, ToAvro a) =>
SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encode SchemaRegistry
sr (Subject -> Subject
keySubject Subject
subj)
{-# INLINE encodeKey #-}
encodeKeyWithSchema :: (MonadIO m, ToAvro a)
=> SchemaRegistry
-> Subject
-> Schema
-> a
-> m (Either EncodeError ByteString)
encodeKeyWithSchema :: SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeKeyWithSchema SchemaRegistry
sr Subject
subj = SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
forall a (m :: * -> *).
(MonadIO m, ToAvro a) =>
SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeWithSchema SchemaRegistry
sr (Subject -> Subject
keySubject Subject
subj)
{-# INLINE encodeKeyWithSchema #-}
encodeValue :: (MonadIO m, HasAvroSchema a, ToAvro a)
=> SchemaRegistry
-> Subject
-> a
-> m (Either EncodeError ByteString)
encodeValue :: SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encodeValue SchemaRegistry
sr Subject
subj = SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
forall (m :: * -> *) a.
(MonadIO m, HasAvroSchema a, ToAvro a) =>
SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encode SchemaRegistry
sr (Subject -> Subject
valueSubject Subject
subj)
{-# INLINE encodeValue #-}
encodeValueWithSchema :: (MonadIO m, ToAvro a)
=> SchemaRegistry
-> Subject
-> Schema
-> a
-> m (Either EncodeError ByteString)
encodeValueWithSchema :: SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeValueWithSchema SchemaRegistry
sr Subject
subj = SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
forall a (m :: * -> *).
(MonadIO m, ToAvro a) =>
SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeWithSchema SchemaRegistry
sr (Subject -> Subject
valueSubject Subject
subj)
{-# INLINE encodeValueWithSchema #-}
encode :: (MonadIO m, HasAvroSchema a, ToAvro a)
=> SchemaRegistry
-> Subject
-> a
-> m (Either EncodeError ByteString)
encode :: SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encode SchemaRegistry
sr Subject
subj a
a = SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
forall a (m :: * -> *).
(MonadIO m, ToAvro a) =>
SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeWithSchema SchemaRegistry
sr Subject
subj (a -> Schema
forall a. HasAvroSchema a => a -> Schema
schemaOf a
a) a
a
{-# INLINE encode #-}
encodeWithSchema :: forall a m. (MonadIO m, ToAvro a)
=> SchemaRegistry
-> Subject
-> Schema
-> a
-> m (Either EncodeError ByteString)
encodeWithSchema :: SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeWithSchema SchemaRegistry
sr Subject
subj Schema
sch a
a = do
Either SchemaRegistryError SchemaId
mbSid <- SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchema SchemaRegistry
sr Subject
subj Schema
sch
case Either SchemaRegistryError SchemaId
mbSid of
Left SchemaRegistryError
err -> Either EncodeError ByteString -> m (Either EncodeError ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either EncodeError ByteString
-> m (Either EncodeError ByteString))
-> (SchemaRegistryError -> Either EncodeError ByteString)
-> SchemaRegistryError
-> m (Either EncodeError ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EncodeError -> Either EncodeError ByteString
forall a b. a -> Either a b
Left (EncodeError -> Either EncodeError ByteString)
-> (SchemaRegistryError -> EncodeError)
-> SchemaRegistryError
-> Either EncodeError ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SchemaRegistryError -> EncodeError
EncodeRegistryError (SchemaRegistryError -> m (Either EncodeError ByteString))
-> SchemaRegistryError -> m (Either EncodeError ByteString)
forall a b. (a -> b) -> a -> b
$ SchemaRegistryError
err
Right SchemaId
sid -> Either EncodeError ByteString -> m (Either EncodeError ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either EncodeError ByteString
-> m (Either EncodeError ByteString))
-> (ByteString -> Either EncodeError ByteString)
-> ByteString
-> m (Either EncodeError ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Either EncodeError ByteString
forall a b. b -> Either a b
Right (ByteString -> m (Either EncodeError ByteString))
-> ByteString -> m (Either EncodeError ByteString)
forall a b. (a -> b) -> a -> b
$ SchemaId -> ByteString -> ByteString
appendSchemaId SchemaId
sid (Schema -> a -> ByteString
forall a. ToAvro a => Schema -> a -> ByteString
A.encodeValueWithSchema Schema
sch a
a)
appendSchemaId :: SchemaId -> ByteString -> ByteString
appendSchemaId :: SchemaId -> ByteString -> ByteString
appendSchemaId (SchemaId Int32
sid) ByteString
bs =
Word8 -> ByteString -> ByteString
BL.cons (Int -> Word8
forall a. Enum a => Int -> a
toEnum Int
0) (Int32 -> ByteString
forall a. Binary a => a -> ByteString
B.encode Int32
sid) ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
bs