module Kafka.Avro.Decode
(
DecodeError(..)
, decodeWithSchema, extractSchemaId
) where
import Control.Monad.IO.Class (MonadIO)
import Data.Avro as A (FromAvro, Result (..))
import qualified Data.Avro as A (decodeWithSchema)
import Data.Avro.Schema (Schema)
import Data.Bits (shiftL)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL hiding (zipWith)
import Data.Int
import Kafka.Avro.SchemaRegistry
-- | Reasons why decoding a schema-tagged Avro message can fail.
data DecodeError
  = -- | The schema registry reported an error while the schema was being
    -- loaded; the registry's own error is carried unchanged.
    DecodeRegistryError SchemaRegistryError
    -- | 'extractSchemaId' could not find a schema id header in the payload.
  | BadPayloadNoSchemaId
    -- | The Avro decoder rejected the payload. Carries the schema that was
    -- used for decoding together with the decoder's error message.
  | DecodeError Schema String
  deriving (Show, Eq)
-- | Decode a schema-tagged message into a value.
--
-- The schema id is read from the front of the message with
-- 'extractSchemaId', the matching schema is loaded through the given
-- 'SchemaRegistry', and the remaining bytes are decoded against it.
--
-- Failures are reported as a 'Left':
--
-- * 'BadPayloadNoSchemaId' when no schema id could be extracted
--   (the registry is not contacted in that case);
-- * 'DecodeRegistryError' when loading the schema fails;
-- * 'DecodeError' when the Avro decoder rejects the payload.
decodeWithSchema :: (MonadIO m, FromAvro a)
                 => SchemaRegistry
                 -> ByteString
                 -> m (Either DecodeError a)
decodeWithSchema sr bs =
  case extractSchemaId bs of
    Nothing -> return (Left BadPayloadNoSchemaId)
    Just (sid, payload) -> do
      lookedUp <- loadSchema sr sid
      -- The inner block runs in the 'Either' monad: a registry failure
      -- short-circuits before any decoding is attempted.
      return $ do
        schema <- leftMap DecodeRegistryError lookedUp
        resultToEither schema (A.decodeWithSchema schema payload)
-- | Split a schema-tagged message into its schema id and the remaining
-- payload.
--
-- The message is expected to start with a 5-byte header: a single magic
-- byte, which must be @0@ (Confluent wire format), followed by the schema
-- id as a big-endian 32-bit integer. Everything after the header is
-- returned untouched as the payload.
--
-- Returns 'Nothing' when the input is shorter than the header or when the
-- magic byte is not @0@, so that arbitrary bytes are not mistaken for a
-- schema id.
extractSchemaId :: ByteString -> Maybe (SchemaId, ByteString)
extractSchemaId bs =
  case BL.unpack header of
    [0, w1, w2, w3, w4] ->
      Just (SchemaId (octet w1 24 + octet w2 16 + octet w3 8 + octet w4 0), payload)
    _ -> Nothing
  where
    -- 'BL.splitAt' yields a shorter header on short input, which then
    -- fails the five-element pattern above.
    (header, payload) = BL.splitAt 5 bs

    -- Widen one header byte to 'Int32' and move it to its bit position.
    -- The shifted bytes occupy disjoint bits, so adding them assembles
    -- the id (wrapping into the sign bit exactly like a two's-complement
    -- big-endian read).
    octet w n = (fromIntegral w :: Int32) `shiftL` n
-- | Apply a function to the 'Left' value of an 'Either', leaving a
-- 'Right' value unchanged.
leftMap :: (e -> e') -> Either e r -> Either e' r
leftMap f = either (Left . f) Right
-- | Convert an Avro decoding 'A.Result' into an 'Either'.
--
-- A 'Success' becomes 'Right'; an 'Error' becomes a 'DecodeError' that
-- records the given schema alongside the decoder's message.
resultToEither :: Schema -> A.Result a -> Either DecodeError a
resultToEither _ (Success value) = Right value
resultToEither schema (Error reason) = Left (DecodeError schema reason)