{-# LANGUAGE ConstraintKinds     #-}
{-# LANGUAGE FlexibleInstances   #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications    #-}

-- | Avro encoding and decoding routines.
--
-- This library provides a high level interface for encoding and decoding
-- Haskell values in Apache's Avro serialization format.
module Data.Avro
  ( -- * Schema
    Schema(..)
  , Schema.Field(..), Schema.Order(..)
  , Schema.TypeName(..)
  , Schema.Decimal(..)
  , Schema.LogicalTypeBytes(..), Schema.LogicalTypeFixed(..)
  , Schema.LogicalTypeInt(..), Schema.LogicalTypeLong(..)
  , Schema.LogicalTypeString(..)

  -- * Deconflicting schemas
  , ReadSchema
  , deconflict
  , readSchemaFromSchema

  -- * Individual values
  , encodeValue
  , encodeValueWithSchema

  , decodeValue
  , decodeValueWithSchema

  -- * Working with containers
  -- ** Decoding containers
  , decodeContainer
  , decodeContainerWithEmbeddedSchema
  , decodeContainerWithReaderSchema

  -- ** Encoding containers
  , encodeContainer
  , encodeContainerWithSchema
  , encodeContainerWithSync
  , Container.newSyncBytes

  -- ** Extracting containers' data
  , extractContainerValuesBytes
  , decodeContainerValuesBytes

  -- * Classes
  , ToAvro
  , FromAvro

  -- * Compression
  , Codec, nullCodec, deflateCodec

  -- * Schemas of Haskell types
  , HasAvroSchema(..)
  , schemaOf

  ) where

import           Control.Monad                ((>=>))
import           Data.Avro.Codec              (Codec, deflateCodec, nullCodec)
import           Data.Avro.Encoding.FromAvro
import           Data.Avro.Encoding.ToAvro
import           Data.Avro.HasAvroSchema
import qualified Data.Avro.Internal.Container as Container
import           Data.Avro.Schema.Deconflict  (deconflict)
import           Data.Avro.Schema.ReadSchema  (ReadSchema, fromSchema)
import           Data.Avro.Schema.Schema      (Schema)
import qualified Data.Avro.Schema.Schema      as Schema
import           Data.Binary.Get              (runGetOrFail)
import           Data.ByteString.Builder      (toLazyByteString)
import qualified Data.ByteString.Lazy         as BL
import           Data.Tagged                  (untag)

{- HLINT ignore "Use section"         -}

-- | Converts a 'Schema' into a 'ReadSchema'.
--
-- This is useful when the writer and the reader schemas are known to be
-- the same, so no deconflicting is required. It is a plain alias for
-- 'fromSchema'.
readSchemaFromSchema :: Schema -> ReadSchema
readSchemaFromSchema = fromSchema
{-# INLINE readSchemaFromSchema #-}

-- | Serialises an individual value into Avro with the schema provided.
--
-- The value is rendered to a 'Data.ByteString.Builder.Builder' by 'toAvro'
-- and then materialised as a lazy 'BL.ByteString'.
encodeValueWithSchema :: ToAvro a => Schema -> a -> BL.ByteString
encodeValueWithSchema s = toLazyByteString . toAvro s
{-# INLINE encodeValueWithSchema #-}

-- | Serialises an individual value into Avro using the schema
-- from its corresponding 'HasAvroSchema' instance.
--
-- Equivalent to calling 'encodeValueWithSchema' with @'schemaOf' a@.
encodeValue :: (HasAvroSchema a, ToAvro a) => a -> BL.ByteString
encodeValue a = encodeValueWithSchema (schemaOf a) a
{-# INLINE encodeValue #-}

-- | Deserialises an individual value from Avro.
--
-- The payload is first parsed with 'getValue' for the given 'ReadSchema'
-- and the parsed value is then converted with 'fromAvro'.
--
-- Returns 'Left' with the parser's error message when parsing fails, or
-- with the conversion error when 'fromAvro' fails. Any input left over
-- after the value, and the number of bytes consumed, are discarded.
decodeValueWithSchema :: FromAvro a => ReadSchema -> BL.ByteString -> Either String a
decodeValueWithSchema readSchema payload =
  case runGetOrFail (getValue readSchema) payload of
    Right (_, _, v) -> fromAvro v
    Left (_, _, e)  -> Left e

-- | Deserialises an individual value from Avro using the schema from its
-- corresponding 'HasAvroSchema' instance.
--
-- __NOTE__: __This function is only to be used when reader and writer schemas are known to be the same.__
-- Because only one schema is known at this point, and it is the reader schema,
-- /no deconflicting/ can be performed.
decodeValue :: forall a. (HasAvroSchema a, FromAvro a) => BL.ByteString -> Either String a
decodeValue = decodeValueWithSchema (fromSchema (untag @a schema))
{-# INLINE decodeValue #-}

-- | Decodes the container using a schema from 'HasAvroSchema' as a reader schema.
--
-- Errors are reported as a part of the list and the list will stop at the
-- first error. This means that the consumer will get all the "good" content
-- from the container until the error is detected, then this error, and then
-- the list is finished.
decodeContainer :: forall a. (HasAvroSchema a, FromAvro a) => BL.ByteString -> [Either String a]
decodeContainer = decodeContainerWithReaderSchema (untag @a schema)
{-# INLINE decodeContainer #-}

-- | Decodes the container as a list of values of the requested type.
--
-- The schema handed over by the container is converted with 'fromSchema'
-- and used as the read schema as is; no deconflicting is performed.
--
-- Errors are reported as a part of the list and the list will stop at the
-- first error. This means that the consumer will get all the "good" content
-- from the container until the error is detected, then this error, and then
-- the list is finished. If extracting the container fails as a whole, the
-- result is a single-element list holding that error.
decodeContainerWithEmbeddedSchema :: forall a. FromAvro a => BL.ByteString -> [Either String a]
decodeContainerWithEmbeddedSchema payload =
  case Container.extractContainerValues (pure . fromSchema) (getValue >=> (either fail pure . fromAvro)) payload of
    Left err          -> [Left err]
    Right (_, values) -> values

-- | Decodes the container as a list of values of the requested type.
--
-- The provided reader schema will be de-conflicted with the schema
-- embedded with the container.
--
-- Errors are reported as a part of the list and the list will stop at the
-- first error. This means that the consumer will get all the "good" content
-- from the container until the error is detected, then this error, and then
-- the list is finished. If extracting the container fails as a whole
-- (including a failure to deconflict the schemas), the result is a
-- single-element list holding that error.
decodeContainerWithReaderSchema :: forall a. FromAvro a => Schema -> BL.ByteString -> [Either String a]
decodeContainerWithReaderSchema readerSchema payload =
  case Container.extractContainerValues (flip deconflict readerSchema) (getValue >=> (either fail pure . fromAvro)) payload of
    Left err          -> [Left err]
    Right (_, values) -> values

-- | Splits container into a list of individual avro-encoded values.
--
-- This is particularly useful when slicing up containers into one or more
-- smaller files.  By extracting the original bytestring it is possible to
-- avoid re-encoding data.
--
-- The schema handed over by the container is converted with 'fromSchema'
-- and used as is. Each value is still parsed with 'getValue', but the
-- parsed value is dropped and only its bytes are returned.
extractContainerValuesBytes :: BL.ByteString -> Either String (Schema, [Either String BL.ByteString])
extractContainerValuesBytes =
  fmap keepBytes . Container.extractContainerValuesBytes (pure . fromSchema) getValue
  where
    -- For every successfully extracted entry drop the decoded value and
    -- keep only the raw bytes it was read from.
    keepBytes (sch, entries) = (sch, map (fmap snd) entries)
{-# INLINE extractContainerValuesBytes #-}

-- | Splits container into a list of individual avro-encoded values.
-- This version provides both encoded and decoded values.
--
-- This is particularly useful when slicing up containers into one or more
-- smaller files.  By extracting the original bytestring it is possible to
-- avoid re-encoding data.
--
-- The provided reader schema is de-conflicted with the schema supplied by
-- the container, and every value is converted with 'fromAvro'; a failed
-- conversion is turned into a parser failure via 'fail'.
decodeContainerValuesBytes :: forall a. FromAvro a
  => Schema
  -> BL.ByteString
  -> Either String (Schema, [Either String (a, BL.ByteString)])
decodeContainerValuesBytes readerSchema =
  Container.extractContainerValuesBytes (flip deconflict readerSchema) (getValue >=> (either fail pure . fromAvro))
{-# INLINE decodeContainerValuesBytes #-}

-- | Encode chunks of values into a container, using 16 random bytes for
-- the synchronization markers and a corresponding 'HasAvroSchema' schema.
-- Blocks are compressed (or not) according to the given 'Codec' ('nullCodec' or 'deflateCodec').
--
-- Equivalent to 'encodeContainerWithSchema' with the schema taken from the
-- 'HasAvroSchema' instance of the element type.
encodeContainer :: forall a. (HasAvroSchema a, ToAvro a) => Codec -> [[a]] -> IO BL.ByteString
encodeContainer codec = encodeContainerWithSchema codec (untag @a schema)

-- | Encode chunks of values into a container, using 16 random bytes for
-- the synchronization markers. Blocks are compressed (or not) according
-- to the given 'Codec' ('nullCodec' or 'deflateCodec').
--
-- The synchronization marker is obtained from 'Container.newSyncBytes',
-- which is why the result lives in 'IO'; the encoding itself is delegated
-- to 'encodeContainerWithSync'.
encodeContainerWithSchema :: ToAvro a => Codec -> Schema -> [[a]] -> IO BL.ByteString
encodeContainerWithSchema codec sch xss = do
  sync <- Container.newSyncBytes
  pure $ encodeContainerWithSync codec sch sync xss

-- | Encode chunks of objects into a container, using the provided
-- ByteString as the synchronization markers.
--
-- Individual values are serialised with 'toAvro'; the container layout is
-- produced by 'Container.packContainerValuesWithSync''.
encodeContainerWithSync :: ToAvro a => Codec -> Schema -> BL.ByteString -> [[a]] -> BL.ByteString
encodeContainerWithSync = Container.packContainerValuesWithSync' toAvro
{-# INLINE encodeContainerWithSync #-}