{-# LANGUAGE ConstraintKinds     #-}
{-# LANGUAGE FlexibleInstances   #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications    #-}

-- | Avro encoding and decoding routines.
--
-- This library provides a high level interface for encoding and decoding
-- Haskell values in Apache's Avro serialization format.
module Data.Avro
  ( -- * Schema
    Schema(..)
  , Schema.Field(..), Schema.Order(..)
  , Schema.TypeName(..)
  , Schema.Decimal(..)
  , Schema.LogicalTypeBytes(..), Schema.LogicalTypeFixed(..)
  , Schema.LogicalTypeInt(..), Schema.LogicalTypeLong(..)
  , Schema.LogicalTypeString(..)

    -- * Deconflicting schemas
  , ReadSchema
  , deconflict
  , readSchemaFromSchema

    -- * Individual values
  , encodeValue
  , encodeValueWithSchema
  , decodeValue
  , decodeValueWithSchema

    -- * Working with containers
    -- ** Decoding containers
  , decodeContainer
  , decodeContainerWithEmbeddedSchema
  , decodeContainerWithReaderSchema

    -- ** Encoding containers
  , encodeContainer
  , encodeContainerWithSchema
  , encodeContainerWithSync
  , Container.newSyncBytes

    -- ** Extracting containers' data
  , extractContainerValuesBytes
  , decodeContainerValuesBytes

    -- * Classes
  , ToAvro
  , FromAvro
  , HasAvroSchema(..)
  , schemaOf

    -- * Compression
  , Codec, nullCodec, deflateCodec
  ) where

import           Control.Monad               ((>=>))
import           Data.Avro.Codec             (Codec, deflateCodec, nullCodec)
import           Data.Avro.Encoding.FromAvro
import           Data.Avro.Encoding.ToAvro
import           Data.Avro.HasAvroSchema
import qualified Data.Avro.Internal.Container as Container
import           Data.Avro.Schema.Deconflict (deconflict)
import           Data.Avro.Schema.ReadSchema (ReadSchema, fromSchema)
import           Data.Avro.Schema.Schema     (Schema)
import qualified Data.Avro.Schema.Schema     as Schema
import           Data.Binary.Get             (runGetOrFail)
import           Data.ByteString.Builder     (toLazyByteString)
import qualified Data.ByteString.Lazy        as BL
import           Data.Tagged                 (untag)

{- HLINT ignore "Use section" -}

-- | Converts 'Schema' into 'ReadSchema'. This function may be useful when it is known
-- that the writer and the reader schemas are the same.
readSchemaFromSchema :: Schema -> ReadSchema
readSchemaFromSchema = fromSchema
{-# INLINE readSchemaFromSchema #-}

-- | Serialises an individual value into Avro with the schema provided.
--
-- The value is rendered with 'toAvro' and the resulting builder is
-- materialised as a lazy 'BL.ByteString'.
encodeValueWithSchema :: ToAvro a => Schema -> a -> BL.ByteString
encodeValueWithSchema s = toLazyByteString . toAvro s
{-# INLINE encodeValueWithSchema #-}

-- | Serialises an individual value into Avro using the schema
-- from its corresponding 'HasAvroSchema' instance.
encodeValue :: (HasAvroSchema a, ToAvro a) => a -> BL.ByteString
encodeValue a = encodeValueWithSchema (schemaOf a) a
{-# INLINE encodeValue #-}

-- | Deserialises an individual value from Avro.
--
-- The payload is parsed with 'getValue' for the given 'ReadSchema' and the
-- parsed value is then converted with 'fromAvro'. A parser failure is
-- returned as 'Left' with the parser's message. Input left over after the
-- value has been read is not inspected.
decodeValueWithSchema :: FromAvro a => ReadSchema -> BL.ByteString -> Either String a
decodeValueWithSchema schema payload =
  case runGetOrFail (getValue schema) payload of
    -- Remaining input and consumed offset are deliberately ignored.
    Right (_, _, v) -> fromAvro v
    Left (_, _, e)  -> Left e

-- | Deserialises an individual value from Avro using the schema from its
-- corresponding 'HasAvroSchema' instance.
--
-- __NOTE__: __This function is only to be used when reader and writer schemas are known to be the same.__
-- Because only one schema is known at this point, and it is the reader schema,
-- /no deconflicting/ can be performed.
decodeValue :: forall a. (HasAvroSchema a, FromAvro a) => BL.ByteString -> Either String a
decodeValue = decodeValueWithSchema (fromSchema (untag @a schema))
{-# INLINE decodeValue #-}

-- | Decodes the container using a schema from 'HasAvroSchema' as a reader schema.
--
-- Errors are reported as a part of the list and the list will stop at first
-- error. This means that the consumer will get all the "good" content from
-- the container until the error is detected, then this error and then the list
-- is finished.
decodeContainer :: forall a. (HasAvroSchema a, FromAvro a) => BL.ByteString -> [Either String a]
decodeContainer = decodeContainerWithReaderSchema (untag @a schema)
{-# INLINE decodeContainer #-}

-- | Decodes the container as a list of values of the requested type.
--
-- The schema embedded in the container is used as the reader schema.
--
-- Errors are reported as a part of the list and the list will stop at first
-- error. This means that the consumer will get all the "good" content from
-- the container until the error is detected, then this error and then the list
-- is finished.
decodeContainerWithEmbeddedSchema :: forall a. FromAvro a => BL.ByteString -> [Either String a]
decodeContainerWithEmbeddedSchema payload =
  either (\err -> [Left err]) snd $
    Container.extractContainerValues (pure . fromSchema) decodeOne payload
  where
    -- Parse one value, then convert it; a conversion failure becomes a parser failure.
    decodeOne readSchema = getValue readSchema >>= either fail pure . fromAvro

-- | Decodes the container as a list of values of the requested type.
--
-- The provided reader schema will be de-conflicted with the schema
-- embedded with the container.
--
-- Errors are reported as a part of the list and the list will stop at first
-- error. This means that the consumer will get all the "good" content from
-- the container until the error is detected, then this error and then the list
-- is finished.
decodeContainerWithReaderSchema :: forall a. FromAvro a => Schema -> BL.ByteString -> [Either String a]
decodeContainerWithReaderSchema readerSchema payload =
  either (\err -> [Left err]) snd $
    Container.extractContainerValues reconcile decodeOne payload
  where
    -- The schema handed over by the container layer is passed to 'deconflict'
    -- as the first argument, the caller's reader schema as the second.
    reconcile writerSchema = deconflict writerSchema readerSchema
    -- Parse one value, then convert it; a conversion failure becomes a parser failure.
    decodeOne readSchema = getValue readSchema >>= either fail pure . fromAvro

-- | Splits container into a list of individual avro-encoded values.
--
-- This is particularly useful when slicing up containers into one or more
-- smaller files. By extracting the original bytestring it is possible to
-- avoid re-encoding data.
extractContainerValuesBytes :: BL.ByteString -> Either String (Schema, [Either String BL.ByteString])
extractContainerValuesBytes =
  -- Layers, outermost first: Either, pair, list, Either. Only the raw bytes
  -- of each (value, bytes) pair are kept.
  fmap (fmap (map (fmap snd)))
    . Container.extractContainerValuesBytes (pure . fromSchema) getValue
{-# INLINE extractContainerValuesBytes #-}

-- | Splits container into a list of individual avro-encoded values.
-- This version provides both encoded and decoded values.
--
-- This is particularly useful when slicing up containers into one or more
-- smaller files. By extracting the original bytestring it is possible to
-- avoid re-encoding data.
decodeContainerValuesBytes :: forall a. FromAvro a
  => Schema
  -> BL.ByteString
  -> Either String (Schema, [Either String (a, BL.ByteString)])
decodeContainerValuesBytes readerSchema =
  Container.extractContainerValuesBytes
    (\writerSchema -> deconflict writerSchema readerSchema)
    (\readSchema -> getValue readSchema >>= either fail pure . fromAvro)
{-# INLINE decodeContainerValuesBytes #-}

-- | Encode chunks of values into a container, using 16 random bytes for
-- the synchronization markers and a corresponding 'HasAvroSchema' schema.
-- Blocks are compressed (or not) according to the given 'Codec' ('nullCodec' or 'deflateCodec').
encodeContainer :: forall a. (HasAvroSchema a, ToAvro a) => Codec -> [[a]] -> IO BL.ByteString
encodeContainer codec = encodeContainerWithSchema codec valueSchema
  where
    valueSchema = untag @a schema

-- | Encode chunks of values into a container, using 16 random bytes for
-- the synchronization markers. Blocks are compressed (or not) according
-- to the given 'Codec' ('nullCodec' or 'deflateCodec').
encodeContainerWithSchema :: ToAvro a => Codec -> Schema -> [[a]] -> IO BL.ByteString
encodeContainerWithSchema codec sch xss =
  (\sync -> encodeContainerWithSync codec sch sync xss) <$> Container.newSyncBytes

-- | Encode chunks of objects into a container, using the provided
-- ByteString as the synchronization markers.
encodeContainerWithSync :: ToAvro a => Codec -> Schema -> BL.ByteString -> [[a]] -> BL.ByteString
encodeContainerWithSync = Container.packContainerValuesWithSync' toAvro
{-# INLINE encodeContainerWithSync #-}