{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ImplicitParams #-}
{-# LANGUAGE RankNTypes #-}

{-|
Module      : Z.IO.Buffered
Description : Buffered IO interface
Copyright   : (c) Dong Han, 2017-2018
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provide buffered IO interface.

-}

module Z.IO.Buffered
  ( -- * Input & Output device
    Input(..), Output(..)
    -- * Buffered Input
  , BufferedInput
  , newBufferedInput
  , readBuffer
  , unReadBuffer
  , readParser
  , readExactly,  readExactly'
  , readToMagic, readToMagic'
  , readLine, readLine'
  , readAll, readAll'
    -- * Buffered Output
  , BufferedOutput
  , newBufferedOutput
  , writeBuffer
  , writeBuilder
  , flushBuffer
    -- * Stream utilities
  , Source, Sink
  , sourceBuffer
  , sinkBuffer
  , sourceFromList
  , (>+>)
  , parseSource
  , collectSource
  , concatSource
  , zipSource
  , (>>>>=)

    -- * Exceptions
  , BufferedException(..)
    -- * common buffer size
  , V.defaultChunkSize
  , V.smallChunkSize
  ) where

import           Control.Monad
import           Control.Monad.Primitive     (ioToPrim, primToIO)
import           Control.Monad.ST
import           Data.IORef
import           Data.Primitive.PrimArray
import           Data.Typeable
import           Data.Word
import           Data.Bits                 (unsafeShiftR)
import           Foreign.Ptr
import           Z.Data.Array
import qualified Z.Data.Builder.Base       as B
import qualified Z.Data.Parser             as P
import qualified Z.Data.Vector             as V
import qualified Z.Data.Vector.Base        as V
import           Z.Data.PrimRef.PrimIORef
import           Z.Foreign
import           Z.IO.Exception

-- | Input device
--
-- 'readInput' should return 0 on EOF.
--
class Input i where
    readInput :: HasCallStack => i -> Ptr Word8 -> Int -> IO Int

-- | Output device
--
-- 'writeOutput' should not return until all data are written (may not
-- necessarily flushed to hardware, that should be done in device specific way).
--
class Output o where
    writeOutput :: HasCallStack => o -> Ptr Word8 -> Int -> IO ()

-- | Input device with buffer, NOT THREAD SAFE!
--
-- * A 'BufferedInput' should not be used in multiple threads, there's no locking mechanism to protect
--   buffering state.
--
-- * A 'Input' device should only be used with a single 'BufferedInput', If multiple 'BufferedInput' s
--   are opened on a same 'Input' device, the behaviour is undefined.
--
data BufferedInput i = BufferedInput
    { BufferedInput i -> i
bufInput    :: i
    , BufferedInput i -> IORef Bytes
bufPushBack :: {-# UNPACK #-} !(IORef V.Bytes)
    , BufferedInput i -> IORef (MutablePrimArray RealWorld Word8)
inputBuffer :: {-# UNPACK #-} !(IORef (MutablePrimArray RealWorld Word8))
    }

-- | Output device with buffer, NOT THREAD SAFE!
--
-- * A 'BufferedOutput' should not be used in multiple threads, there's no locking mechanism to protect
--   buffering state.
--
-- * A 'Output' device should only be used with a single 'BufferedOutput', If multiple 'BufferedOutput' s
--   are opened on a same 'BufferedOutput' device, the output will be interleaved.
--
data BufferedOutput o = BufferedOutput
    { BufferedOutput o -> o
bufOutput     :: o
    , BufferedOutput o -> Counter
bufIndex      :: {-# UNPACK #-} !Counter
    , BufferedOutput o -> MutablePrimArray RealWorld Word8
outputBuffer  :: {-# UNPACK #-} !(MutablePrimArray RealWorld Word8)
    }

-- | Open a new buffered input with given buffer size, e.g. 'V.defaultChunkSize'.
newBufferedInput :: Int     -- ^ Input buffer size
                 -> i
                 -> IO (BufferedInput i)
newBufferedInput :: Int -> i -> IO (BufferedInput i)
newBufferedInput Int
bufSiz i
i = do
    IORef Bytes
pb <- Bytes -> IO (IORef Bytes)
forall a. a -> IO (IORef a)
newIORef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
    MutablePrimArray RealWorld Word8
buf <- Int -> IO (MutablePrimArray (PrimState IO) Word8)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
Int -> m (MutablePrimArray (PrimState m) a)
newPinnedPrimArray (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
bufSiz Int
0)
    IORef (MutablePrimArray RealWorld Word8)
inputBuffer <- MutablePrimArray RealWorld Word8
-> IO (IORef (MutablePrimArray RealWorld Word8))
forall a. a -> IO (IORef a)
newIORef MutablePrimArray RealWorld Word8
buf
    BufferedInput i -> IO (BufferedInput i)
forall (m :: * -> *) a. Monad m => a -> m a
return (i
-> IORef Bytes
-> IORef (MutablePrimArray RealWorld Word8)
-> BufferedInput i
forall i.
i
-> IORef Bytes
-> IORef (MutablePrimArray RealWorld Word8)
-> BufferedInput i
BufferedInput i
i IORef Bytes
pb IORef (MutablePrimArray RealWorld Word8)
inputBuffer)

-- | Open a new buffered output with given buffer size, e.g. 'V.defaultChunkSize'.
newBufferedOutput :: Int    -- ^ Output buffer size
                  -> o
                  -> IO (BufferedOutput o)
newBufferedOutput :: Int -> o -> IO (BufferedOutput o)
newBufferedOutput Int
bufSiz o
o = do
    Counter
index <- Int -> IO Counter
forall a. Prim a => a -> IO (PrimIORef a)
newPrimIORef Int
0
    MutablePrimArray RealWorld Word8
buf <- Int -> IO (MutablePrimArray (PrimState IO) Word8)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
Int -> m (MutablePrimArray (PrimState m) a)
newPinnedPrimArray (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
bufSiz Int
0)
    BufferedOutput o -> IO (BufferedOutput o)
forall (m :: * -> *) a. Monad m => a -> m a
return (o
-> Counter -> MutablePrimArray RealWorld Word8 -> BufferedOutput o
forall o.
o
-> Counter -> MutablePrimArray RealWorld Word8 -> BufferedOutput o
BufferedOutput o
o Counter
index MutablePrimArray RealWorld Word8
buf)

-- | Request bytes from 'BufferedInput'.
--
-- The buffering logic is quite simple:
--
-- If we have pushed back bytes, directly return it, otherwise we read using buffer size.
-- If we read N bytes, and N is larger than half of the buffer size, then we freeze buffer and return,
-- otherwise we copy buffer into result and reuse buffer afterward.
--
readBuffer :: (Input i, HasCallStack) => BufferedInput i -> IO V.Bytes
readBuffer :: BufferedInput i -> IO Bytes
readBuffer BufferedInput{i
IORef (MutablePrimArray RealWorld Word8)
IORef Bytes
inputBuffer :: IORef (MutablePrimArray RealWorld Word8)
bufPushBack :: IORef Bytes
bufInput :: i
inputBuffer :: forall i.
BufferedInput i -> IORef (MutablePrimArray RealWorld Word8)
bufPushBack :: forall i. BufferedInput i -> IORef Bytes
bufInput :: forall i. BufferedInput i -> i
..} = do
    Bytes
pb <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
bufPushBack
    if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
pb
    then do
        MutablePrimArray RealWorld Word8
rbuf <- IORef (MutablePrimArray RealWorld Word8)
-> IO (MutablePrimArray RealWorld Word8)
forall a. IORef a -> IO a
readIORef IORef (MutablePrimArray RealWorld Word8)
inputBuffer
        Int
bufSiz <- MutablePrimArray (PrimState IO) Word8 -> IO Int
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a -> m Int
getSizeofMutablePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
rbuf
        Int
l <- i -> Ptr Word8 -> Int -> IO Int
forall i.
(Input i, HasCallStack) =>
i -> Ptr Word8 -> Int -> IO Int
readInput i
bufInput (MutablePrimArray RealWorld Word8 -> Ptr Word8
forall s a. MutablePrimArray s a -> Ptr a
mutablePrimArrayContents MutablePrimArray RealWorld Word8
rbuf) Int
bufSiz
        if Int
l Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
bufSiz Int -> Int -> Int
forall a. Integral a => a -> a -> a
`quot` Int
2                -- read less than half size
        then do
            MutablePrimArray RealWorld Word8
mba <- Int -> IO (MutablePrimArray (PrimState IO) Word8)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
Int -> m (MutablePrimArray (PrimState m) a)
newPrimArray Int
l              -- copy result into new array
            MutablePrimArray (PrimState IO) Word8
-> Int
-> MutablePrimArray (PrimState IO) Word8
-> Int
-> Int
-> IO ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a
-> Int -> MutablePrimArray (PrimState m) a -> Int -> Int -> m ()
copyMutablePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
mba Int
0 MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
rbuf Int
0 Int
l
            PrimArray Word8
ba <- MutablePrimArray (PrimState IO) Word8 -> IO (PrimArray Word8)
forall (m :: * -> *) a.
PrimMonad m =>
MutablePrimArray (PrimState m) a -> m (PrimArray a)
unsafeFreezePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
mba
            Bytes -> IO Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes -> IO Bytes) -> Bytes -> IO Bytes
forall a b. (a -> b) -> a -> b
$! IArray PrimVector Word8 -> Int -> Int -> Bytes
forall (v :: * -> *) a. Vec v a => IArray v a -> Int -> Int -> v a
V.fromArr PrimArray Word8
IArray PrimVector Word8
ba Int
0 Int
l
        else do                                -- freeze buf into result
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
bufSiz Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                MutablePrimArray RealWorld Word8
buf' <- Int -> IO (MutablePrimArray (PrimState IO) Word8)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
Int -> m (MutablePrimArray (PrimState m) a)
newPinnedPrimArray Int
bufSiz
                IORef (MutablePrimArray RealWorld Word8)
-> MutablePrimArray RealWorld Word8 -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (MutablePrimArray RealWorld Word8)
inputBuffer MutablePrimArray RealWorld Word8
buf'
            PrimArray Word8
ba <- MutablePrimArray (PrimState IO) Word8 -> IO (PrimArray Word8)
forall (m :: * -> *) a.
PrimMonad m =>
MutablePrimArray (PrimState m) a -> m (PrimArray a)
unsafeFreezePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
rbuf
            Bytes -> IO Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes -> IO Bytes) -> Bytes -> IO Bytes
forall a b. (a -> b) -> a -> b
$! IArray PrimVector Word8 -> Int -> Int -> Bytes
forall (v :: * -> *) a. Vec v a => IArray v a -> Int -> Int -> v a
V.fromArr PrimArray Word8
IArray PrimVector Word8
ba Int
0 Int
l
    else do
        IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
bufPushBack Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
        Bytes -> IO Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return Bytes
pb

-- | Read exactly N bytes
--
-- If EOF reached before N bytes read, trailing bytes will be returned.
--
readExactly :: (HasCallStack, Input i) => Int -> BufferedInput i -> IO V.Bytes
readExactly :: Int -> BufferedInput i -> IO Bytes
readExactly Int
n0 BufferedInput i
h0 = [Bytes] -> Bytes
forall (v :: * -> *) a. Vec v a => [v a] -> v a
V.concat ([Bytes] -> Bytes) -> IO [Bytes] -> IO Bytes
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` (BufferedInput i -> Int -> IO [Bytes]
forall i. Input i => BufferedInput i -> Int -> IO [Bytes]
go BufferedInput i
h0 Int
n0)
  where
    go :: BufferedInput i -> Int -> IO [Bytes]
go BufferedInput i
h Int
n = do
        Bytes
chunk <- BufferedInput i -> IO Bytes
forall i. (Input i, HasCallStack) => BufferedInput i -> IO Bytes
readBuffer BufferedInput i
h
        let l :: Int
l = Bytes -> Int
forall (v :: * -> *) a. Vec v a => v a -> Int
V.length Bytes
chunk
        if Int
l Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
n
        then do
            let (Bytes
lastChunk, Bytes
rest) = Int -> Bytes -> (Bytes, Bytes)
forall (v :: * -> *) a. Vec v a => Int -> v a -> (v a, v a)
V.splitAt Int
n Bytes
chunk
            Bytes -> BufferedInput i -> IO ()
forall i.
(HasCallStack, Input i) =>
Bytes -> BufferedInput i -> IO ()
unReadBuffer Bytes
rest BufferedInput i
h
            [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return [Bytes
lastChunk]
        else if Int
l Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
n
            then [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return [Bytes
chunk]
            else if Int
l Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                then [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return [Bytes
chunk]
                else do
                    [Bytes]
chunks <- BufferedInput i -> Int -> IO [Bytes]
go BufferedInput i
h (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
l)
                    [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes
chunk Bytes -> [Bytes] -> [Bytes]
forall a. a -> [a] -> [a]
: [Bytes]
chunks)

-- | Read exactly N bytes
--
-- If EOF reached before N bytes read, a 'ShortReadException' will be thrown
--
readExactly' :: (HasCallStack, Input i) => Int -> BufferedInput i -> IO V.Bytes
readExactly' :: Int -> BufferedInput i -> IO Bytes
readExactly' Int
n BufferedInput i
h = do
    Bytes
v <- Int -> BufferedInput i -> IO Bytes
forall i.
(HasCallStack, Input i) =>
Int -> BufferedInput i -> IO Bytes
readExactly Int
n BufferedInput i
h
    if (Bytes -> Int
forall (v :: * -> *) a. Vec v a => v a -> Int
V.length Bytes
v Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
n)
    then BufferedException -> IO Bytes
forall e a. Exception e => e -> IO a
throwIO (CallStack -> BufferedException
ShortReadException CallStack
HasCallStack => CallStack
callStack)
    else Bytes -> IO Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return Bytes
v

-- | Read all chunks from a 'BufferedInput'.
--
-- This function will loop read until meet EOF('Input' device return 'V.empty'),
-- Useful for reading small file into memory.
readAll :: (HasCallStack, Input i) => BufferedInput i -> IO [V.Bytes]
readAll :: BufferedInput i -> IO [Bytes]
readAll BufferedInput i
h = [Bytes] -> IO [Bytes]
loop []
  where
    loop :: [Bytes] -> IO [Bytes]
loop [Bytes]
acc = do
        Bytes
chunk <- BufferedInput i -> IO Bytes
forall i. (Input i, HasCallStack) => BufferedInput i -> IO Bytes
readBuffer BufferedInput i
h
        if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
chunk
        then [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Bytes] -> IO [Bytes]) -> [Bytes] -> IO [Bytes]
forall a b. (a -> b) -> a -> b
$! [Bytes] -> [Bytes]
forall a. [a] -> [a]
reverse (Bytes
chunkBytes -> [Bytes] -> [Bytes]
forall a. a -> [a] -> [a]
:[Bytes]
acc)
        else [Bytes] -> IO [Bytes]
loop (Bytes
chunkBytes -> [Bytes] -> [Bytes]
forall a. a -> [a] -> [a]
:[Bytes]
acc)

-- | Read all chunks from a 'BufferedInput', and concat chunks together.
--
-- This function will loop read until meet EOF('Input' device return 'V.empty'),
-- Useful for reading small file into memory.
readAll' :: (HasCallStack, Input i) => BufferedInput i -> IO V.Bytes
readAll' :: BufferedInput i -> IO Bytes
readAll' BufferedInput i
i = [Bytes] -> Bytes
forall (v :: * -> *) a. Vec v a => [v a] -> v a
V.concat ([Bytes] -> Bytes) -> IO [Bytes] -> IO Bytes
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BufferedInput i -> IO [Bytes]
forall i. (HasCallStack, Input i) => BufferedInput i -> IO [Bytes]
readAll BufferedInput i
i

data BufferedException = ParseException P.ParseError CallStack
                       | ShortReadException CallStack deriving (Int -> BufferedException -> ShowS
[BufferedException] -> ShowS
BufferedException -> String
(Int -> BufferedException -> ShowS)
-> (BufferedException -> String)
-> ([BufferedException] -> ShowS)
-> Show BufferedException
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BufferedException] -> ShowS
$cshowList :: [BufferedException] -> ShowS
show :: BufferedException -> String
$cshow :: BufferedException -> String
showsPrec :: Int -> BufferedException -> ShowS
$cshowsPrec :: Int -> BufferedException -> ShowS
Show, Typeable)

instance Exception BufferedException where
    toException :: BufferedException -> SomeException
toException = BufferedException -> SomeException
forall e. Exception e => e -> SomeException
ioExceptionToException
    fromException :: SomeException -> Maybe BufferedException
fromException = SomeException -> Maybe BufferedException
forall e. Exception e => SomeException -> Maybe e
ioExceptionFromException

-- | Push bytes back into buffer(if not empty).
--
unReadBuffer :: (HasCallStack, Input i) => V.Bytes -> BufferedInput i -> IO ()
unReadBuffer :: Bytes -> BufferedInput i -> IO ()
unReadBuffer Bytes
pb' BufferedInput{i
IORef (MutablePrimArray RealWorld Word8)
IORef Bytes
inputBuffer :: IORef (MutablePrimArray RealWorld Word8)
bufPushBack :: IORef Bytes
bufInput :: i
inputBuffer :: forall i.
BufferedInput i -> IORef (MutablePrimArray RealWorld Word8)
bufPushBack :: forall i. BufferedInput i -> IORef Bytes
bufInput :: forall i. BufferedInput i -> i
..} = Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
pb') (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    IORef Bytes -> (Bytes -> Bytes) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef Bytes
bufPushBack ((Bytes -> Bytes) -> IO ()) -> (Bytes -> Bytes) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Bytes
pb -> Bytes
pb' Bytes -> Bytes -> Bytes
forall (v :: * -> *) a. Vec v a => v a -> v a -> v a
`V.append` Bytes
pb

-- | Read buffer and parse with 'Parser'.
--
-- This function will continuously draw data from input before parsing finish. Unconsumed
-- bytes will be returned to buffer.
--
-- Either during parsing or before parsing, reach EOF will result in 'P.ParseError'.
readParser :: (HasCallStack, Input i) => P.Parser a -> BufferedInput i -> IO (Either P.ParseError a)
readParser :: Parser a -> BufferedInput i -> IO (Either ParseError a)
readParser Parser a
p BufferedInput i
i = do
    Bytes
bs <- BufferedInput i -> IO Bytes
forall i. (Input i, HasCallStack) => BufferedInput i -> IO Bytes
readBuffer BufferedInput i
i
    (Bytes
rest, Either ParseError a
r) <- Parser a -> IO Bytes -> Bytes -> IO (Bytes, Either ParseError a)
forall (m :: * -> *) a.
Monad m =>
Parser a -> m Bytes -> Bytes -> m (Bytes, Either ParseError a)
P.parseChunks Parser a
p (BufferedInput i -> IO Bytes
forall i. (Input i, HasCallStack) => BufferedInput i -> IO Bytes
readBuffer BufferedInput i
i) Bytes
bs
    Bytes -> BufferedInput i -> IO ()
forall i.
(HasCallStack, Input i) =>
Bytes -> BufferedInput i -> IO ()
unReadBuffer Bytes
rest BufferedInput i
i
    Either ParseError a -> IO (Either ParseError a)
forall (m :: * -> *) a. Monad m => a -> m a
return Either ParseError a
r

-- | Read until reach a magic bytes, return bytes(including the magic bytes)
--
-- If EOF is reached before meet a magic byte, partial bytes are returned.
readToMagic :: (HasCallStack, Input i) => Word8 -> BufferedInput i -> IO V.Bytes
readToMagic :: Word8 -> BufferedInput i -> IO Bytes
readToMagic Word8
magic0 BufferedInput i
h0 = [Bytes] -> Bytes
forall (v :: * -> *) a. Vec v a => [v a] -> v a
V.concat ([Bytes] -> Bytes) -> IO [Bytes] -> IO Bytes
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` (BufferedInput i -> Word8 -> IO [Bytes]
forall i. Input i => BufferedInput i -> Word8 -> IO [Bytes]
go BufferedInput i
h0 Word8
magic0)
  where
    go :: BufferedInput i -> Word8 -> IO [Bytes]
go BufferedInput i
h Word8
magic = do
        Bytes
chunk <- BufferedInput i -> IO Bytes
forall i. (Input i, HasCallStack) => BufferedInput i -> IO Bytes
readBuffer BufferedInput i
h
        if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
chunk
        then [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return []
        else case Word8 -> Bytes -> Maybe Int
forall (v :: * -> *) a. (Vec v a, Eq a) => a -> v a -> Maybe Int
V.elemIndex Word8
magic Bytes
chunk of
            Just Int
i -> do
                let (Bytes
lastChunk, Bytes
rest) = Int -> Bytes -> (Bytes, Bytes)
forall (v :: * -> *) a. Vec v a => Int -> v a -> (v a, v a)
V.splitAt (Int
iInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1) Bytes
chunk
                Bytes -> BufferedInput i -> IO ()
forall i.
(HasCallStack, Input i) =>
Bytes -> BufferedInput i -> IO ()
unReadBuffer Bytes
rest BufferedInput i
h
                [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return [Bytes
lastChunk]
            Maybe Int
Nothing -> do
                [Bytes]
chunks <- BufferedInput i -> Word8 -> IO [Bytes]
go BufferedInput i
h Word8
magic
                [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes
chunk Bytes -> [Bytes] -> [Bytes]
forall a. a -> [a] -> [a]
: [Bytes]
chunks)

-- | Read until reach a magic bytes, return bytes(including the magic bytes)
--
-- If EOF is reached before meet a magic byte, a 'ShortReadException' will be thrown.
readToMagic' :: (HasCallStack, Input i) => Word8 -> BufferedInput i -> IO V.Bytes
readToMagic' :: Word8 -> BufferedInput i -> IO Bytes
readToMagic' Word8
magic0 BufferedInput i
h0 = [Bytes] -> Bytes
forall (v :: * -> *) a. Vec v a => [v a] -> v a
V.concat ([Bytes] -> Bytes) -> IO [Bytes] -> IO Bytes
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` (BufferedInput i -> Word8 -> IO [Bytes]
forall i. Input i => BufferedInput i -> Word8 -> IO [Bytes]
go BufferedInput i
h0 Word8
magic0)
  where
    go :: BufferedInput i -> Word8 -> IO [Bytes]
go BufferedInput i
h Word8
magic = do
        Bytes
chunk <- BufferedInput i -> IO Bytes
forall i. (Input i, HasCallStack) => BufferedInput i -> IO Bytes
readBuffer BufferedInput i
h
        if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
chunk
        then BufferedException -> IO [Bytes]
forall e a. Exception e => e -> IO a
throwIO (CallStack -> BufferedException
ShortReadException CallStack
HasCallStack => CallStack
callStack)
        else case Word8 -> Bytes -> Maybe Int
forall (v :: * -> *) a. (Vec v a, Eq a) => a -> v a -> Maybe Int
V.elemIndex Word8
magic Bytes
chunk of
            Just Int
i -> do
                let (Bytes
lastChunk, Bytes
rest) = Int -> Bytes -> (Bytes, Bytes)
forall (v :: * -> *) a. Vec v a => Int -> v a -> (v a, v a)
V.splitAt (Int
iInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1) Bytes
chunk
                Bytes -> BufferedInput i -> IO ()
forall i.
(HasCallStack, Input i) =>
Bytes -> BufferedInput i -> IO ()
unReadBuffer Bytes
rest BufferedInput i
h
                [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return [Bytes
lastChunk]
            Maybe Int
Nothing -> do
                [Bytes]
chunks <- BufferedInput i -> Word8 -> IO [Bytes]
go BufferedInput i
h Word8
magic
                [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes
chunk Bytes -> [Bytes] -> [Bytes]
forall a. a -> [a] -> [a]
: [Bytes]
chunks)

-- | Read to a linefeed ('\n' or '\r\n'), return 'Bytes' before it.
--
-- Return bytes don't include linefeed, empty bytes indicate empty line, 'Nothing' indicate EOF.
-- If EOF is reached before meet a line feed, partial line is returned.
readLine :: (HasCallStack, Input i) => BufferedInput i -> Source V.Bytes
readLine :: BufferedInput i -> Source Bytes
readLine BufferedInput i
i = do
    bs :: Bytes
bs@(V.PrimVector PrimArray Word8
arr Int
s Int
l) <- Word8 -> BufferedInput i -> IO Bytes
forall i.
(HasCallStack, Input i) =>
Word8 -> BufferedInput i -> IO Bytes
readToMagic Word8
10 BufferedInput i
i
    if Int
l Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
    then Maybe Bytes -> Source Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Bytes
forall a. Maybe a
Nothing
    else Maybe Bytes -> Source Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe Bytes -> Source Bytes) -> Maybe Bytes -> Source Bytes
forall a b. (a -> b) -> a -> b
$ case Bytes
bs Bytes -> Int -> Maybe Word8
forall (v :: * -> *) a. Vec v a => v a -> Int -> Maybe a
`V.indexMaybe` (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
2) of
        Maybe Word8
Nothing -> Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just (PrimArray Word8 -> Int -> Int -> Bytes
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray Word8
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1))
        Just Word8
r | Word8
r Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
== Word8
13   -> Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just (PrimArray Word8 -> Int -> Int -> Bytes
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray Word8
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
2))
               | Bool
otherwise -> Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just (PrimArray Word8 -> Int -> Int -> Bytes
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray Word8
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1))

-- | Read to a linefeed ('\n' or '\r\n'), return 'Bytes' before it.
--
-- Return bytes don't include linefeed, empty bytes indicate empty line, 'Nothing' indicate EOF.
-- If EOF reached before meet a line feed, a 'ShortReadException' will be thrown.
readLine' :: (HasCallStack, Input i) => BufferedInput i -> Source V.Bytes
readLine' :: BufferedInput i -> Source Bytes
readLine' BufferedInput i
i = do
    bs :: Bytes
bs@(V.PrimVector PrimArray Word8
arr Int
s Int
l) <- Word8 -> BufferedInput i -> IO Bytes
forall i.
(HasCallStack, Input i) =>
Word8 -> BufferedInput i -> IO Bytes
readToMagic' Word8
10 BufferedInput i
i
    if Int
l Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
    then Maybe Bytes -> Source Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Bytes
forall a. Maybe a
Nothing
    else Maybe Bytes -> Source Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe Bytes -> Source Bytes) -> Maybe Bytes -> Source Bytes
forall a b. (a -> b) -> a -> b
$ case Bytes
bs Bytes -> Int -> Maybe Word8
forall (v :: * -> *) a. Vec v a => v a -> Int -> Maybe a
`V.indexMaybe` (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
2) of
        Maybe Word8
Nothing -> Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just (PrimArray Word8 -> Int -> Int -> Bytes
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray Word8
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1))
        Just Word8
r | Word8
r Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
== Word8
13   -> Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just (PrimArray Word8 -> Int -> Int -> Bytes
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray Word8
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
2))
               | Bool
otherwise -> Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just (PrimArray Word8 -> Int -> Int -> Bytes
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray Word8
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1))

--------------------------------------------------------------------------------

-- | Write 'V.Bytes' into buffered handle.
--
-- * If buffer is empty and bytes are larger than half of buffer, directly write bytes,
--   otherwise copy bytes to buffer.
--
-- * If buffer is not empty, then copy bytes to buffer if it can hold, otherwise
--   write buffer first, then try again.
--
writeBuffer :: (HasCallStack, Output o) => BufferedOutput o -> V.Bytes -> IO ()
writeBuffer :: BufferedOutput o -> Bytes -> IO ()
writeBuffer o :: BufferedOutput o
o@BufferedOutput{o
MutablePrimArray RealWorld Word8
Counter
outputBuffer :: MutablePrimArray RealWorld Word8
bufIndex :: Counter
bufOutput :: o
outputBuffer :: forall o. BufferedOutput o -> MutablePrimArray RealWorld Word8
bufIndex :: forall o. BufferedOutput o -> Counter
bufOutput :: forall o. BufferedOutput o -> o
..} v :: Bytes
v@(V.PrimVector PrimArray Word8
ba Int
s Int
l) = do
    Int
i <- Counter -> IO Int
forall a. Prim a => PrimIORef a -> IO a
readPrimIORef Counter
bufIndex
    Int
bufSiz <- MutablePrimArray (PrimState IO) Word8 -> IO Int
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a -> m Int
getSizeofMutablePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
outputBuffer
    if Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
0
    then if Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
l Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
bufSiz
        then do
            -- current buffer can hold it
            MutablePrimArray (PrimState IO) Word8
-> Int -> PrimArray Word8 -> Int -> Int -> IO ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a
-> Int -> PrimArray a -> Int -> Int -> m ()
copyPrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
outputBuffer Int
i PrimArray Word8
ba Int
s Int
l   -- copy to buffer
            Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
bufIndex (Int
iInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
l)              -- update index
        else do
            -- flush the buffer first
            MutablePrimArray RealWorld Word8 -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. MutablePrimArray RealWorld a -> (Ptr a -> IO b) -> IO b
withMutablePrimArrayContents MutablePrimArray RealWorld Word8
outputBuffer ((Ptr Word8 -> IO ()) -> IO ()) -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Ptr Word8
ptr -> (o -> Ptr Word8 -> Int -> IO ()
forall o.
(Output o, HasCallStack) =>
o -> Ptr Word8 -> Int -> IO ()
writeOutput o
bufOutput) Ptr Word8
ptr Int
i
            Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
bufIndex Int
0
            -- try write to buffer again
            BufferedOutput o -> Bytes -> IO ()
forall o.
(HasCallStack, Output o) =>
BufferedOutput o -> Bytes -> IO ()
writeBuffer BufferedOutput o
o Bytes
v
    else
        if Int
l Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
bufSiz Int -> Int -> Int
forall a. Bits a => a -> Int -> a
`unsafeShiftR` Int
2
        then Bytes -> (Ptr Word8 -> Int -> IO ()) -> IO ()
forall a b.
Prim a =>
PrimVector a -> (Ptr a -> Int -> IO b) -> IO b
withPrimVectorSafe Bytes
v (o -> Ptr Word8 -> Int -> IO ()
forall o.
(Output o, HasCallStack) =>
o -> Ptr Word8 -> Int -> IO ()
writeOutput o
bufOutput)
        else do
            MutablePrimArray (PrimState IO) Word8
-> Int -> PrimArray Word8 -> Int -> Int -> IO ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a
-> Int -> PrimArray a -> Int -> Int -> m ()
copyPrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
outputBuffer Int
i PrimArray Word8
ba Int
s Int
l   -- copy to buffer
            Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
bufIndex Int
l             -- update index


-- | Directly write 'B.Builder' into buffered handle.
--
-- Run 'B.Builder' with buffer if it can hold, write to device when buffer is full.
--
writeBuilder :: (HasCallStack, Output o) => BufferedOutput o -> B.Builder a -> IO ()
writeBuilder :: BufferedOutput o -> Builder a -> IO ()
writeBuilder BufferedOutput{o
MutablePrimArray RealWorld Word8
Counter
outputBuffer :: MutablePrimArray RealWorld Word8
bufIndex :: Counter
bufOutput :: o
outputBuffer :: forall o. BufferedOutput o -> MutablePrimArray RealWorld Word8
bufIndex :: forall o. BufferedOutput o -> Counter
bufOutput :: forall o. BufferedOutput o -> o
..} (B.Builder forall s. AllocateStrategy s -> (a -> BuildStep s) -> BuildStep s
b) = do
    Int
i <- Counter -> IO Int
forall a. Prim a => PrimIORef a -> IO a
readPrimIORef Counter
bufIndex
    Int
originBufSiz <- MutablePrimArray (PrimState IO) Word8 -> IO Int
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a -> m Int
getSizeofMutablePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
outputBuffer
    [Bytes]
_ <- ST RealWorld [Bytes] -> IO [Bytes]
forall (m :: * -> *) a.
(PrimBase m, PrimState m ~ RealWorld) =>
m a -> IO a
primToIO (AllocateStrategy RealWorld
-> (a -> BuildStep RealWorld) -> BuildStep RealWorld
forall s. AllocateStrategy s -> (a -> BuildStep s) -> BuildStep s
b ((Bytes -> ST RealWorld ()) -> AllocateStrategy RealWorld
forall s. (Bytes -> ST s ()) -> AllocateStrategy s
B.OneShotAction Bytes -> ST RealWorld ()
action) (Int -> a -> BuildStep RealWorld
forall a. Int -> a -> BuildStep RealWorld
lastStep Int
originBufSiz) (MutablePrimArray RealWorld Word8 -> Int -> Buffer RealWorld
forall s. MutablePrimArray s Word8 -> Int -> Buffer s
B.Buffer MutablePrimArray RealWorld Word8
outputBuffer Int
i))
    () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
  where
    action :: V.Bytes -> ST RealWorld ()
    action :: Bytes -> ST RealWorld ()
action Bytes
bytes = IO () -> ST RealWorld ()
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (Bytes -> (Ptr Word8 -> Int -> IO ()) -> IO ()
forall a b.
Prim a =>
PrimVector a -> (Ptr a -> Int -> IO b) -> IO b
withPrimVectorSafe Bytes
bytes (o -> Ptr Word8 -> Int -> IO ()
forall o.
(Output o, HasCallStack) =>
o -> Ptr Word8 -> Int -> IO ()
writeOutput o
bufOutput))

    lastStep :: Int -> a -> B.BuildStep RealWorld
    lastStep :: Int -> a -> BuildStep RealWorld
lastStep Int
originBufSiz a
_ (B.Buffer MutablePrimArray RealWorld Word8
buf Int
offset)
        | MutablePrimArray RealWorld Word8
-> MutablePrimArray RealWorld Word8 -> Bool
forall s a. MutablePrimArray s a -> MutablePrimArray s a -> Bool
sameMutablePrimArray MutablePrimArray RealWorld Word8
buf MutablePrimArray RealWorld Word8
outputBuffer = IO [Bytes] -> ST RealWorld [Bytes]
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO [Bytes] -> ST RealWorld [Bytes])
-> IO [Bytes] -> ST RealWorld [Bytes]
forall a b. (a -> b) -> a -> b
$ do
            Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
bufIndex Int
offset   -- record new buffer index
            [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return []
        | Int
offset Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
originBufSiz = IO [Bytes] -> ST RealWorld [Bytes]
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO [Bytes] -> ST RealWorld [Bytes])
-> IO [Bytes] -> ST RealWorld [Bytes]
forall a b. (a -> b) -> a -> b
$ do
            MutablePrimArray RealWorld Word8 -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. MutablePrimArray RealWorld a -> (Ptr a -> IO b) -> IO b
withMutablePrimArrayContents MutablePrimArray RealWorld Word8
buf ((Ptr Word8 -> IO ()) -> IO ()) -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Ptr Word8
ptr -> (o -> Ptr Word8 -> Int -> IO ()
forall o.
(Output o, HasCallStack) =>
o -> Ptr Word8 -> Int -> IO ()
writeOutput o
bufOutput) Ptr Word8
ptr Int
offset
            Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
bufIndex Int
0
            [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return [] -- to match 'BuildStep' return type
        | Bool
otherwise = IO [Bytes] -> ST RealWorld [Bytes]
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO [Bytes] -> ST RealWorld [Bytes])
-> IO [Bytes] -> ST RealWorld [Bytes]
forall a b. (a -> b) -> a -> b
$ do
            MutablePrimArray (PrimState IO) Word8
-> Int
-> MutablePrimArray (PrimState IO) Word8
-> Int
-> Int
-> IO ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a
-> Int -> MutablePrimArray (PrimState m) a -> Int -> Int -> m ()
copyMutablePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
outputBuffer Int
0 MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
buf Int
0 Int
offset
            Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
bufIndex Int
offset
            [Bytes] -> IO [Bytes]
forall (m :: * -> *) a. Monad m => a -> m a
return [] -- to match 'BuildStep' return type

-- | Flush the buffer into output device(if buffer is not empty).
--
flushBuffer :: (HasCallStack, Output o) => BufferedOutput o -> IO ()
flushBuffer :: BufferedOutput o -> IO ()
flushBuffer BufferedOutput{o
MutablePrimArray RealWorld Word8
Counter
outputBuffer :: MutablePrimArray RealWorld Word8
bufIndex :: Counter
bufOutput :: o
outputBuffer :: forall o. BufferedOutput o -> MutablePrimArray RealWorld Word8
bufIndex :: forall o. BufferedOutput o -> Counter
bufOutput :: forall o. BufferedOutput o -> o
..} = do
    Int
i <- Counter -> IO Int
forall a. Prim a => PrimIORef a -> IO a
readPrimIORef Counter
bufIndex
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        MutablePrimArray RealWorld Word8 -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. MutablePrimArray RealWorld a -> (Ptr a -> IO b) -> IO b
withMutablePrimArrayContents MutablePrimArray RealWorld Word8
outputBuffer ((Ptr Word8 -> IO ()) -> IO ()) -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Ptr Word8
ptr -> (o -> Ptr Word8 -> Int -> IO ()
forall o.
(Output o, HasCallStack) =>
o -> Ptr Word8 -> Int -> IO ()
writeOutput o
bufOutput) Ptr Word8
ptr Int
i
        Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
bufIndex Int
0

--------------------------------------------------------------------------------

-- | Type alias for input stream, 'Nothing' indicate EOF.
type Source a = IO (Maybe a)

-- | Type alias for output stream, contain a write & a flush function.
type Sink a = (a -> IO (), IO ())

-- | Turn a 'BufferedInput' into 'Source', map EOF to Nothing.
--
sourceBuffer :: (HasCallStack, Input i) => BufferedInput i -> Source V.Bytes
{-# INLINABLE sourceBuffer #-}
sourceBuffer :: BufferedInput i -> Source Bytes
sourceBuffer BufferedInput i
i = BufferedInput i -> IO Bytes
forall i. (Input i, HasCallStack) => BufferedInput i -> IO Bytes
readBuffer BufferedInput i
i IO Bytes -> (Bytes -> Source Bytes) -> Source Bytes
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Bytes
x -> if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
x then Maybe Bytes -> Source Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Bytes
forall a. Maybe a
Nothing
                                                     else Maybe Bytes -> Source Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
x)

-- | Turn a 'BufferedOutput' into 'Sink'.
--
sinkBuffer :: (HasCallStack, Output o) => BufferedOutput o -> Sink V.Bytes
{-# INLINABLE sinkBuffer #-}
sinkBuffer :: BufferedOutput o -> Sink Bytes
sinkBuffer BufferedOutput o
o = (BufferedOutput o -> Bytes -> IO ()
forall o.
(HasCallStack, Output o) =>
BufferedOutput o -> Bytes -> IO ()
writeBuffer BufferedOutput o
o, BufferedOutput o -> IO ()
forall o. (HasCallStack, Output o) => BufferedOutput o -> IO ()
flushBuffer BufferedOutput o
o)

-- | Source a list streamly.
sourceFromList :: [a] -> IO (Source a)
{-# INLINABLE sourceFromList #-}
sourceFromList :: [a] -> IO (Source a)
sourceFromList [a]
xs0 = do
    IORef [a]
xsRef <- [a] -> IO (IORef [a])
forall a. a -> IO (IORef a)
newIORef [a]
xs0
    Source a -> IO (Source a)
forall (m :: * -> *) a. Monad m => a -> m a
return (IORef [a] -> Source a
forall a. IORef [a] -> IO (Maybe a)
popper IORef [a]
xsRef)
  where
    popper :: IORef [a] -> IO (Maybe a)
popper IORef [a]
xsRef = do
        [a]
xs <- IORef [a] -> IO [a]
forall a. IORef a -> IO a
readIORef IORef [a]
xsRef
        case [a]
xs of
            (a
x:[a]
xs') -> do
                IORef [a] -> [a] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef [a]
xsRef [a]
xs'
                Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
x)
            [a]
_ -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing

-- | Connect two streams, after first reach EOF, draw element from second.
(>+>) :: Source a -> Source a  -> IO (Source a)
{-# INLINABLE (>+>) #-}
Source a
input1 >+> :: Source a -> Source a -> IO (Source a)
>+> Source a
input2 = [Source a] -> IO (Source a)
forall a. [Source a] -> IO (Source a)
concatSource [Source a
input1, Source a
input2]

-- | Read all stream elements to a list.
collectSource :: Source a -> IO [a]
{-# INLINABLE collectSource #-}
collectSource :: Source a -> IO [a]
collectSource Source a
input = [a] -> IO [a]
loop []
  where
    loop :: [a] -> IO [a]
loop [a]
acc = do
        Maybe a
r <- Source a
input
        case Maybe a
r of
            Just a
r' -> [a] -> IO [a]
loop (a
r'a -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
acc)
            Maybe a
_       -> [a] -> IO [a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([a] -> IO [a]) -> [a] -> IO [a]
forall a b. (a -> b) -> a -> b
$! [a] -> [a]
forall a. [a] -> [a]
reverse [a]
acc


-- | Read buffer and parse with 'Parser'.
--
-- This function will continuously draw data from input before parsing finish. Unconsumed
-- bytes will be returned to buffer.
--
-- Return 'Nothing' if reach EOF before parsing, throw 'ParseException' if parsing fail.
parseSource :: HasCallStack => P.Parser a -> Source V.Bytes -> IO (Source a)
{-# INLINABLE parseSource #-}
parseSource :: Parser a -> Source Bytes -> IO (Source a)
parseSource Parser a
p Source Bytes
source = do
    IORef Bytes
trailingRef <- Bytes -> IO (IORef Bytes)
forall a. a -> IO (IORef a)
newIORef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
    Source a -> IO (Source a)
forall (m :: * -> *) a. Monad m => a -> m a
return (IORef Bytes -> Source a
go IORef Bytes
trailingRef)
  where
    go :: IORef Bytes -> Source a
go IORef Bytes
trailingRef = do
        Bytes
trailing <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
        if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
trailing
        then do
            Maybe Bytes
bs <- Source Bytes
source
            case Maybe Bytes
bs of
                Just Bytes
bs' -> do
                    (Bytes
rest, Either ParseError a
r) <- Parser a -> IO Bytes -> Bytes -> IO (Bytes, Either ParseError a)
forall (m :: * -> *) a.
Monad m =>
Parser a -> m Bytes -> Bytes -> m (Bytes, Either ParseError a)
P.parseChunks Parser a
p IO Bytes
source' Bytes
bs'
                    IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
rest
                    case Either ParseError a
r of Right a
v -> Maybe a -> Source a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
v)
                              Left ParseError
e  -> BufferedException -> Source a
forall e a. Exception e => e -> IO a
throwIO (ParseError -> CallStack -> BufferedException
ParseException ParseError
e CallStack
HasCallStack => CallStack
callStack)
                Maybe Bytes
_    -> Maybe a -> Source a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
        else do
            (Bytes
rest, Either ParseError a
r) <- Parser a -> IO Bytes -> Bytes -> IO (Bytes, Either ParseError a)
forall (m :: * -> *) a.
Monad m =>
Parser a -> m Bytes -> Bytes -> m (Bytes, Either ParseError a)
P.parseChunks Parser a
p IO Bytes
source' Bytes
trailing
            IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
rest
            case Either ParseError a
r of Right a
v -> Maybe a -> Source a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
v)
                      Left ParseError
e  -> BufferedException -> Source a
forall e a. Exception e => e -> IO a
throwIO (ParseError -> CallStack -> BufferedException
ParseException ParseError
e CallStack
HasCallStack => CallStack
callStack)

    source' :: IO Bytes
source' = Source Bytes
source Source Bytes -> (Maybe Bytes -> IO Bytes) -> IO Bytes
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Maybe Bytes
r -> case Maybe Bytes
r of Just Bytes
r' -> Bytes -> IO Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return Bytes
r'
                                          Maybe Bytes
_      -> Bytes -> IO Bytes
forall (m :: * -> *) a. Monad m => a -> m a
return Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty

-- | Connect list of streams, after one stream reach EOF, draw element from next.
concatSource :: [Source a] -> IO (Source a)
{-# INLINABLE concatSource #-}
concatSource :: [Source a] -> IO (Source a)
concatSource [Source a]
ss0 = [Source a] -> IO (IORef [Source a])
forall a. a -> IO (IORef a)
newIORef [Source a]
ss0 IO (IORef [Source a])
-> (IORef [Source a] -> IO (Source a)) -> IO (Source a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Source a -> IO (Source a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Source a -> IO (Source a))
-> (IORef [Source a] -> Source a)
-> IORef [Source a]
-> IO (Source a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IORef [Source a] -> Source a
forall a. IORef [IO (Maybe a)] -> IO (Maybe a)
loop
  where
    loop :: IORef [IO (Maybe a)] -> IO (Maybe a)
loop IORef [IO (Maybe a)]
ref = do
        [IO (Maybe a)]
ss <- IORef [IO (Maybe a)] -> IO [IO (Maybe a)]
forall a. IORef a -> IO a
readIORef IORef [IO (Maybe a)]
ref
        case [IO (Maybe a)]
ss of
          []       -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
          (IO (Maybe a)
input:[IO (Maybe a)]
rest) -> do
              Maybe a
chunk <- IO (Maybe a)
input
              case Maybe a
chunk of
                Just a
_  -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
chunk
                Maybe a
_       -> IORef [IO (Maybe a)] -> [IO (Maybe a)] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef [IO (Maybe a)]
ref [IO (Maybe a)]
rest IO () -> IO (Maybe a) -> IO (Maybe a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef [IO (Maybe a)] -> IO (Maybe a)
loop IORef [IO (Maybe a)]
ref

-- | Zip two streams into one.
zipSource :: Source a -> Source b -> Source (a,b)
{-# INLINABLE zipSource #-}
zipSource :: Source a -> Source b -> Source (a, b)
zipSource Source a
inputA Source b
inputB = do
    Maybe a
mA <- Source a
inputA
    Maybe b
mB <- Source b
inputB
    case Maybe a
mA of Just a
a -> case Maybe b
mB of Just b
b -> Maybe (a, b) -> Source (a, b)
forall (m :: * -> *) a. Monad m => a -> m a
return ((a, b) -> Maybe (a, b)
forall a. a -> Maybe a
Just (a
a, b
b))
                                    Maybe b
_ -> Maybe (a, b) -> Source (a, b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (a, b)
forall a. Maybe a
Nothing
               Maybe a
_ -> Maybe (a, b) -> Source (a, b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (a, b)
forall a. Maybe a
Nothing

-- | Loop read stream and write to output, when input ends flush the output.
--
(>>>>=) :: Source a     -- ^ stream to write
        -> Sink a
        -> IO ()
{-# INLINABLE (>>>>=) #-}
>>>>= :: Source a -> Sink a -> IO ()
(>>>>=) Source a
input (a -> IO ()
write, IO ()
flush) = IO ()
loop
  where
    loop :: IO ()
loop = do
        Maybe a
m <- Source a
input
        case Maybe a
m of
            Just a
x' -> a -> IO ()
write a
x' IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
            Maybe a
_       -> IO ()
flush