{-# LANGUAGE FlexibleInstances, LambdaCase, OverloadedStrings #-}

{- A decoder that understands the Pulsar protocol, as specified at: http://pulsar.apache.org/docs/en/develop-binary-protocol -}
module Pulsar.Protocol.Decoder
  ( decodeBaseCommand
  , dropPayloadGarbage
  )
where

import           Control.Monad                  ( guard )
import qualified Data.Binary                   as B
import qualified Data.Binary.Get               as B
import qualified Data.Binary.Put               as B
import qualified Data.ByteString.Lazy.Char8    as CL
import           Data.Bifunctor                 ( bimap )
import           Data.Int                       ( Int32 )
import qualified Data.ProtoLens.Encoding       as PL
import           Pulsar.Protocol.CheckSum
import           Pulsar.Protocol.Frame

{- | Tells the payload parser whether a checksum precedes the metadata: 'Yes' when the magic number was found, 'No' otherwise. -}
data ValidateCheckSum = Yes | No deriving Show

{-
 - Remove the 8-byte prefix that the Java client sends in front of the payload.
 -
 - Only the first 5 bytes of that prefix (NUL NUL NUL EOT CAN) are matched. When they are present,
 - they are stripped together with the 3 bytes that follow them. Input that does not start with
 - those 5 bytes is returned unchanged.
 -
 - Apparently that's how Google's FlatBuffers serialize data: https://google.github.io/flatbuffers/
 -
 - Source: https://github.com/apache/pulsar/blob/master/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java#L22
 -
 - More info on the Ascii spec: https://www.december.com/html/spec/ascii.html. Maybe this could be helpful: https://hackage.haskell.org/package/flatbuffers
 -}
dropPayloadGarbage :: CL.ByteString -> CL.ByteString
dropPayloadGarbage bs =
  maybe bs (CL.drop 3) (CL.stripPrefix "\NUL\NUL\NUL\EOT\CAN" bs)

{- | Parse total size, command size and message. If no input is left after the message, return a simple frame. Otherwise, try to parse a payload frame. -}
parseFrame :: B.Get Frame
parseFrame = do
  ts <- B.getInt32be                          -- total size
  cs <- B.getInt32be                          -- command size
  ms <- B.getLazyByteString (fromIntegral cs) -- the message, 'cs' bytes long
  let simpleCmd  = SimpleCommand ts cs ms
      payloadRes = parsePayload ts cs simpleCmd
  -- Remaining input means a payload section follows the command.
  B.isEmpty >>= \case
    True  -> return $! SimpleFrame simpleCmd
    False -> validateMagicNumber payloadRes

{- | The 2-bytes "magic number" is optional. If present, it indicates that a 4-bytes checksum follows.

The continuation is called with 'Yes' when the next two bytes equal 'frameMagicNumber' and with 'No' otherwise.
-}
validateMagicNumber :: (ValidateCheckSum -> B.Get Frame) -> B.Get Frame
validateMagicNumber payload = B.lookAheadM peekMagicNumber >>= \case
  Just _  -> payload Yes
  Nothing -> payload No
 where
  -- 'B.lookAheadM' only consumes the two bytes when this yields 'Just'.
  peekMagicNumber :: B.Get (Maybe ())
  peekMagicNumber = guard . (== frameMagicNumber) <$> B.getWord16be

{- | If a checksum is given, validate it. Otherwise, return the frame as it is.

The checksum is run over the big-endian metadata size followed by the metadata and the payload.
On success the payload is passed through 'dropPayloadGarbage'; on failure the parser fails with "Invalid checksum".
-}
validateCheckSum :: Frame -> B.Get Frame
validateCheckSum (PayloadFrame sc (PayloadCommand cs@(Just csm) ms md pl)) =
  case runCheckSum (B.runPut (B.putInt32be ms) <> md <> pl) csm of
    Valid -> return
      $! PayloadFrame sc (PayloadCommand cs ms md (dropPayloadGarbage pl))
    Invalid -> fail "Invalid checksum"
validateCheckSum x = return $! x

{- | Take in a simple command and try to parse a payload command.

With 'Yes', a 4-bytes checksum is read before the metadata; with 'No', the metadata is read directly.
The payload length is whatever remains of the total size once the fixed-size fields, the command and the metadata are accounted for.
-}
parsePayload :: Int32 -> Int32 -> SimpleCmd -> ValidateCheckSum -> B.Get Frame
parsePayload ts cs simpleCmd vcs = case vcs of
  Yes -> parsePayload' . Just . CheckSum =<< B.getWord32be
  No  -> parsePayload' Nothing
 where
  parsePayload' :: Maybe CheckSum -> B.Get Frame
  parsePayload' cm = do
    ms <- B.getInt32be -- metadata size
    md <- B.getLazyByteString . fromIntegral $ ms
    pl <- B.getLazyByteString . fromIntegral $ ts - (remaining cm + cs + ms)
    let frame = PayloadFrame simpleCmd (PayloadCommand cm ms md pl)
    validateCheckSum frame
  -- Number of bytes taken by the fixed-size fields that are counted in the total size.
  remaining :: Maybe a -> Int32
  remaining (Just _) = 14 -- 4 (command size) + 2 (magic number) + 4 (checksum) + 4 (metadata size)
  remaining Nothing  = 8  -- no magic number and checksum

{- | Run 'parseFrame' on the given bytes. The unconsumed input and the offset reported by the parser are discarded: only the error message or the decoded frame is kept. -}
decodeFrame :: CL.ByteString -> Either String Frame
decodeFrame =
  bimap (\(_, _, e) -> e) (\(_, _, f) -> f) . B.runGetOrFail parseFrame

{- | Decode either a 'SimpleFrame' or a 'PayloadFrame'.

A simple frame yields a 'SimpleResponse' holding the decoded command. A payload frame yields a
'PayloadResponse' holding the decoded command, the decoded metadata and the payload, which is
'Nothing' when the payload bytes are empty. Any frame or message decoding error is returned as 'Left'.
-}
decodeBaseCommand :: CL.ByteString -> Either String Response
decodeBaseCommand bytes = decodeFrame bytes >>= \case
  SimpleFrame s ->
    SimpleResponse <$> PL.decodeMessage (CL.toStrict $ frameMessage s)
  PayloadFrame s (PayloadCommand _ _ md pl) -> do
    cmd  <- PL.decodeMessage . CL.toStrict $ frameMessage s
    meta <- PL.decodeMessage . CL.toStrict $ md
    return $ PayloadResponse cmd meta (payload pl)
   where
    -- An empty payload is reported as absent.
    payload p | CL.null p = Nothing
              | otherwise = Just $ Payload p