{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Data.Avro.Codec (
    Codec(..)
  , Decompress
  , nullCodec
  , deflateCodec
  ) where

import           Codec.Compression.Zlib.Internal as Zlib
import qualified Data.Binary.Get                 as G
import           Data.ByteString                 (ByteString)
import qualified Data.ByteString                 as BS
import qualified Data.ByteString.Lazy            as LBS

-- | Block decompression function for blocks of Avro.
--
-- Takes the lazy bytes of a block together with a `G.Get` parser and
-- yields either the parsed value or a `String` describing the failure.
type Decompress a = LBS.ByteString -> G.Get a -> Either String a

-- | A `Codec` allows for compression/decompression of a block in an
-- Avro container according to the Avro spec.
--
-- It is a plain record of functions: a name plus one function for each
-- direction.
data Codec = Codec
  {
    -- | The name of the codec according to the Avro spec.
    codecName       :: ByteString
    -- | Execute a `G.Get` over a chunk of bytes possibly decompressing
    -- the chunk incrementally.
    --
    -- The API is somewhat more complex than say `codecCompress` to allow
    -- interleaving of decompression and binary decoding while still allowing
    -- proper error handling without resorting to async exceptions.
    --
    -- The field is rank-2 (@forall a.@) so a single codec value can run
    -- parsers of any result type.
  , codecDecompress :: forall a. Decompress a

    -- | Compresses a lazy stream of bytes.
  , codecCompress   :: LBS.ByteString -> LBS.ByteString
  }

-- | `nullCodec` specifies @null@ required by Avro spec.
-- (see <https://avro.apache.org/docs/1.8.1/spec.html#null>)
--
-- Bytes pass through unchanged: compression is `id`, and decompression
-- runs the parser directly over the input.
nullCodec :: Codec
nullCodec =
  Codec
    {
      codecName = "null"
    , codecDecompress = \input parser ->
        -- Only the parsed value or the error message is kept; the
        -- unconsumed input and byte offset are discarded.
        case G.runGetOrFail parser input of
          Right (_, _, x)  -> Right x
          Left (_, _, err) -> Left err
    , codecCompress   = id
    }

-- | `deflateCodec` specifies @deflate@ codec required by Avro spec.
-- (see <https://avro.apache.org/docs/1.8.1/spec.html#deflate>)
--
-- Compression and decompression are delegated to `deflateCompress` and
-- `deflateDecompress` respectively.
deflateCodec :: Codec
deflateCodec =
  Codec
    {
      codecName       = "deflate"
    , codecDecompress = deflateDecompress
    , codecCompress   = deflateCompress
    }

-- | Compress a lazy `LBS.ByteString` using zlib's raw format
-- (`Zlib.rawFormat`) with the library's default compression parameters.
deflateCompress :: LBS.ByteString -> LBS.ByteString
deflateCompress =
  Zlib.compress Zlib.rawFormat Zlib.defaultCompressParams


-- | Internal type to help construct a lazy list of
-- decompressed bytes interleaved with errors if any.
data Chunk
  = ChunkRest LBS.ByteString
    -- ^ Produced by the stream-end handler of the decompression fold;
    -- always the last element of the list it appears in.
  | ChunkBytes ByteString
    -- ^ One strict chunk produced by the output handler of the
    -- decompression fold.
  | ChunkError Zlib.DecompressError
    -- ^ Produced by the error handler of the decompression fold;
    -- always the last element of the list it appears in.


-- | Decompress a raw-format (`Zlib.rawFormat`) block and run a `G.Get`
-- parser over the decompressed bytes, feeding the parser chunk by chunk
-- as the decompressor produces output.
--
-- Returns @Right@ with the parsed value on success. Returns @Left@ with:
--
-- * the shown `Zlib.DecompressError` if decompression fails before the
--   parser has already failed;
-- * the parser's own error message if the parser fails;
-- * @"deflate: Not enough input"@ if the parser is still waiting for
--   input once all chunks have been consumed.
deflateDecompress :: forall a. LBS.ByteString -> G.Get a -> Either String a
deflateDecompress bytes parser = do
  let
    -- N.B. this list is lazily created which allows us to
    -- interleave decompression and binary decoding.
    chunks :: [Chunk]
    chunks =
      Zlib.foldDecompressStreamWithInput
        (\x xs -> ChunkBytes x : xs)
        (\rest -> [ChunkRest rest])
        (\err -> [ChunkError err])
        (Zlib.decompressST Zlib.rawFormat Zlib.defaultDecompressParams)
        bytes

    -- Feed the chunks to the incremental decoder one at a time.
    -- The @a@ here is the one bound by the outer @forall@
    -- (ScopedTypeVariables).
    decode :: G.Decoder a -> [Chunk] -> Either String (G.Decoder a)
    decode dec@G.Fail{} _ =
      -- short circuit if decoding failed
      pure dec
    decode !dec [] =
      pure dec
    decode !dec (inchunk : inchunks) =
      case inchunk of
        ChunkBytes x ->
          decode (G.pushChunk dec x) inchunks
        ChunkError err ->
          Left (show err)
        ChunkRest rest -> do
          let
            -- Signal end of input to the parser first, then hand the
            -- leftover bytes to the resulting decoder.
            dec' = G.pushEndOfInput dec
          pure $ G.pushChunks dec' rest

  dec <- decode (G.runGetIncremental parser) chunks

  case dec of
    G.Fail _ _ err ->
      Left err
    G.Partial{} ->
      Left "deflate: Not enough input"
    G.Done _ _ x ->
      Right x