{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -O2 -fdicts-strict -fspec-constr-recursive=16 -fmax-worker-args=16 #-} -- for streamly
module Streamly.External.Cereal
  (
    encodeStreamly
  , decodeStreamly
  , encodeStreamlyArray
  , decodeStreamlyArray  
  , encodeStream
  , decodeStream
  , encodeStreamArray
  , decodeStreamArray
  , putStreamOf
  , getStreamOf
  )
where

import qualified Streamly as Streamly
import qualified Streamly.Prelude as Streamly
import qualified Streamly.Internal.Prelude as Streamly (splitParse)
import qualified Streamly.Internal.Memory.Array as Streamly.Array
import qualified Streamly.Internal.Memory.ArrayStream as Streamly.Array
import qualified Streamly.Internal.Data.Parser.Types as Streamly.Parser
import qualified Streamly.External.ByteString as Streamly.ByteString

import qualified Control.Monad.Catch as Exceptions (MonadThrow(..), MonadCatch(..))
import Control.Monad.IO.Class (MonadIO)

import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import           Data.Functor.Identity (Identity(..))
import qualified Data.Serialize as Cereal
import qualified Data.Text as Text
import qualified Data.Word as Word

-- These go through []. I'm hoping the list fuses away.
-- One issue is that Cereal wants a length first and that requires spine strictness
-- on the encoding side.
-- | Convert something which can encode an @a@ to something which can encode
-- a (non-effectful) stream of @a@.
--
-- The stream is first collected into a list with 'Streamly.toList' and that
-- list is then written with 'Cereal.putListOf'.
putStreamOf :: Cereal.Putter a -> Cereal.Putter (Streamly.SerialT Identity a)
putStreamOf pa = Cereal.putListOf pa . runIdentity . Streamly.toList
{-# INLINEABLE putStreamOf #-}

-- | Convert something which can decode an @a@ to something which can decode
-- a (possibly-effectful) stream of @a@.
--
-- The elements are read as a list with 'Cereal.getListOf' and the list is
-- then turned into a stream with 'Streamly.fromList'.
getStreamOf :: Monad m => Cereal.Get a -> Cereal.Get (Streamly.SerialT m a)
getStreamOf ga = Streamly.fromList <$> Cereal.getListOf ga
{-# INLINEABLE getStreamOf #-}

-- | Given @Serialize a@, encode an @a@ to a stream of bytes.
--
-- This is 'encodePut' applied to the value's 'Cereal.put'.
encodeStreamly :: (Cereal.Serialize a, Monad m) => a -> Streamly.SerialT m Word.Word8
encodeStreamly = encodePut . Cereal.put
{-# INLINEABLE encodeStreamly #-}

-- | Given @Serialize a@, encode an @a@ to an array of bytes.
--
-- This is 'encodePutArray' applied to the value's 'Cereal.put'.
encodeStreamlyArray :: Cereal.Serialize a => a -> Streamly.Array.Array Word.Word8
encodeStreamlyArray = encodePutArray . Cereal.put
{-# INLINEABLE encodeStreamlyArray #-}

-- | Given a Cereal 'Cereal.Put', encode to a byte stream.
--
-- The 'Cereal.Put' is run with 'Cereal.runPutLazy' and the resulting lazy
-- @ByteString@ is unfolded one byte at a time with 'BL.uncons'.
encodePut :: Monad m => Cereal.Put -> Streamly.SerialT m Word.Word8
encodePut = Streamly.unfoldr BL.uncons . Cereal.runPutLazy
{-# INLINEABLE encodePut #-}

-- | Given a Cereal 'Cereal.Put', encode to a byte array.
--
-- The 'Cereal.Put' is run with 'Cereal.runPut' and the resulting strict
-- @ByteString@ is converted with 'Streamly.ByteString.toArray'.
encodePutArray :: Cereal.Put -> Streamly.Array.Array Word.Word8
encodePutArray = Streamly.ByteString.toArray . Cereal.runPut
{-# INLINEABLE encodePutArray #-}

-- | Given an instance of @Serialize@ for @a@, attempt to decode a byte stream
-- into an @a@.
--
-- This is 'decodeGet' applied to the type's 'Cereal.get'; the result is
-- @Left@ with a message when decoding does not succeed.
decodeStreamly :: (Cereal.Serialize a, Monad m) => Streamly.SerialT m Word.Word8 -> m (Either Text.Text a)
decodeStreamly = decodeGet Cereal.get
{-# INLINEABLE decodeStreamly #-}

-- | Given a Cereal 'Cereal.Get', decode one @a@ from an effectful stream of bytes.
--
-- The stream is fed to the getter one byte at a time through
-- 'Cereal.runGetPartial'.  Decoding stops as soon as the getter reports
-- success or failure; the rest of the stream is not consumed.
--
-- Returns @Right@ with the decoded value, or @Left@ with a message when the
-- getter fails or the stream ends before a value is complete.
decodeGet :: Monad m => Cereal.Get a -> Streamly.SerialT m Word.Word8 -> m (Either Text.Text a)
decodeGet g s = go s (Cereal.runGetPartial g)
  where
    -- @go rest k@ pulls the next byte from @rest@ and hands it to the pending
    -- Cereal continuation @k@.
    go x f = do
      y <- Streamly.uncons x
      case y of
        -- End of stream: Cereal treats an empty chunk as "no more input", so
        -- a getter that needs no (further) bytes can still finish here.
        Nothing -> case f BS.empty of
          Cereal.Done a _ -> return $ Right a
          _ -> return $ Left "Premature end of stream reached."
        Just (b, tx) -> case f (BS.singleton b) of
          Cereal.Fail e _ -> return $ Left $ "Cereal Error: " <> Text.pack e
          Cereal.Done a _ -> return $ Right a
          Cereal.Partial f' -> go tx f'
{-# INLINEABLE decodeGet #-}

-- | Given @Serialize a@, attempt to decode a Streamly array of bytes into an @a@.
--
-- The array is converted to a strict @ByteString@ and decoded with
-- 'Cereal.decode'; a Cereal error message is returned as @Left@ 'Text.Text'.
decodeStreamlyArray :: (Cereal.Serialize a) => Streamly.Array.Array Word.Word8 -> Either Text.Text a
decodeStreamlyArray arr =
  case Cereal.decode (Streamly.ByteString.fromArray arr) of
    Left err -> Left (Text.pack err)
    Right a -> Right a
{-# INLINEABLE decodeStreamlyArray #-}

-- | Given @Serialize a@, encode a stream of @a@s as a Streamly stream of bytes.
--
-- Each element is encoded with 'encodeStreamly' and the resulting byte
-- streams are concatenated in order.
encodeStream :: (Monad m, Cereal.Serialize a) => Streamly.SerialT m a -> Streamly.SerialT m Word.Word8
encodeStream = Streamly.concatMap encodeStreamly
{-# INLINEABLE encodeStream #-}

-- | Given @Serialize a@, encode a stream of @a@s as a Streamly array of bytes.
--
-- Each element is encoded to its own array with 'encodeStreamlyArray'; the
-- stream of arrays is then collected into a single array.
encodeStreamArray :: (Monad m, MonadIO m, Cereal.Serialize a) => Streamly.SerialT m a -> m (Streamly.Array.Array Word.Word8)
encodeStreamArray = Streamly.Array.toArray . Streamly.map encodeStreamlyArray
{-# INLINEABLE encodeStreamArray #-}

-- NB this will keep decoding until failure.  But it can't know why it failed, so it
-- assumes failure indicates end of the input stream.
-- Parser state is (Maybe a, ByteString -> Cereal.Result a)
-- | Streamly Parser for decoding bytes into @a@s (given @Serialize a@).
--
-- The state pairs the most recently completed value (if any) with the Cereal
-- continuation that is waiting for the next byte.  @extract@ throws a
-- 'Streamly.Parser.ParseError' when no completed value is held in the state.
streamlyDecodeParser :: (Monad m, Exceptions.MonadThrow m, Cereal.Serialize a) => Streamly.Parser.Parser m Word.Word8 a
streamlyDecodeParser = Streamly.Parser.Parser step initial extract
  where
    -- Start with no decoded value and a decoder that has seen no input.
    initial = return (Nothing, Cereal.runGetPartial Cereal.get)

    -- Feed a single byte to the pending Cereal continuation.
    -- NOTE(review): a completed value is reported with 'Yield', not 'Stop' --
    -- confirm against the streamly version in use that the driver emits one
    -- result per completed value.
    step (_, f) w = case f (BS.singleton w) of
      Cereal.Fail e _ -> return $ Streamly.Parser.Error e
      -- A value is complete: keep it and restart with a fresh decoder.
      Cereal.Done a _ -> return $ Streamly.Parser.Yield 0 (Just a, Cereal.runGetPartial Cereal.get)
      -- More bytes are needed: carry the continuation forward.
      Cereal.Partial f' -> return $ Streamly.Parser.Skip 0 (Nothing, f')

    extract (ma, _) = case ma of
      Just a -> return a
      Nothing -> Exceptions.throwM $ Streamly.Parser.ParseError "Parsing error in streamlyDecodeParser (\"extract\" called on incomplete parse.)."
{-# INLINEABLE streamlyDecodeParser #-}

-- | Given @Serialize a@, decode a stream of bytes into a stream of @a@s.
--
-- The byte stream is split by running 'streamlyDecodeParser' over it with
-- 'Streamly.splitParse'.
decodeStream :: (Monad m, Exceptions.MonadCatch m, Cereal.Serialize a)
             => Streamly.SerialT m Word.Word8 -> Streamly.SerialT m a
decodeStream = Streamly.splitParse streamlyDecodeParser
{-# INLINEABLE decodeStream #-}

-- We convert the array to a stream so we can decode lazily (?)
-- | Given @Serialize a@, decode an array of bytes into a stream of @a@s.
--
-- The array is turned into a byte stream with 'Streamly.Array.toStream' and
-- then decoded with 'decodeStream'.
decodeStreamArray :: (Monad m, Exceptions.MonadCatch m, Cereal.Serialize a)
                  => Streamly.Array.Array Word.Word8 -> Streamly.SerialT m a
decodeStreamArray = decodeStream . Streamly.Array.toStream
{-# INLINEABLE decodeStreamArray #-}