{-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} module Data.Avro.Decode.Lazy ( decodeAvro , decodeContainer , decodeContainer' , decodeContainerWithSchema , decodeContainerWithSchema' -- * Lower level interface , getContainerValues , getContainerValuesWith , getAvroOf , GetAvro(..) ) where import qualified Codec.Compression.Zlib as Z import Control.Monad (foldM, replicateM, when) import qualified Data.Aeson as A import qualified Data.Array as Array import Data.Binary.Get (Get, runGetOrFail) import qualified Data.Binary.Get as G import Data.Binary.IEEE754 as IEEE import Data.Bits import Data.ByteString (ByteString) import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy.Char8 as BC import Data.Either (isRight) import qualified Data.HashMap.Strict as HashMap import Data.Int import Data.List (foldl') import qualified Data.List.NonEmpty as NE import qualified Data.Map as Map import Data.Maybe import Data.Monoid ((<>)) import qualified Data.Set as Set import Data.Tagged (Tagged, untag) import Data.Text (Text) import qualified Data.Text as Text import qualified Data.Text.Encoding as Text import qualified Data.Vector as V import Prelude as P import qualified Data.Avro.Decode.Lazy.LazyValue as T import Data.Avro.DecodeRaw import Data.Avro.HasAvroSchema (schema) import Data.Avro.Schema as S import Data.Avro.Zag import Data.Avro.Decode.Get import Data.Avro.Decode.Lazy.Convert (toStrictValue) import Data.Avro.Decode.Lazy.Deconflict as C import Data.Avro.FromAvro -- | Decodes the container as a lazy list of values of the requested type. -- -- The schema for the requested type will be de-conflicted with the schema -- embedded with the container. -- -- Errors are reported as a part of the list and the list will stop at first -- error. This means that the consumer will get all the "good" content from -- the container until the error is detected, then this error and then the list -- is finished. decodeContainer :: forall a. FromAvro a => BL.ByteString -> [Either String a] decodeContainer bs = let vals = either (\err -> [Left err]) concat (decodeContainer' bs) in takeWhileInclusive isRight vals -- | Decodes the container as a lazy list of values of the requested type. -- -- The schema for the requested type will be de-conflicted with the schema -- embedded with the container. -- -- The content of the container is returned as a list of "blocks" of values -- inside this container, so the notion of blocks in the container is preserved. -- Since decoding is lazy it should be safe to concat these values into one lazy list. -- -- The "outer" error represents the error in opening the container itself -- (including problems like reading schemas embedded into the container.) -- -- The "inner" errors represent problems in decoding individual values. -- -- Note that this function will not stop decoding at the first occurance of the "inner" -- error, and will continue attempting decoding values, so it is possible to -- get 'Right' after 'Left'. It is up to the user to decide whether it is correct or not to -- continue after errors (most likely it will not be correct). -- -- 'decodeContainer' function makes a choice to stop after the first error. decodeContainer' :: forall a. FromAvro a => BL.ByteString -> Either String [[Either String a]] decodeContainer' = decodeContainerWithSchema' (untag (schema :: Tagged a Schema)) -- | Same as 'decodeContainer' but uses provided schema as a reader schema for the container -- instead of the schema obtained from the type of 'a'. -- -- It is up to the user to make sure that the provided schema is compatible with 'a' -- and with the container's writer schema. decodeContainerWithSchema :: FromAvro a => Schema -> BL.ByteString -> [Either String a] decodeContainerWithSchema s bs = either (\err -> [Left err]) concat (decodeContainerWithSchema' s bs) -- | Same as 'decodeContainer'' but uses provided schema as a reader schema for the container -- instead of the schema obtained from the type of 'a'. -- -- It is up to the user to make sure that the provided schema is compatible with 'a' -- and with the container's writer schema. decodeContainerWithSchema' :: FromAvro a => Schema -> BL.ByteString -> Either String [[Either String a]] decodeContainerWithSchema' readerSchema bs = do (writerSchema, vals) <- getContainerValues bs pure $ (fmap . fmap) (convertValue writerSchema) vals where convertValue w v = toStrictValue (C.deconflict w readerSchema v) >>= (resultToEither . fromAvro) -- |Decode bytes into a 'Value' as described by Schema. decodeAvro :: Schema -> BL.ByteString -> T.LazyValue Type decodeAvro s = snd . getAvroOf s {-# INLINABLE decodeAvro #-} -- | Decodes the container into a list of blocks of raw Avro values. -- -- The content of the container is returned as a list of "blocks" of values -- inside this container, so the notion of blocks in the container is preserved. -- Since decoding is lazy it should be safe to concat these values into one lazy list. -- -- Each 'LazyValue' can be an `Error' and this function doesn't make any attempts -- of dealing with them leaving it up to the user. -- -- The "outer" error represents the error in opening the container itself -- (including problems like reading schemas embedded into the container.) getContainerValues :: BC.ByteString -> Either String (Schema, [[T.LazyValue Type]]) getContainerValues = getContainerValuesWith getAvroOf {-# INLINABLE getContainerValues #-} getContainerValuesWith :: (Schema -> BL.ByteString -> (BL.ByteString, T.LazyValue Type)) -> BL.ByteString -> Either String (Schema, [[T.LazyValue Type]]) getContainerValuesWith schemaToGet bs = case runGetOrFail getAvro bs of Left (bs', _, err) -> Left err Right (bs', _, ContainerHeader {..}) -> Right (containedSchema, snd $ getBlocks (schemaToGet containedSchema) syncBytes bs' decompress) where getRawBlock :: (BL.ByteString -> Get BL.ByteString) -> Get (Int, BC.ByteString) getRawBlock decompress = do nrObj <- getLong >>= sFromIntegral nrBytes <- getLong bytes <- G.getLazyByteString nrBytes >>= decompress pure (nrObj, bytes) checkMarker :: BL.ByteString -> BL.ByteString -> Either String BL.ByteString checkMarker sync bs = case BL.splitAt nrSyncBytes bs of (marker, _) | marker /= sync -> Left "Invalid marker, does not match sync bytes." (_, rest) -> Right rest getBlocks :: (BL.ByteString -> (BL.ByteString, T.LazyValue Type)) -- ^ getValue -> BL.ByteString -- ^ sync bytes -> BL.ByteString -- ^ byteString to parse -> (BL.ByteString -> Get BL.ByteString) -- ^ how to decompress -> (BL.ByteString, [[T.LazyValue Type]]) getBlocks getValue sync bs decompress = case runGetOrFail (getRawBlock decompress) bs of Left (bs', _, err) -> (bs', [[T.Error err]]) Right (bs', _, (nrObj, bytes)) -> let (_, vs) = consumeN (fromIntegral nrObj) getValue bytes in case checkMarker sync bs' of Left err -> (bs', [[T.Error err]]) Right bs'' | BL.null bs'' -> (bs'', [vs]) Right bs'' -> let (rest, vs') = getBlocks getValue sync bs'' decompress in (rest, vs : vs') decodeGet :: GetAvro a => (a -> T.LazyValue Type) -> BL.ByteString -> (BL.ByteString, T.LazyValue Type) decodeGet f bs = let res = runGetOrFail (f <$> getAvro) bs in either (\(rest,_,s) -> (rest, T.Error s)) (\(rest,_,a) -> (rest, a)) res {-# INLINE decodeGet #-} consumeN :: Int64 -> (a -> (a, b)) -> a -> (a, [b]) consumeN n f a = if n == 0 then (a, []) else let (a', b) = f a (r, bs) = consumeN (n-1) f a' in (r, b:bs) {-# INLINE consumeN #-} getAvroOf :: Schema -> BL.ByteString -> (BL.ByteString, T.LazyValue Type) getAvroOf ty0 bs = go ty0 bs where env = S.buildTypeEnvironment envFail ty0 envFail t = fail $ "Named type not in schema: " <> show t go :: Type -> BL.ByteString -> (BL.ByteString, T.LazyValue Type) go ty bs = case ty of Null -> (bs, T.Null) Boolean -> decodeGet T.Boolean bs Int -> decodeGet T.Int bs Long -> decodeGet T.Long bs Float -> decodeGet T.Float bs Double -> decodeGet T.Double bs Bytes -> decodeGet T.Bytes bs String -> decodeGet T.String bs Array t -> T.Array . V.fromList . mconcat <$> getElements bs (go t) Map t -> T.Map . HashMap.fromList . mconcat <$> getKVPairs bs (go t) NamedType tn -> case runGetOrFail (env tn) bs of Left (bs', _, err) -> (bs', T.Error err) Right (bs', _, v) -> go v bs' Record {..} -> do let getField bs' Field {..} = (fldName,) <$> go fldType bs' let flds = foldl' (\(bs', as) fld -> (:as) <$> getField bs' fld ) (bs, []) fields T.Record ty . HashMap.fromList <$> flds Enum {..} -> case runGetOrFail getLong bs of Left (bs', _, err) -> (bs', T.Error err) Right (bs', _, i) -> case symbolLookup i of Nothing -> (bs', T.Error ("Unknown value {" <> show i <> "} for enum " <> Text.unpack (typeName ty) )) Just sym -> (bs', T.Enum ty (fromIntegral i) sym) Union ts unionLookup -> case runGetOrFail getLong bs of Left (bs', _, err) -> (bs', T.Error err) Right (bs', _, i) -> case unionLookup i of Nothing -> (bs', T.Error $ "Decoded Avro tag is outside the expected range for a Union. Tag: " <> show i <> " union of: " <> show (P.map typeName $ NE.toList ts)) Just t -> T.Union ts t <$> go t bs' Fixed {..} -> case runGetOrFail (G.getByteString (fromIntegral size)) bs of Left (bs', _, err) -> (bs', T.Error err) Right (bs', _, v) -> (bs', T.Fixed ty v) {-# INLINABLE getAvroOf #-} getKVPair getElement bs = case runGetOrFail getString bs of Left (bs'', _, err) -> (bs'', ("", T.Error err)) Right (bs'', _, v) -> (v,) <$> getElement bs'' {-# INLINE getKVPair #-} getKVPairs :: BL.ByteString -> (BL.ByteString -> (BL.ByteString, T.LazyValue Type)) -> (BL.ByteString, [[(Text, T.LazyValue Type)]]) getKVPairs bs getElement = case runGetOrFail (abs <$> getLong) bs of Left (bs', _, err) -> (bs', [[("", T.Error err)]]) Right (bs', _, l) | l == 0 -> (bs', []) Right (bs', _, l) -> let (bs'', vs) = consumeN l (getKVPair getElement) bs' (rest, vs') = getKVPairs bs'' getElement in (rest, vs : vs') {-# INLINE getKVPairs #-} getElements :: BL.ByteString -> (BL.ByteString -> (BL.ByteString, T.LazyValue Type)) -> (BL.ByteString, [[T.LazyValue Type]]) getElements bs getElement = case runGetOrFail (abs <$> getLong) bs of Left (bs', _, err) -> (bs', [[T.Error err]]) Right (bs', _, l) | l == 0 -> (bs', []) Right (bs', _, l) -> let (bs'', vs) = consumeN l getElement bs' (rest, vs') = getElements bs'' getElement in (rest, vs : vs') {-# INLINE getElements #-} -- takeWhileInclusive :: (a -> Bool) -> [a] -> [a] takeWhileInclusive _ [] = [] takeWhileInclusive p (x:xs) = x : if p x then takeWhileInclusive p xs else [] {-# INLINE takeWhileInclusive #-}