{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData          #-}
{-# LANGUAGE TupleSections       #-}
{-# LANGUAGE TypeApplications    #-}

module Data.Avro.Internal.Container
where

import           Control.Monad                (when)
import qualified Data.Aeson                   as Aeson
import           Data.Avro.Codec              (Codec (..), Decompress)
import qualified Data.Avro.Codec              as Codec
import           Data.Avro.Encoding.ToAvro    (toAvro)
import           Data.Avro.Internal.EncodeRaw (encodeRaw)
import           Data.Avro.Schema.Schema      (Schema)
import qualified Data.Avro.Schema.Schema      as Schema
import           Data.Binary.Get              (Get)
import qualified Data.Binary.Get              as Get
import           Data.ByteString              (ByteString)
import           Data.ByteString.Builder      (Builder, lazyByteString, toLazyByteString)
import qualified Data.ByteString.Lazy         as BL
import qualified Data.ByteString.Lazy.Char8   as BLC
import           Data.Either                  (isRight)
import           Data.HashMap.Strict          (HashMap)
import qualified Data.HashMap.Strict          as HashMap
import           Data.Int                     (Int32, Int64)
import           Data.List                    (foldl', unfoldr)
import qualified Data.Map.Strict              as Map
import           Data.Text                    (Text)
import           System.Random.TF.Init        (initTFGen)
import           System.Random.TF.Instances   (randoms)

import qualified Data.Avro.Internal.Get as AGet

-- | Information read from the header at the start of an Avro object container.
data ContainerHeader = ContainerHeader
  { syncBytes       :: BL.ByteString
    -- ^ Synchronization marker; it is expected after every data block.
  , decompress      :: forall a. Decompress a
    -- ^ Decompressor for the codec named by the header's @avro.codec@ entry.
  , containedSchema :: Schema
    -- ^ Writer schema taken from the header's @avro.schema@ entry.
  }

-- | Length, in bytes, of a container synchronization marker.
nrSyncBytes :: Integral sb => sb
nrSyncBytes = fromIntegral (16 :: Int)
{-# INLINE nrSyncBytes #-}

-- | Generates a fresh random synchronization marker ('nrSyncBytes' bytes long)
-- for writing an Avro container.
newSyncBytes :: IO BL.ByteString
newSyncBytes = do
  gen <- initTFGen
  let marker = take nrSyncBytes (randoms gen)
  pure (BL.pack marker)

-- | Parses the header of an Avro object container. The header consists of:
--
-- * the four magic bytes (@Obj@ followed by byte 1),
-- * the metadata map,
-- * the 'nrSyncBytes'-long synchronization marker.
--
-- The parser fails (in 'Get', so 'Get.runGetOrFail' reports a 'Left') when:
--
-- * the magic bytes are wrong;
-- * the @avro.schema@ entry is missing or is not valid JSON for a 'Schema';
-- * @avro.codec@ names a codec other than @null@ or @deflate@.
--
-- A missing @avro.codec@ entry selects the null codec.
getContainerHeader :: Get ContainerHeader
getContainerHeader = do
  magic <- getFixed avroMagicSize
  when (BL.fromStrict magic /= avroMagicBytes)
       (fail "Invalid magic number at start of container.")
  metadata <- getMeta
  sync     <- BL.fromStrict <$> getFixed nrSyncBytes
  codec    <- getCodec (Map.lookup "avro.codec" metadata)
  schema   <- case Map.lookup "avro.schema" metadata of
    Nothing -> fail "Invalid container object: no schema."
    Just s  -> case Aeson.eitherDecode' s of
      Left e  -> fail ("Can not decode container schema: " <> e)
      Right x -> pure x
  pure ContainerHeader
    { syncBytes       = sync
    , decompress      = Codec.codecDecompress codec
    , containedSchema = schema
    }
  where
    avroMagicSize :: Integral a => a
    avroMagicSize = 4

    avroMagicBytes :: BL.ByteString
    avroMagicBytes = BLC.pack "Obj" <> BL.pack [1]

    getFixed :: Int -> Get ByteString
    getFixed = Get.getByteString

    -- Metadata is an Avro map of string keys to bytes values.
    getMeta :: Get (Map.Map Text BL.ByteString)
    getMeta =
      let keyValue = (,) <$> AGet.getString <*> AGet.getBytesLazy
      in Map.fromList <$> AGet.decodeBlocks keyValue

    -- Same codec table as 'parseCodec'. The difference is that an unknown
    -- codec name becomes a 'Get' failure (and hence a 'Left' from
    -- 'Get.runGetOrFail'). 'parseCodec' instead calls 'error', and that
    -- would escape as an exception from pure callers such as
    -- 'decodeRawBlocks'.
    getCodec :: Maybe BL.ByteString -> Get Codec
    getCodec = \case
      Nothing        -> pure Codec.nullCodec
      Just "null"    -> pure Codec.nullCodec
      Just "deflate" -> pure Codec.deflateCodec
      Just other     -> fail ("Unrecognized codec: " <> BLC.unpack other)

-- | Reads a container as a list of raw blocks, without decoding the Avro
-- values inside them.
--
-- This is useful for streaming, splitting or merging containers without
-- paying for Avro decoding and re-encoding.
--
-- Each block is returned after decompression with the container's codec.
-- It is paired with the value count recorded in that block's header.
--
-- The outer 'Left' reports a failure to open the container itself, including
-- problems reading the schema embedded in the header. A failure while reading
-- an individual block appears as a 'Left' element, and the list ends there.
decodeRawBlocks :: BL.ByteString -> Either String (Schema, [Either String (Int, BL.ByteString)])
decodeRawBlocks bs =
  case Get.runGetOrFail getContainerHeader bs of
    Left (_, _, err) -> Left err
    Right (afterHeader, _, ContainerHeader {..}) ->
      Right (containedSchema, unfoldr (step syncBytes decompress) (Just afterHeader))
  where
    -- One unfold step. The seed 'Nothing' means a block error was just
    -- emitted, so the unfold stops.
    step :: BL.ByteString
         -> Decompress BL.ByteString
         -> Maybe BL.ByteString
         -> Maybe (Either String (Int, BL.ByteString), Maybe BL.ByteString)
    step _ _ Nothing = Nothing
    step sync decomp (Just remaining) =
      case getNextBlock sync decomp remaining of
        Left err                          -> Just (Left err, Nothing)
        Right Nothing                     -> Nothing
        Right (Just (count, block, rest)) -> Just (Right (count, block), Just rest)

-- | Reads the next block from the data that follows a container header.
--
-- Returns @Right Nothing@ when the input is empty. Otherwise it returns the
-- block's value count, its decompressed payload and the input remaining
-- after the block's trailing sync marker.
--
-- Returns 'Left' if the block cannot be parsed or decompressed, or if the
-- marker after it differs from @sync@.
getNextBlock :: BL.ByteString
             -> Decompress BL.ByteString
             -> BL.ByteString
             -> Either String (Maybe (Int, BL.ByteString, BL.ByteString))
getNextBlock sync decomp input
  | BL.null input = Right Nothing
  | otherwise = do
      (afterBlock, _, (count, payload)) <-
        either (\(_, _, err) -> Left err) Right (Get.runGetOrFail rawBlock input)
      rest <- checkMarker afterBlock
      pure (Just (count, payload, rest))
  where
    -- The block layout is:
    --   1. the value count (long),
    --   2. the compressed size (long),
    --   3. the compressed bytes, which are decompressed here.
    rawBlock :: Get (Int, BL.ByteString)
    rawBlock = do
      count      <- AGet.getLong >>= AGet.sFromIntegral
      size       <- AGet.getLong
      compressed <- Get.getLazyByteString size
      payload    <- either fail pure (decomp compressed Get.getRemainingLazyByteString)
      pure (count, payload)

    -- The first 'nrSyncBytes' bytes must equal the container's sync marker.
    checkMarker :: BL.ByteString -> Either String BL.ByteString
    checkMarker bytes
      | marker /= sync = Left "Invalid marker, does not match sync bytes."
      | otherwise      = Right rest
      where (marker, rest) = BL.splitAt nrSyncBytes bytes

-- | Splits a container into its individual Avro values. Each value is
-- returned both decoded and as the exact bytes it was decoded from.
--
-- This helps when slicing a container into one or more smaller files: the
-- original bytes can be reused without re-encoding the values. The bytes
-- come from the decompressed block payload.
extractContainerValuesBytes :: forall a schema.
     (Schema -> Either String schema)
  -> (schema -> Get a)
  -> BL.ByteString
  -> Either String (Schema, [Either String (a, BL.ByteString)])
extractContainerValuesBytes deconflict f =
  extractContainerValues deconflict withRawBytes
  where
    -- First decode a value in look-ahead mode to learn where it ends. Then
    -- consume exactly that many bytes as the value's raw encoding.
    withRawBytes :: schema -> Get (a, BL.ByteString)
    withRawBytes sch = do
      offsetBefore <- Get.bytesRead
      (value, offsetAfter) <- Get.lookAhead $ do
        v        <- f sch
        consumed <- Get.bytesRead
        pure (v, consumed)
      raw <- Get.getLazyByteString (offsetAfter - offsetBefore)
      pure (value, raw)

-- | Decodes every value stored in a container.
--
-- The writer schema embedded in the container is passed to @deconflict@. The
-- resulting schema is handed to the value decoder @f@. A failure to read the
-- container header, or a 'Left' from @deconflict@, is returned as the outer
-- 'Left'.
--
-- Decoding stops at the first failure: the list then ends with that 'Left'.
-- The failure can be a corrupt block or a value that @f@ could not decode.
extractContainerValues :: forall a schema.
     (Schema -> Either String schema)
  -> (schema -> Get a)
  -> BL.ByteString
  -> Either String (Schema, [Either String a])
extractContainerValues deconflict f bs = do
  (sch, blocks) <- decodeRawBlocks bs
  readSchema    <- deconflict sch
  pure (sch, takeWhileInclusive isRight $ blocks >>= decodeBlock readSchema)
  where
    -- A block-level error is surfaced as a single 'Left' element instead of
    -- crashing; 'takeWhileInclusive' then ends the list at that element.
    decodeBlock :: schema -> Either String (Int, BL.ByteString) -> [Either String a]
    decodeBlock _   (Left err)             = [Left err]
    decodeBlock sch (Right (nrObj, bytes)) =
      snd $ consumeN (fromIntegral nrObj) (decodeValue sch) bytes

    -- Decodes one value and returns the unconsumed input along with the result.
    decodeValue :: schema -> BL.ByteString -> (BL.ByteString, Either String a)
    decodeValue sch bytes =
      case Get.runGetOrFail (f sch) bytes of
        Left  (bs', _, err) -> (bs', Left err)
        Right (bs', _, res) -> (bs', Right res)

-- | Packs a container from already-encoded Avro values, using a freshly
-- generated sync marker.
--
-- Each inner list becomes one block. Each bytestring must be exactly one
-- value serialised to Avro.
packContainerValues :: Codec -> Schema -> [[BL.ByteString]] -> IO BL.ByteString
packContainerValues codec sch values =
  fmap (\sync -> packContainerValuesWithSync codec sch sync values) newSyncBytes

-- | Packs a container from already-encoded Avro values, using the given sync
-- marker.
--
-- Each inner list becomes one block. Each bytestring must be exactly one
-- value serialised to Avro; it is copied verbatim into the block.
packContainerValuesWithSync :: Codec -> Schema -> BL.ByteString -> [[BL.ByteString]] -> BL.ByteString
packContainerValuesWithSync = packContainerValuesWithSync' (const lazyByteString)
{-# INLINABLE packContainerValuesWithSync #-}
-- | Packs a container from groups of values, serialising each value with
-- @encode@ (which is given the container schema).
--
-- Each inner list becomes one block. The block's encoded values are
-- concatenated and compressed with @codec@. The block is written as:
--
--   1. the value count,
--   2. the compressed size,
--   3. the compressed bytes,
--   4. the sync marker.
packContainerValuesWithSync' ::
     (Schema -> a -> Builder)
  -> Codec
  -> Schema
  -> BL.ByteString
  -> [[a]]
  -> BL.ByteString
packContainerValuesWithSync' encode codec sch sync values =
  toLazyByteString (containerHeaderWithSync codec sch sync <> foldMap encodeBlock values)
  where
    encodeBlock chunk =
      let payload = codecCompress codec (toLazyByteString (foldMap (encode sch) chunk))
      in mconcat
           [ encodeRaw @Int32 (fromIntegral (length chunk))
           , encodeRaw (BL.length payload)
           , lazyByteString payload
           , lazyByteString sync
           ]

-- | Packs a new container from already-encoded Avro blocks, using a freshly
-- generated sync marker.
--
-- Each block is a pair of the number of values it holds and its uncompressed
-- content.
packContainerBlocks :: Codec -> Schema -> [(Int, BL.ByteString)] -> IO BL.ByteString
packContainerBlocks codec sch blocks =
  fmap (\sync -> packContainerBlocksWithSync codec sch sync blocks) newSyncBytes

-- | Packs a new container from already-encoded Avro blocks, using the given
-- sync marker.
--
-- Each block is a pair of the number of values it holds and its content. The
-- content is compressed with @codec@ here, so it should be passed
-- uncompressed.
packContainerBlocksWithSync :: Codec -> Schema -> BL.ByteString -> [(Int, BL.ByteString)] -> BL.ByteString
packContainerBlocksWithSync codec sch sync blocks =
  toLazyByteString (containerHeaderWithSync codec sch sync <> foldMap encodeBlock blocks)
  where
    encodeBlock (count, payload) =
      let compressed = codecCompress codec payload
      in mconcat
           [ encodeRaw @Int32 (fromIntegral count)
           , encodeRaw (BL.length compressed)
           , lazyByteString compressed
           , lazyByteString sync
           ]


-- | Builds an Avro container header for the given schema, codec and sync
-- marker. The header consists of:
--
-- * the magic bytes;
-- * a metadata map (encoded as an Avro map of bytes) holding the
--   JSON-encoded schema under @avro.schema@ and the codec name under
--   @avro.codec@;
-- * the sync marker.
containerHeaderWithSync :: Codec -> Schema -> BL.ByteString -> Builder
containerHeaderWithSync codec sch sync =
  mconcat
    [ lazyByteString magic
    , toAvro (Schema.Map Schema.Bytes') metadata
    , lazyByteString sync
    ]
  where
    -- The characters @Obj@ followed by the byte 1.
    magic :: BL.ByteString
    magic = "Obj" <> BL.pack [1]

    metadata :: HashMap Text BL.ByteString
    metadata = HashMap.fromList
      [ ("avro.schema", Aeson.encode sch)
      , ("avro.codec",  BL.fromStrict (codecName codec))
      ]

-----------------------------------------------------------------

-- | Runs a state-threading step @n@ times and collects the outputs in order.
--
-- Returns the final state together with the outputs. The output list is
-- produced lazily, so consumers may stop early.
--
-- A non-positive @n@ produces no outputs. Previously a negative count, such
-- as one from a corrupt block header, never reached the @0@ base case.
consumeN :: Int64 -> (a -> (a, b)) -> a -> (a, [b])
consumeN n f a
  | n <= 0    = (a, [])
  | otherwise =
      let (a', b)  = f a
          (r, bs)  = consumeN (n - 1) f a'
      in (r, b : bs)
{-# INLINE consumeN #-}

----------------------------------------------------------------
-- | Maps the value of a container's @avro.codec@ metadata entry to a 'Codec'.
--
-- A missing entry or @null@ selects the null codec; @deflate@ selects the
-- deflate codec. Any other name calls 'error', because only a 'Monad'
-- constraint is available here. Callers inside a 'MonadFail' context may
-- prefer to validate the name themselves.
parseCodec :: Monad m => Maybe BL.ByteString -> m Codec
parseCodec = \case
  Nothing        -> pure Codec.nullCodec
  Just "null"    -> pure Codec.nullCodec
  Just "deflate" -> pure Codec.deflateCodec
  Just other     -> error ("Unrecognized codec: " <> BLC.unpack other)

-- | Like 'takeWhile', but also keeps the first element that fails the
-- predicate.
--
-- The result is lazy, so it also works on infinite lists.
takeWhileInclusive :: (a -> Bool) -> [a] -> [a]
takeWhileInclusive p = foldr step []
  where
    step x rest
      | p x       = x : rest
      | otherwise = [x]
{-# INLINE takeWhileInclusive #-}