{-# LANGUAGE ScopedTypeVariables #-}
module Kafka.Avro.Decode
(
DecodeError(..)
, decodeWithSchema, extractSchemaId
) where
import Control.Arrow (left)
import Control.Monad.IO.Class (MonadIO)
import Data.Avro as A (FromAvro, HasAvroSchema (..), Result (..), fromAvro)
import qualified Data.Avro as A (decodeWithSchema)
import qualified Data.Avro.Decode as A (decodeAvro)
import qualified Data.Avro.Deconflict as A (deconflict)
import Data.Avro.Schema (Schema)
import Data.Bits (shiftL)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL hiding (zipWith)
import Data.Int
import Data.Tagged (Tagged, untag)
import Kafka.Avro.SchemaRegistry
data DecodeError = DecodeRegistryError SchemaRegistryError
| BadPayloadNoSchemaId
| DecodeError Schema String
deriving (Show, Eq)
decodeWithSchema :: (MonadIO m, FromAvro a)
=> SchemaRegistry
-> ByteString
-> m (Either DecodeError a)
decodeWithSchema sr bs =
case schemaData of
Left err -> return $ Left err
Right (sid, payload) -> do
res <- left DecodeRegistryError <$> loadSchema sr sid
return $ res >>= flip decodeWithDeconflict payload
where
schemaData = maybe (Left BadPayloadNoSchemaId) Right (extractSchemaId bs)
decodeWithDeconflict :: forall a. (FromAvro a) => Schema -> ByteString -> Either DecodeError a
decodeWithDeconflict writerSchema bs =
let readerSchema = untag (schema :: Tagged a Schema)
in left (DecodeError readerSchema) $ do
raw <- A.decodeAvro writerSchema bs
val <- A.deconflict writerSchema readerSchema raw
resultToEither readerSchema (fromAvro val)
extractSchemaId :: ByteString -> Maybe (SchemaId, ByteString)
extractSchemaId bs = do
(_ , b0) <- BL.uncons bs
(w1, b1) <- BL.uncons b0
(w2, b2) <- BL.uncons b1
(w3, b3) <- BL.uncons b2
(w4, b4) <- BL.uncons b3
let ints = fromIntegral <$> [w4, w3, w2, w1] :: [Int32]
let int = sum $ zipWith shiftL ints [0, 8, 16, 24]
return (SchemaId int, b4)
resultToEither :: Schema -> A.Result a -> Either String a
resultToEither sc res = case res of
Success a -> Right a
Error msg -> Left msg