{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
{-# HLINT ignore "Use newtype instead of data" #-}
module Kafka.Avro.Encode
( EncodeError(..)
, encodeKey
, encodeValue
, encode

, encodeKeyWithSchema
, encodeValueWithSchema
, encodeWithSchema

, keySubject, valueSubject
) where

import           Control.Monad.IO.Class    (MonadIO)
import           Data.Avro                 (HasAvroSchema, Schema, ToAvro, schemaOf)
import qualified Data.Avro                 as A
import qualified Data.Binary               as B
import           Data.Bits                 (shiftL)
import           Data.ByteString.Lazy      (ByteString)
import qualified Data.ByteString.Lazy      as BL hiding (zipWith)
import           Data.Monoid
import           Kafka.Avro.SchemaRegistry

data EncodeError = EncodeRegistryError SchemaRegistryError
  deriving (Int -> EncodeError -> ShowS
[EncodeError] -> ShowS
EncodeError -> String
(Int -> EncodeError -> ShowS)
-> (EncodeError -> String)
-> ([EncodeError] -> ShowS)
-> Show EncodeError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [EncodeError] -> ShowS
$cshowList :: [EncodeError] -> ShowS
show :: EncodeError -> String
$cshow :: EncodeError -> String
showsPrec :: Int -> EncodeError -> ShowS
$cshowsPrec :: Int -> EncodeError -> ShowS
Show, EncodeError -> EncodeError -> Bool
(EncodeError -> EncodeError -> Bool)
-> (EncodeError -> EncodeError -> Bool) -> Eq EncodeError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: EncodeError -> EncodeError -> Bool
$c/= :: EncodeError -> EncodeError -> Bool
== :: EncodeError -> EncodeError -> Bool
$c== :: EncodeError -> EncodeError -> Bool
Eq)

keySubject :: Subject -> Subject
keySubject :: Subject -> Subject
keySubject (Subject Text
subj) = Text -> Subject
Subject (Text
subj Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"-key")
{-# INLINE keySubject #-}

valueSubject :: Subject -> Subject
valueSubject :: Subject -> Subject
valueSubject (Subject Text
subj) = Text -> Subject
Subject (Text
subj Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"-value")
{-# INLINE valueSubject #-}

-- | Encodes a provided value as a message key.
--
-- Registers the schema in SchemaRegistry with "<subject>-key" subject.
encodeKey :: (MonadIO m, HasAvroSchema a, ToAvro a)
  => SchemaRegistry
  -> Subject
  -> a
  -> m (Either EncodeError ByteString)
encodeKey :: SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encodeKey SchemaRegistry
sr Subject
subj = SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
forall (m :: * -> *) a.
(MonadIO m, HasAvroSchema a, ToAvro a) =>
SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encode SchemaRegistry
sr (Subject -> Subject
keySubject Subject
subj)
{-# INLINE encodeKey #-}

-- | Encodes a provided value as a message key.
--
-- Registers the schema in SchemaRegistry with "<subject>-key" subject.
encodeKeyWithSchema :: (MonadIO m, ToAvro a)
  => SchemaRegistry
  -> Subject
  -> Schema
  -> a
  -> m (Either EncodeError ByteString)
encodeKeyWithSchema :: SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeKeyWithSchema SchemaRegistry
sr Subject
subj = SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
forall a (m :: * -> *).
(MonadIO m, ToAvro a) =>
SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeWithSchema SchemaRegistry
sr (Subject -> Subject
keySubject Subject
subj)
{-# INLINE encodeKeyWithSchema #-}

-- | Encodes a provided value as a message value.
--
-- Registers the schema in SchemaRegistry with "<subject>-value" subject.
encodeValue :: (MonadIO m, HasAvroSchema a, ToAvro a)
  => SchemaRegistry
  -> Subject
  -> a
  -> m (Either EncodeError ByteString)
encodeValue :: SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encodeValue SchemaRegistry
sr Subject
subj = SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
forall (m :: * -> *) a.
(MonadIO m, HasAvroSchema a, ToAvro a) =>
SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encode SchemaRegistry
sr (Subject -> Subject
valueSubject Subject
subj)
{-# INLINE encodeValue #-}

-- | Encodes a provided value as a message value.
--
-- Registers the schema in SchemaRegistry with "<subject>-value" subject.
encodeValueWithSchema :: (MonadIO m, ToAvro a)
  => SchemaRegistry
  -> Subject
  -> Schema
  -> a
  -> m (Either EncodeError ByteString)
encodeValueWithSchema :: SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeValueWithSchema SchemaRegistry
sr Subject
subj = SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
forall a (m :: * -> *).
(MonadIO m, ToAvro a) =>
SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeWithSchema SchemaRegistry
sr (Subject -> Subject
valueSubject Subject
subj)
{-# INLINE encodeValueWithSchema #-}

encode :: (MonadIO m, HasAvroSchema a, ToAvro a)
  => SchemaRegistry
  -> Subject
  -> a
  -> m (Either EncodeError ByteString)
encode :: SchemaRegistry -> Subject -> a -> m (Either EncodeError ByteString)
encode SchemaRegistry
sr Subject
subj a
a = SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
forall a (m :: * -> *).
(MonadIO m, ToAvro a) =>
SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeWithSchema SchemaRegistry
sr Subject
subj (a -> Schema
forall a. HasAvroSchema a => a -> Schema
schemaOf a
a) a
a
{-# INLINE encode #-}

-- | Encodes a provided value into Avro
-- and registers value's schema in SchemaRegistry.
encodeWithSchema :: forall a m. (MonadIO m, ToAvro a)
  => SchemaRegistry
  -> Subject
  -> Schema
  -> a
  -> m (Either EncodeError ByteString)
encodeWithSchema :: SchemaRegistry
-> Subject -> Schema -> a -> m (Either EncodeError ByteString)
encodeWithSchema SchemaRegistry
sr Subject
subj Schema
sch a
a = do
  Either SchemaRegistryError SchemaId
mbSid <- SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchema SchemaRegistry
sr Subject
subj Schema
sch
  case Either SchemaRegistryError SchemaId
mbSid of
    Left SchemaRegistryError
err  -> Either EncodeError ByteString -> m (Either EncodeError ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either EncodeError ByteString
 -> m (Either EncodeError ByteString))
-> (SchemaRegistryError -> Either EncodeError ByteString)
-> SchemaRegistryError
-> m (Either EncodeError ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EncodeError -> Either EncodeError ByteString
forall a b. a -> Either a b
Left (EncodeError -> Either EncodeError ByteString)
-> (SchemaRegistryError -> EncodeError)
-> SchemaRegistryError
-> Either EncodeError ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SchemaRegistryError -> EncodeError
EncodeRegistryError (SchemaRegistryError -> m (Either EncodeError ByteString))
-> SchemaRegistryError -> m (Either EncodeError ByteString)
forall a b. (a -> b) -> a -> b
$ SchemaRegistryError
err
    Right SchemaId
sid -> Either EncodeError ByteString -> m (Either EncodeError ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either EncodeError ByteString
 -> m (Either EncodeError ByteString))
-> (ByteString -> Either EncodeError ByteString)
-> ByteString
-> m (Either EncodeError ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Either EncodeError ByteString
forall a b. b -> Either a b
Right (ByteString -> m (Either EncodeError ByteString))
-> ByteString -> m (Either EncodeError ByteString)
forall a b. (a -> b) -> a -> b
$ SchemaId -> ByteString -> ByteString
appendSchemaId SchemaId
sid (Schema -> a -> ByteString
forall a. ToAvro a => Schema -> a -> ByteString
A.encodeValueWithSchema Schema
sch a
a)


appendSchemaId :: SchemaId -> ByteString -> ByteString
appendSchemaId :: SchemaId -> ByteString -> ByteString
appendSchemaId (SchemaId Int32
sid) ByteString
bs =
  -- add a "magic byte" followed by schema id
  Word8 -> ByteString -> ByteString
BL.cons (Int -> Word8
forall a. Enum a => Int -> a
toEnum Int
0) (Int32 -> ByteString
forall a. Binary a => a -> ByteString
B.encode Int32
sid) ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
bs