{-# language CPP                   #-}
{-# language DataKinds             #-}
{-# language FlexibleContexts      #-}
{-# language FlexibleInstances     #-}
{-# language MultiParamTypeClasses #-}
{-# language OverloadedStrings     #-}
{-# language PolyKinds             #-}
{-# language ScopedTypeVariables   #-}
{-# language TypeApplications      #-}
{-# language TypeOperators         #-}
{-# language UndecidableInstances  #-}
{-# options_ghc -fno-warn-orphans -fno-warn-simplifiable-class-constraints  #-}
{-|
Description : (Internal) Wrappers for Avro serialization

Intended for internal use.

This module provides the required instances of
the common type classes from "Mu.GRpc.Bridge"
to make it work with Avro.
-}
module Mu.GRpc.Avro (
  AvroRPC(..)
, ViaFromAvroTypeRef(..)
, ViaToAvroTypeRef(..)
) where

import           Data.Avro
import           Data.Binary.Builder         (fromByteString, putWord32be, singleton)
import           Data.Binary.Get             (Decoder (..), getByteString, getInt8, getWord32be,
                                              runGetIncremental)
import           Data.ByteString.Char8       (ByteString)
import qualified Data.ByteString.Char8       as ByteString
import           Data.ByteString.Lazy        (fromStrict, toStrict)
import           Data.Kind
import           GHC.TypeLits
import           Network.GRPC.HTTP2.Encoding
import           Network.GRPC.HTTP2.Types

#if MIN_VERSION_base(4,11,0)
#else
import           Data.Monoid                 ((<>))
#endif

import           Mu.Adapter.Avro             ()
import           Mu.Rpc
import           Mu.Schema

-- | A proxy type for giving static information about RPCs.
--   Intended for internal use.
data AvroRPC = AvroRPC
  { pkg  :: ByteString
    -- ^ Segment that 'path' places right after the leading slash
    --   (presumably the package name; confirm against callers).
  , srv  :: ByteString
    -- ^ Segment that 'path' places after the dot separator
    --   (presumably the service name; confirm against callers).
  , meth :: ByteString
    -- ^ Segment that 'path' places after the second slash
    --   (presumably the method name; confirm against callers).
  }

-- | The path is the concatenation of a slash, 'pkg', a dot, 'srv',
--   another slash and 'meth', in that order.
instance IsRPC AvroRPC where
  path (AvroRPC package service method) =
    mconcat ["/", package, ".", service, "/", method]
  {-# INLINE path #-}

-- | Wrapper used to tag a type with its corresponding
--   'TypeRef' used for deserialization from Avro.
--   Intended for internal use.
newtype ViaFromAvroTypeRef (ref :: TypeRef snm) t
  = ViaFromAvroTypeRef
      { unViaFromAvroTypeRef :: t
        -- ^ The wrapped value; @ref@ is a phantom parameter and is
        --   not stored at runtime.
      }
-- | Wrapper used to tag a type with its corresponding
--   'TypeRef' used for serialization to Avro.
--   Intended for internal use.
newtype ViaToAvroTypeRef (ref :: TypeRef snm) t
  = ViaToAvroTypeRef
      { unViaToAvroTypeRef :: t
        -- ^ The wrapped value; @ref@ is a phantom parameter and is
        --   not stored at runtime.
      }

-- | Unit inputs: encoding delegates to 'encodeEmpty', decoding always
--   succeeds with @Right ()@.
instance GRPCInput AvroRPC () where
  encodeInput _ compression () = encodeEmpty compression
  -- NOTE(review): this decoder is a bare 'pure', so it consumes no input
  -- bytes, whereas 'encodeEmpty' emits a flag byte and a length prefix.
  -- Confirm that the caller tolerates the unconsumed frame.
  decodeInput _ _ = runGetIncremental (pure (Right ()))

-- | Unit outputs: encoding delegates to 'encodeEmpty', decoding always
--   succeeds with @Right ()@.
instance GRPCOutput AvroRPC () where
  encodeOutput _ compression () = encodeEmpty compression
  -- NOTE(review): this decoder is a bare 'pure', so it consumes no input
  -- bytes, whereas 'encodeEmpty' emits a flag byte and a length prefix.
  -- Confirm that the caller tolerates the unconsumed frame.
  decodeOutput _ _ = runGetIncremental (pure (Right ()))

-- | Frame for a message whose body is the empty string.
--
--   The output is, in order: one flag byte (@1@ when
--   '_compressionByteSet' holds for the given compression, @0@
--   otherwise), the payload length as a big-endian 32-bit word, and
--   the payload itself. The payload is the result of running
--   '_compressionFunction' on the empty string, so it is not
--   necessarily empty.
encodeEmpty :: Compression -> Builder
encodeEmpty compression =
    mconcat
      [ singleton compressedFlag
      , putWord32be (fromIntegral (ByteString.length payload))
      , fromByteString payload
      ]
  where
    compressedFlag
      | _compressionByteSet compression = 1
      | otherwise                       = 0
    payload = _compressionFunction compression ""

-- | Only decoding is implemented for 'ViaFromAvroTypeRef':
--   'encodeInput' is defined as a call to 'error'.
instance forall (sch :: Schema') (sty :: Symbol) (i :: Type).
         ( HasAvroSchema (WithSchema sch sty i)
         , FromAvro (WithSchema sch sty i) )
         => GRPCInput AvroRPC (ViaFromAvroTypeRef ('SchemaRef sch sty) i) where
  encodeInput = error "eif/you should not call this"
  -- Decode as @WithSchema sch sty i@, then swap that wrapper for
  -- 'ViaFromAvroTypeRef'. The outer 'fmap' maps over the 'Decoder',
  -- the inner one over the 'Either'.
  decodeInput _ compression =
    fmap (fmap (ViaFromAvroTypeRef . unWithSchema @_ @_ @sch @sty @i))
         (decoder compression)

-- | Only decoding is implemented for 'ViaFromAvroTypeRef':
--   'encodeOutput' is defined as a call to 'error'.
instance forall (sch :: Schema') (sty :: Symbol) (i :: Type).
         ( HasAvroSchema (WithSchema sch sty i)
         , FromAvro (WithSchema sch sty i) )
         => GRPCOutput AvroRPC (ViaFromAvroTypeRef ('SchemaRef sch sty) i) where
  encodeOutput = error "eof/you should not call this"
  -- Decode as @WithSchema sch sty i@, then swap that wrapper for
  -- 'ViaFromAvroTypeRef'. The outer 'fmap' maps over the 'Decoder',
  -- the inner one over the 'Either'.
  decodeOutput _ compression =
    fmap (fmap (ViaFromAvroTypeRef . unWithSchema @_ @_ @sch @sty @i))
         (decoder compression)

-- | Only encoding is implemented for 'ViaToAvroTypeRef':
--   'decodeInput' is defined as a call to 'error'.
instance forall (sch :: Schema') (sty :: Symbol) (o :: Type).
         ( HasAvroSchema (WithSchema sch sty o)
         , ToAvro (WithSchema sch sty o) )
         => GRPCInput AvroRPC (ViaToAvroTypeRef ('SchemaRef sch sty) o) where
  -- Unwrap the tagged value, re-wrap it as @WithSchema sch sty o@ and
  -- hand it to 'encoder' together with the compression settings.
  encodeInput _ compression (ViaToAvroTypeRef value) =
    encoder compression (WithSchema @_ @_ @sch @sty value)
  decodeInput = error "dit/you should not call this"

-- | Only encoding is implemented for 'ViaToAvroTypeRef':
--   'decodeOutput' is defined as a call to 'error'.
instance forall (sch :: Schema') (sty :: Symbol) (o :: Type).
         ( HasAvroSchema (WithSchema sch sty o)
         , ToAvro (WithSchema sch sty o) )
         => GRPCOutput AvroRPC (ViaToAvroTypeRef ('SchemaRef sch sty) o) where
  -- Unwrap the tagged value, re-wrap it as @WithSchema sch sty o@ and
  -- hand it to 'encoder' together with the compression settings.
  encodeOutput _ compression (ViaToAvroTypeRef value) =
    encoder compression (WithSchema @_ @_ @sch @sty value)
  decodeOutput = error "dot/you should not call this"

-- | Serialise a value with 'encodeValue' and wrap it in a frame.
--
--   The output is, in order: one flag byte (@1@ when
--   '_compressionByteSet' holds for the given compression, @0@
--   otherwise), the payload length as a big-endian 32-bit word, and
--   the payload. The payload is the strict form of the Avro bytes
--   after they have been passed through '_compressionFunction'.
encoder :: (HasAvroSchema m, ToAvro m)
        => Compression -> m -> Builder
encoder compression plain =
    mconcat
      [ singleton compressedFlag
      , putWord32be (fromIntegral (ByteString.length payload))
      , fromByteString payload
      ]
  where
    compressedFlag
      | _compressionByteSet compression = 1
      | otherwise                       = 0
    -- 'encodeValue' yields a lazy ByteString; the compression function
    -- works on strict ones, hence the 'toStrict'.
    payload = _compressionFunction compression (toStrict (encodeValue plain))

-- | Incremental decoder for one framed Avro value.
--
--   Reads, in order: a one-byte flag, a big-endian 32-bit length, and
--   that many payload bytes. When the flag is non-zero the payload is
--   passed through '_decompressionFunction'; when it is zero the
--   payload is used as is. The bytes are then handed to 'decodeValue',
--   whose 'Either' is returned unchanged as the decoder's result.
decoder :: (HasAvroSchema a, FromAvro a)
        => Compression -> Decoder (Either String a)
decoder compression = runGetIncremental $ do
    isCompressed <- getInt8                   -- 1 byte
    n            <- getWord32be               -- 4 bytes
    raw          <- getByteString (fromIntegral n)
    body         <- if isCompressed == 0
                      then pure raw
                      else _decompressionFunction compression raw
    -- 'decodeValue' expects a lazy ByteString.
    pure (decodeValue (fromStrict body))

-- Based on https://hackage.haskell.org/package/binary/docs/Data-Binary-Get-Internal.html
--
-- Orphan 'Functor' instance for the incremental 'Decoder': the function
-- is applied to the value of a 'Done' step, pushed through the
-- continuation of a 'Partial' step, and ignored for a 'Fail' step,
-- which is rebuilt with the same fields.
instance Functor Decoder where
  fmap f step = case step of
    Done leftover offset value -> Done leftover offset (f value)
    Partial continue           -> Partial (fmap f . continue)
    Fail leftover offset msg   -> Fail leftover offset msg