{-# LANGUAGE ScopedTypeVariables #-} module Kafka.Avro.Decode ( DecodeError(..) , decodeWithSchema, extractSchemaId ) where import Control.Arrow (left) import Control.Monad.IO.Class (MonadIO) import Data.Avro as A (FromAvro, HasAvroSchema (..), Result (..), fromAvro) import qualified Data.Avro as A (decodeWithSchema) import qualified Data.Avro.Decode as A (decodeAvro) import qualified Data.Avro.Deconflict as A (deconflict) import Data.Avro.Schema (Schema) import Data.Bits (shiftL) import Data.ByteString.Lazy (ByteString) import qualified Data.ByteString.Lazy as BL hiding (zipWith) import Data.Int import Data.Tagged (Tagged, untag) import Kafka.Avro.SchemaRegistry data DecodeError = DecodeRegistryError SchemaRegistryError | BadPayloadNoSchemaId | DecodeError Schema String deriving (Show, Eq) -- | Decodes a provided Avro-encoded value. -- The serialised value is expected to be in a "confluent-compatible" format -- where the "real" value bytestring is prepended with extra 5 bytes: -- a "magic" byte and 4 bytes representing the schema ID. decodeWithSchema :: (MonadIO m, FromAvro a) => SchemaRegistry -> ByteString -> m (Either DecodeError a) decodeWithSchema sr bs = case schemaData of Left err -> return $ Left err Right (sid, payload) -> do res <- left DecodeRegistryError <$> loadSchema sr sid return $ res >>= flip decodeWithDeconflict payload where schemaData = maybe (Left BadPayloadNoSchemaId) Right (extractSchemaId bs) decodeWithDeconflict :: forall a. (FromAvro a) => Schema -> ByteString -> Either DecodeError a decodeWithDeconflict writerSchema bs = let readerSchema = untag (schema :: Tagged a Schema) in left (DecodeError readerSchema) $ do raw <- A.decodeAvro writerSchema bs val <- A.deconflict writerSchema readerSchema raw resultToEither readerSchema (fromAvro val) extractSchemaId :: ByteString -> Maybe (SchemaId, ByteString) extractSchemaId bs = do (_ , b0) <- BL.uncons bs (w1, b1) <- BL.uncons b0 (w2, b2) <- BL.uncons b1 (w3, b3) <- BL.uncons b2 (w4, b4) <- BL.uncons b3 let ints = fromIntegral <$> [w4, w3, w2, w1] :: [Int32] let int = sum $ zipWith shiftL ints [0, 8, 16, 24] return (SchemaId int, b4) resultToEither :: Schema -> A.Result a -> Either String a resultToEither sc res = case res of Success a -> Right a Error msg -> Left msg