{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Kafka.Avro.Decode
(
DecodeError(..)
, decode
, decodeWithSchema
, extractSchemaId
) where
import Control.Arrow (left)
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.Except
import Data.Avro (FromAvro, HasAvroSchema (..), Schema, decodeValueWithSchema, deconflict)
import Data.Bits (shiftL)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL hiding (zipWith)
import Data.Int
import Data.Tagged (untag)
import Kafka.Avro.SchemaRegistry
data DecodeError = DecodeRegistryError SchemaRegistryError
| BadPayloadNoSchemaId
| DecodeError Schema String
| IncompatibleSchema Schema String
deriving (Show, Eq)
decode :: forall a m. (MonadIO m, HasAvroSchema a, FromAvro a)
=> SchemaRegistry
-> ByteString
-> m (Either DecodeError a)
decode sr = decodeWithSchema sr (untag @a schema)
{-# INLINE decode #-}
decodeWithSchema :: forall a m. (MonadIO m, FromAvro a)
=> SchemaRegistry
-> Schema
-> ByteString
-> m (Either DecodeError a)
decodeWithSchema sr readerSchema bs =
case schemaData of
Left err -> return $ Left err
Right (sid, payload) -> runExceptT $ do
writerSchema <- withError DecodeRegistryError (loadSchema sr sid)
readSchema <- withPureError (IncompatibleSchema writerSchema) $ deconflict writerSchema readerSchema
withPureError (DecodeError writerSchema) (decodeValueWithSchema readSchema payload)
where
schemaData = maybe (Left BadPayloadNoSchemaId) Right (extractSchemaId bs)
withError f = withExceptT f . ExceptT
withPureError f = withError f . pure
extractSchemaId :: ByteString -> Maybe (SchemaId, ByteString)
extractSchemaId bs = do
(_ , b0) <- BL.uncons bs
(w1, b1) <- BL.uncons b0
(w2, b2) <- BL.uncons b1
(w3, b3) <- BL.uncons b2
(w4, b4) <- BL.uncons b3
let ints = fromIntegral <$> [w4, w3, w2, w1] :: [Int32]
let int = sum $ zipWith shiftL ints [0, 8, 16, 24]
return (SchemaId int, b4)