-- |
-- Module      : Streamly.Internal.Data.Stream.Chunked
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : pre-release
-- Portability : GHC
--
-- Combinators to efficiently manipulate streams of immutable arrays.
--
module Streamly.Internal.Data.Stream.Chunked
    (
    -- * Creation
      chunksOf

    -- * Flattening to elements
    , concat
    , concatRev
    , interpose
    , interposeSuffix
    , intercalateSuffix
    , unlines

    -- * Elimination
    -- ** Element Folds
    -- The byte level foldBreak can work as efficiently as the chunk level. We
    -- can flatten the stream to byte stream and use that. But if we want the
    -- remaining stream to be a chunk stream then this could be handy. But it
    -- could also be implemented using parseBreak.
    , foldBreak -- StreamK.foldBreakChunks
    , foldBreakD
    -- The byte level parseBreak cannot work efficiently. Because the stream
    -- will have to be a StreamK for backtracking, StreamK at byte level would
    -- not be efficient.
    , parseBreak -- StreamK.parseBreakChunks
    -- , parseBreakD
    -- , foldManyChunks
    -- , parseManyChunks

    -- ** Array Folds
    -- XXX Use Parser.Chunked instead, need only chunkedParseBreak,
    -- foldBreak can be implemented using parseBreak. Use StreamK.
    , runArrayFold
    , runArrayFoldBreak
    -- , parseArr
    , runArrayParserDBreak -- StreamK.chunkedParseBreak
    , runArrayFoldMany     -- StreamK.chunkedParseMany

    , toArray

    -- * Compaction
    -- We can use something like foldManyChunks, parseManyChunks with a take
    -- fold.
    , lpackArraysChunksOf -- Fold.compactChunks
    , compact -- rechunk, compactChunks

    -- * Splitting
    -- We can use something like foldManyChunks, parseManyChunks with an
    -- appropriate splitting fold.
    , splitOn       -- Stream.rechunkOn
    , splitOnSuffix -- Stream.rechunkOnSuffix
    )
where

#include "ArrayMacros.h"
#include "inline.hs"

import Data.Bifunctor (second)
import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Proxy (Proxy(..))
import Data.Word (Word8)
import Streamly.Internal.Data.Unboxed (Unbox, peekWith, sizeOf)
import Fusion.Plugin.Types (Fuse(..))
import GHC.Exts (SpecConstrAnnotation(..))
import GHC.Types (SPEC(..))
import Prelude hiding (null, last, (!!), read, concat, unlines)

import Streamly.Data.Fold (Fold)
import Streamly.Internal.Data.Array.Type (Array(..))
import Streamly.Internal.Data.Array.Mut.Type (MutArray)
import Streamly.Internal.Data.Fold.Chunked (ChunkFold(..))
import Streamly.Internal.Data.Parser (ParseError(..))
import Streamly.Internal.Data.Stream.StreamD (Stream)
import Streamly.Internal.Data.Stream.StreamK (StreamK, fromStream, toStream)
import Streamly.Internal.Data.SVar.Type (adaptState, defState)
import Streamly.Internal.Data.Array.Mut.Type
    (allocBytesToElemCount)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))

import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Array as A
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Array.Type as A
import qualified Streamly.Internal.Data.Array.Mut.Type as MA
import qualified Streamly.Internal.Data.Array.Mut.Stream as AS
import qualified Streamly.Internal.Data.Fold.Type as FL (Fold(..), Step(..))
import qualified Streamly.Internal.Data.Parser as PR
import qualified Streamly.Internal.Data.Parser.ParserD as PRD
    (Parser(..), Initial(..))
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.StreamK as K

-- XXX Since these are immutable arrays MonadIO constraint can be removed from
-- most places.

-------------------------------------------------------------------------------
-- Generation
-------------------------------------------------------------------------------

-- | @chunksOf n stream@ groups the elements in the input stream into arrays of
-- @n@ elements each.
--
-- > chunksOf n = Stream.groupsOf n (Array.writeN n)
--
-- This is a direct alias of the array module's @chunksOf@.
--
-- /Pre-release/
{-# INLINE chunksOf #-}
chunksOf :: (MonadIO m, Unbox a)
    => Int -> Stream m a -> Stream m (Array a)
chunksOf = A.chunksOf

-------------------------------------------------------------------------------
-- Append
-------------------------------------------------------------------------------

-- XXX efficiently compare two streams of arrays. Two streams can have chunks
-- of different sizes, we can handle that in the stream comparison abstraction.
-- This could be useful e.g. to fast compare whether two files differ.

-- | Convert a stream of arrays into a stream of their elements.
--
-- Same as the following:
--
-- > concat = Stream.unfoldMany Array.reader
--
-- @since 0.7.0
{-# INLINE concat #-}
concat :: (Monad m, Unbox a) => Stream m (Array a) -> Stream m a
-- concat m = fromStreamD $ A.flattenArrays (toStreamD m)
-- concat m = fromStreamD $ D.concatMap A.toStreamD (toStreamD m)
concat = D.unfoldMany A.reader

-- | Convert a stream of arrays into a stream of their elements reversing the
-- contents of each array before flattening. The order of the arrays in the
-- stream is not changed; only each array is unfolded with the reverse reader.
--
-- > concatRev = Stream.unfoldMany Array.readerRev
--
-- @since 0.7.0
{-# INLINE concatRev #-}
concatRev :: (Monad m, Unbox a) => Stream m (Array a) -> Stream m a
-- concatRev m = fromStreamD $ A.flattenArraysRev (toStreamD m)
concatRev = D.unfoldMany A.readerRev

-------------------------------------------------------------------------------
-- Intersperse and append
-------------------------------------------------------------------------------

-- | Flatten a stream of arrays after inserting the given element between
-- arrays.
--
-- Implemented as @D.interpose@ with the array reader unfold.
--
-- /Pre-release/
{-# INLINE interpose #-}
interpose :: (Monad m, Unbox a) => a -> Stream m (Array a) -> Stream m a
interpose x = D.interpose x A.reader

-- | Flatten a stream of arrays using the array reader unfold, with the given
-- array used as the suffix argument of @D.intercalateSuffix@.
--
-- NOTE(review): presumably the elements of the given array are emitted after
-- the elements of every array in the stream; confirm against
-- @D.intercalateSuffix@.
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: (Monad m, Unbox a)
    => Array a -> Stream m (Array a) -> Stream m a
intercalateSuffix = D.intercalateSuffix A.reader

-- | Flatten a stream of arrays appending the given element after each
-- array.
--
-- Implemented as @D.interposeSuffix@ with the array reader unfold.
--
-- @since 0.7.0
{-# INLINE interposeSuffix #-}
interposeSuffix :: (Monad m, Unbox a)
    => a -> Stream m (Array a) -> Stream m a
-- interposeSuffix x = fromStreamD . A.unlines x . toStreamD
interposeSuffix x = D.interposeSuffix x A.reader

-- State of the flattening loop used by 'unlines'.
data FlattenState s =
      -- Pull the next array from the source stream; carries the source
      -- stream state.
      OuterLoop s
      -- Emit the elements of the current array. Carries the source stream
      -- state, the array's backing buffer, the current read position and the
      -- end position of the array.
    | InnerLoop s !MA.MutableByteArray !Int !Int

-- XXX This is a special case of interposeSuffix, can be removed.
-- XXX Remove monadIO constraint
-- | Flatten a stream of arrays into a stream of elements, emitting the
-- separator @sep@ after the elements of each array (including after an empty
-- array and after the last array).
--
-- The loop alternates between two states: 'OuterLoop' pulls the next array
-- from the source stream, and 'InnerLoop' reads the array one element at a
-- time until the read position reaches the array end, at which point the
-- separator is yielded.
{-# INLINE_NORMAL unlines #-}
unlines :: forall m a. (MonadIO m, Unbox a)
    => a -> D.Stream m (Array a) -> D.Stream m a
unlines sep (D.Stream step state) = D.Stream step' (OuterLoop state)
    where
    {-# INLINE_LATE step' #-}
    -- Fetch the next array; no element is produced in this state.
    step' gst (OuterLoop st) = do
        r <- step (adaptState gst) st
        return $ case r of
            D.Yield Array{..} s ->
                D.Skip (InnerLoop s arrContents arrStart arrEnd)
            D.Skip s -> D.Skip (OuterLoop s)
            D.Stop -> D.Stop

    -- Current array exhausted: emit the separator and go back to the source.
    step' _ (InnerLoop st _ p end) | p == end =
        return $ D.Yield sep $ OuterLoop st

    -- Read the element at position p and advance. INDEX_NEXT is a macro from
    -- ArrayMacros.h; presumably it advances p by one element of type a.
    step' _ (InnerLoop st contents p end) = do
        x <- liftIO $ peekWith contents p
        return $ D.Yield x (InnerLoop st contents (INDEX_NEXT(p,a)) end)

-------------------------------------------------------------------------------
-- Compact
-------------------------------------------------------------------------------

-- XXX These would not be needed once we implement compactLEFold, see
-- module Streamly.Internal.Data.Array.Mut.Stream
--
-- XXX Note that this thaws immutable arrays for appending, that may be
-- problematic if multiple users do the same thing, however, immutable arrays
-- would usually have no capacity to append, therefore, a copy will be forced
-- anyway. Confirm this. We can forcefully trim the array capacity before thaw
-- to ensure this.
--
-- Stream version of array compaction: each incoming immutable array is
-- unsafely thawed, the mutable-array stream is passed through
-- @AS.packArraysChunksOf n@, and every resulting array is unsafely frozen
-- again. No copy is made by the thaw/freeze steps themselves.
{-# INLINE_NORMAL packArraysChunksOf #-}
packArraysChunksOf :: (MonadIO m, Unbox a)
    => Int -> D.Stream m (Array a) -> D.Stream m (Array a)
packArraysChunksOf n str =
    D.map A.unsafeFreeze $ AS.packArraysChunksOf n $ D.map A.unsafeThaw str

-- XXX instead of writing two different versions of this operation, we should
-- write it as a pipe.
--
-- XXX Confirm that immutable arrays won't be modified.
--
-- Fold version of array compaction: the input arrays are unsafely thawed and
-- fed to @AS.lpackArraysChunksOf n@, whose output arrays are unsafely frozen
-- before being passed to the downstream fold @fld@.
{-# INLINE_NORMAL lpackArraysChunksOf #-}
lpackArraysChunksOf :: (MonadIO m, Unbox a)
    => Int -> Fold m (Array a) () -> Fold m (Array a) ()
lpackArraysChunksOf n fld =
    FL.lmap A.unsafeThaw $ AS.lpackArraysChunksOf n (FL.lmap A.unsafeFreeze fld)

-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes.
--
-- This is an alias of the internal @packArraysChunksOf@.
--
-- @since 0.7.0
{-# INLINE compact #-}
compact :: (MonadIO m, Unbox a)
    => Int -> Stream m (Array a) -> Stream m (Array a)
compact = packArraysChunksOf

-------------------------------------------------------------------------------
-- Split
-------------------------------------------------------------------------------

-- State machine used by '_splitOn'. @s@ is the state of the source stream and
-- @arr@ is the array type being split.
data SplitState s arr
    -- Nothing buffered yet; pull the next array from the source.
    = Initial s
    -- @arr@ holds the data accumulated so far with no separator seen in it;
    -- more input is spliced onto it.
    | Buffering s arr
    -- @arr@ is the remainder of an input array after a separator; it still
    -- has to be scanned for further separators.
    | Splitting s arr
    -- Emit @arr@ and then continue in the given next state.
    | Yielding arr (SplitState s arr)
    -- The source is exhausted and the last buffer has been emitted; stop.
    | Finishing

-- | Split a stream of arrays on a given separator byte, dropping the separator
-- and coalescing all the arrays between two separators into a single array.
--
-- This is a hand-written state machine over 'SplitState'. Each input array is
-- broken at the first separator with @A.breakOn@; data without a separator is
-- accumulated using @A.splice@ until a separator or the end of the stream is
-- seen. At the end of the stream a non-empty pending buffer is emitted, an
-- empty one is dropped.
--
-- @since 0.7.0
{-# INLINE_NORMAL _splitOn #-}
_splitOn
    :: MonadIO m
    => Word8
    -> D.Stream m (Array Word8)
    -> D.Stream m (Array Word8)
_splitOn byte (D.Stream step state) = D.Stream step' (Initial state)

    where

    {-# INLINE_LATE step' #-}
    -- Nothing buffered: pull an array and break it at the first separator.
    step' gst (Initial st) = do
        r <- step gst st
        case r of
            D.Yield arr s -> do
                (arr1, marr2) <- A.breakOn byte arr
                return $ case marr2 of
                    Nothing   -> D.Skip (Buffering s arr1)
                    Just arr2 -> D.Skip (Yielding arr1 (Splitting s arr2))
            D.Skip s -> return $ D.Skip (Initial s)
            D.Stop -> return D.Stop

    -- A partial segment is buffered: append the next array's prefix to it.
    step' gst (Buffering st buf) = do
        r <- step gst st
        case r of
            D.Yield arr s -> do
                (arr1, marr2) <- A.breakOn byte arr
                buf' <- A.splice buf arr1
                return $ case marr2 of
                    Nothing -> D.Skip (Buffering s buf')
                    Just x -> D.Skip (Yielding buf' (Splitting s x))
            D.Skip s -> return $ D.Skip (Buffering s buf)
            -- End of input: flush the pending buffer unless it is empty.
            D.Stop -> return $
                if A.byteLength buf == 0
                then D.Stop
                else D.Skip (Yielding buf Finishing)

    -- Keep splitting the remainder of the current array without pulling more
    -- input; once no separator is left the remainder becomes the buffer.
    step' _ (Splitting st buf) = do
        (arr1, marr2) <- A.breakOn byte buf
        return $ case marr2 of
                Nothing -> D.Skip $ Buffering st arr1
                Just arr2 -> D.Skip $ Yielding arr1 (Splitting st arr2)

    step' _ (Yielding arr next) = return $ D.Yield arr next
    step' _ Finishing = return D.Stop

-- XXX Remove MonadIO constraint.
-- | Split a stream of arrays on a given separator byte, dropping the separator
-- and coalescing all the arrays between two separators into a single array.
--
-- Implemented with @D.splitInnerBy@, using @A.breakOn byte@ to break an array
-- at the separator and @A.splice@ to join partial segments.
--
-- @since 0.7.0
{-# INLINE splitOn #-}
splitOn
    :: (MonadIO m)
    => Word8
    -> Stream m (Array Word8)
    -> Stream m (Array Word8)
splitOn byte = D.splitInnerBy (A.breakOn byte) A.splice

-- | Like 'splitOn' but built on @D.splitInnerBySuffix@, using @A.breakOn byte@
-- to break an array at the separator byte and @A.splice@ to join partial
-- segments.
--
-- NOTE(review): presumably the separator is treated as a terminator, so a
-- trailing separator does not produce a final empty array; confirm against
-- @D.splitInnerBySuffix@.
{-# INLINE splitOnSuffix #-}
splitOnSuffix
    :: (MonadIO m)
    => Word8
    -> Stream m (Array Word8)
    -> Stream m (Array Word8)
-- splitOn byte s = fromStreamD $ A.splitOn byte $ toStreamD s
splitOnSuffix byte = D.splitInnerBySuffix (A.breakOn byte) A.splice

-------------------------------------------------------------------------------
-- Elimination - Running folds
-------------------------------------------------------------------------------

-- | Run an element fold over a stream of arrays and return the fold result
-- together with the rest of the stream.
--
-- * If the fold is already done on initialization the input stream is
--   returned untouched.
-- * If the fold terminates in the middle of an array, the unread part of that
--   array is put back at the head of the returned stream. This leftover array
--   can be empty when the fold terminates on the last element of an array.
-- * If the stream ends first, the fold is extracted and an empty stream is
--   returned.
{-# INLINE_NORMAL foldBreakD #-}
foldBreakD :: forall m a b. (MonadIO m, Unbox a) =>
    Fold m a b -> D.Stream m (Array a) -> m (b, D.Stream m (Array a))
foldBreakD (FL.Fold fstep initial extract) stream@(D.Stream step state) = do
    res <- initial
    case res of
        FL.Partial fs -> go SPEC state fs
        FL.Done fb -> return $! (fb, stream)

    where

    -- Outer loop: pull the next array from the stream.
    {-# INLINE go #-}
    go !_ st !fs = do
        r <- step defState st
        case r of
            D.Yield (Array contents start end) s ->
                let fp = Tuple' end contents
                 in goArray SPEC s fp start fs
            D.Skip s -> go SPEC s fs
            D.Stop -> do
                b <- extract fs
                return (b, D.nil)

    -- Inner loop: feed the current array to the fold, one element at a time.
    -- cur is the read position; the Tuple' carries the array end and buffer.
    goArray !_ s (Tuple' end _) !cur !fs
        | cur == end = do
            go SPEC s fs
    goArray !_ st fp@(Tuple' end contents) !cur !fs = do
        x <- liftIO $ peekWith contents cur
        res <- fstep fs x
        -- INDEX_NEXT is a macro from ArrayMacros.h; presumably it advances
        -- the position by one element of type a.
        let next = INDEX_NEXT(cur,a)
        case res of
            FL.Done b -> do
                -- Unconsumed tail of the current array.
                let arr = Array contents next end
                return $! (b, D.cons arr (D.Stream step st))
            FL.Partial fs1 -> goArray SPEC st fp next fs1

-- | 'StreamK' counterpart of 'foldBreakD': run an element fold over a stream
-- of arrays and return the fold result together with the rest of the stream.
--
-- * If the fold is already done on initialization the input stream is
--   returned untouched.
-- * If the fold terminates in the middle of an array, the unread part of that
--   array (possibly empty) is consed onto the remaining stream.
-- * If the stream ends first, the fold is extracted and an empty stream is
--   returned.
{-# INLINE_NORMAL foldBreakK #-}
foldBreakK :: forall m a b. (MonadIO m, Unbox a) =>
    Fold m a b -> K.StreamK m (Array a) -> m (b, K.StreamK m (Array a))
foldBreakK (FL.Fold fstep initial extract) stream = do
    res <- initial
    case res of
        FL.Partial fs -> go fs stream
        FL.Done fb -> return (fb, stream)

    where

    -- Outer loop: take the stream apart with K.foldStream and hand the head
    -- array to the inner loop.
    {-# INLINE go #-}
    go !fs st = do
        let stop = (, K.nil) <$> extract fs
            single a = yieldk a K.nil
            yieldk (Array contents start end) r =
                let fp = Tuple' end contents
                 in goArray fs r fp start
         in K.foldStream defState yieldk single stop st

    -- Inner loop: feed the current array to the fold, one element at a time.
    -- st is the stream following the current array; cur is the read position.
    goArray !fs st (Tuple' end _) !cur
        | cur == end = do
            go fs st
    goArray !fs st fp@(Tuple' end contents) !cur = do
        x <- liftIO $ peekWith contents cur
        res <- fstep fs x
        -- INDEX_NEXT is a macro from ArrayMacros.h; presumably it advances
        -- the position by one element of type a.
        let next = INDEX_NEXT(cur,a)
        case res of
            FL.Done b -> do
                -- Unconsumed tail of the current array.
                let arr = Array contents next end
                return $! (b, K.cons arr st)
            FL.Partial fs1 -> goArray fs1 st fp next

-- | Fold an array stream using the supplied 'Fold'. Returns the fold result
-- and the unconsumed stream.
--
-- > foldBreak f = runArrayFoldBreak (ChunkFold.fromFold f)
--
-- This is an alias of the 'StreamK' based implementation, @foldBreakK@.
--
-- /Internal/
--
{-# INLINE_NORMAL foldBreak #-}
foldBreak ::
       (MonadIO m, Unbox a)
    => Fold m a b
    -> StreamK m (A.Array a)
    -> m (b, StreamK m (A.Array a))
-- foldBreak f s = fmap fromStreamD <$> foldBreakD f (toStreamD s)
foldBreak = foldBreakK
-- If foldBreak performs better than runArrayFoldBreak we can use a rewrite
-- rule to rewrite runArrayFoldBreak to fold.
-- foldBreak f = runArrayFoldBreak (ChunkFold.fromFold f)

-------------------------------------------------------------------------------
-- Fold to a single Array
-------------------------------------------------------------------------------

-- When we have to take an array partially, take the last part of the array.
--
-- @takeArrayListRev n arrs@ keeps whole arrays from the front of the list as
-- long as more than their length is still required. If the array at which the
-- count runs out is longer than the remaining count, only its trailing part
-- is kept. A non-positive count or an empty list gives an empty result.
{-# INLINE takeArrayListRev #-}
takeArrayListRev :: forall a. Unbox a => Int -> [Array a] -> [Array a]
takeArrayListRev = go

    where

    go _ [] = []
    go n _ | n <= 0 = []
    go n (x:xs) =
        let len = Array.length x
        in if n > len
           then x : go (n - len) xs
           else if n == len
           then [x]
           -- Partial take: slice the same buffer so that only the last n
           -- elements remain. SIZE_OF is a macro from ArrayMacros.h,
           -- presumably the size of one element of type a in bytes.
           else let !(Array contents _ end) = x
                    !start = end - (n * SIZE_OF(a))
                 in [Array contents start end]

-- When we have to take an array partially, take the last part of the array in
-- the first split.
--
-- @splitAtArrayListRev n arrs@ splits the list of arrays after @n@ elements.
-- Whole arrays go to the first component while more than their length is
-- still required. If the boundary falls inside an array, that array is
-- sliced without copying: its trailing part goes to the first component and
-- its leading part is put at the head of the second component. A non-positive
-- count returns the input list unchanged as the second component.
{-# INLINE splitAtArrayListRev #-}
splitAtArrayListRev ::
    forall a. Unbox a => Int -> [Array a] -> ([Array a],[Array a])
splitAtArrayListRev n ls
  | n <= 0 = ([], ls)
  | otherwise = go n ls
    where
        go :: Int -> [Array a] -> ([Array a], [Array a])
        go _  []     = ([], [])
        go m (x:xs) =
            let len = Array.length x
                -- Lazy binding: only forced when the whole of x is taken and
                -- more elements are still needed.
                (xs', xs'') = go (m - len) xs
             in if m > len
                then (x:xs', xs'')
                else if m == len
                then ([x],xs)
                -- SIZE_OF is a macro from ArrayMacros.h, presumably the size
                -- of one element of type a in bytes.
                else let !(Array contents start end) = x
                         end1 = end - (m * SIZE_OF(a))
                         arr2 = Array contents start end1
                         arr1 = Array contents end1 end
                      in ([arr1], arr2:xs)

-------------------------------------------------------------------------------
-- Fold to a single Array
-------------------------------------------------------------------------------

-- XXX Both of these implementations of splicing seem to perform equally well.
-- We need to perform benchmarks over a range of sizes though.

-- CAUTION! "len" must be greater than or equal to the combined length of all
-- the arrays in the stream. A single destination array of "len" elements is
-- allocated up front and every array from the stream is appended to it using
-- 'MA.spliceUnsafe'.
{-# INLINE spliceArraysLenUnsafe #-}
spliceArraysLenUnsafe :: (MonadIO m, Unbox a)
    => Int -> Stream m (MutArray a) -> m (MutArray a)
spliceArraysLenUnsafe len buffered =
    liftIO (MA.newPinned len) >>= \dst ->
        D.foldlM' MA.spliceUnsafe (pure dst) buffered

-- Buffer the whole stream, add up the lengths of the buffered arrays,
-- allocate a destination of exactly that many elements and then append the
-- buffered arrays to it one by one.
{-# INLINE _spliceArrays #-}
_spliceArrays :: (MonadIO m, Unbox a)
    => Stream m (Array a) -> m (Array a)
_spliceArrays input = do
    chunks <- D.foldr K.cons K.nil input
    total <- K.fold FL.sum (Array.length <$> chunks)
    dst <- liftIO (MA.newPinned total)
    A.unsafeFreeze <$> D.foldlM' appendChunk (pure dst) (toStream chunks)

    where

    -- Append one immutable chunk to the mutable accumulator.
    appendChunk acc chunk = MA.spliceUnsafe acc (A.unsafeThaw chunk)

-- Same buffering strategy as '_spliceArrays', but the copying is delegated to
-- 'spliceArraysLenUnsafe' once the total length is known.
{-# INLINE _spliceArraysBuffered #-}
_spliceArraysBuffered :: (MonadIO m, Unbox a)
    => Stream m (Array a) -> m (Array a)
_spliceArraysBuffered input = do
    chunks <- D.foldr K.cons K.nil input
    total <- K.fold FL.sum (Array.length <$> chunks)
    let thawed = A.unsafeThaw <$> toStream chunks
    spliced <- spliceArraysLenUnsafe total thawed
    pure (A.unsafeFreeze spliced)

-- Splice a stream of arrays into one array without buffering the stream
-- first. The destination starts with room for as many elements as
-- 'allocBytesToElemCount' reports for a 4 KiB allocation, each incoming array
-- is appended with 'MA.spliceExp', and the result is passed through
-- 'MA.rightSize' before being frozen.
{-# INLINE spliceArraysRealloced #-}
spliceArraysRealloced :: forall m a. (MonadIO m, Unbox a)
    => Stream m (Array a) -> m (Array a)
spliceArraysRealloced input = do
    grown <- D.foldlM' MA.spliceExp initialBuf (A.unsafeThaw <$> input)
    liftIO (A.unsafeFreeze <$> MA.rightSize grown)

    where

    -- Element count of the initial destination buffer.
    initialCount = allocBytesToElemCount (undefined :: a) (4 * 1024)

    -- Action that allocates the initial destination buffer.
    initialBuf = liftIO (MA.newPinned initialCount)

-- XXX This should just be "fold A.write"
--
-- | Splice all the arrays of a stream together into a single array. The
-- stream must be /finite/.
--
-- Currently implemented by 'spliceArraysRealloced'.
--
-- @since 0.7.0
{-# INLINE toArray #-}
toArray :: (MonadIO m, Unbox a) => Stream m (Array a) -> m (Array a)
toArray = spliceArraysRealloced
-- spliceArrays = _spliceArraysBuffered

-- Use exponentially increasing chunk sizes, up to the max limit.
-- XXX this will be easier to implement with parsers/terminating folds
-- With this we should be able to reduce the number of chunks/allocations.
-- The reallocation/copy based toArray can also be implemented using this.
--
{-
{-# INLINE toArraysInRange #-}
toArraysInRange :: (MonadIO m, Unbox a)
    => Int -> Int -> Fold m (Array a) b -> Fold m a b
toArraysInRange low high (Fold step initial extract) =
-}

{-
-- | Fold the input to a pure buffered stream (List) of arrays.
{-# INLINE _toArraysOf #-}
_toArraysOf :: (MonadIO m, Unbox a)
    => Int -> Fold m a (Stream Identity (Array a))
_toArraysOf n = FL.groupsOf n (A.writeNF n) FL.toStream
-}

-------------------------------------------------------------------------------
-- Elimination - running element parsers
-------------------------------------------------------------------------------

-- The GHC parser rejects {-# ANN type [] NoSpecConstr #-}, so the built-in
-- list type cannot be annotated directly. This newtype wrapper around a plain
-- list exists so that the annotation can be attached to it instead.
{-# ANN type List NoSpecConstr #-}
newtype List a = List
    { getList :: [a]
    }

{-
-- This can be generalized to any type provided it can be unfolded to a stream
-- and it can be combined using a semigroup operation.
--
-- XXX This should be written using CPS (as parseK) if we want it to scale wrt
-- to the number of times it can be called on the same stream.
{-# INLINE_NORMAL parseBreakD #-}
parseBreakD ::
       forall m a b. (MonadIO m, MonadThrow m, Unbox a)
    => PRD.Parser a m b
    -> D.Stream m (Array.Array a)
    -> m (b, D.Stream m (Array.Array a))
parseBreakD
    (PRD.Parser pstep initial extract) stream@(D.Stream step state) = do

    res <- initial
    case res of
        PRD.IPartial s -> go SPEC state (List []) s
        PRD.IDone b -> return (b, stream)
        PRD.IError err -> throwM $ ParseError err

    where

    -- "backBuf" contains last few items in the stream that we may have to
    -- backtrack to.
    --
    -- XXX currently we are using a dumb list based approach for backtracking
    -- buffer. This can be replaced by a sliding/ring buffer using Data.Array.
    -- That will allow us more efficient random back and forth movement.
    go !_ st backBuf !pst = do
        r <- step defState st
        case r of
            D.Yield (Array contents start end) s ->
                gobuf SPEC s backBuf
                    (Tuple' end contents) start pst
            D.Skip s -> go SPEC s backBuf pst
            D.Stop -> do
                b <- extract pst
                return (b, D.nil)

    -- Use strictness on "cur" to keep it unboxed
    gobuf !_ s backBuf (Tuple' end _) !cur !pst
        | cur == end = do
            go SPEC s backBuf pst
    gobuf !_ s backBuf fp@(Tuple' end contents) !cur !pst = do
        x <- liftIO $ peekWith contents cur
        pRes <- pstep pst x
        let next = INDEX_NEXT(cur,a)
        case pRes of
            PR.Partial 0 pst1 ->
                 gobuf SPEC s (List []) fp next pst1
            PR.Partial n pst1 -> do
                assert (n <= Prelude.length (x:getList backBuf)) (return ())
                let src0 = Prelude.take n (x:getList backBuf)
                    arr0 = A.fromListN n (Prelude.reverse src0)
                    arr1 = Array contents next end
                    src = arr0 <> arr1
                let !(Array cont1 start end1) = src
                    fp1 = Tuple' end1 cont1
                gobuf SPEC s (List []) fp1 start pst1
            PR.Continue 0 pst1 ->
                gobuf SPEC s (List (x:getList backBuf)) fp next pst1
            PR.Continue n pst1 -> do
                assert (n <= Prelude.length (x:getList backBuf)) (return ())
                let (src0, buf1) = splitAt n (x:getList backBuf)
                    arr0 = A.fromListN n (Prelude.reverse src0)
                    arr1 = Array contents next end
                    src = arr0 <> arr1
                let !(Array cont1 start end1) = src
                    fp1 = Tuple' end1 cont1
                gobuf SPEC s (List buf1) fp1 start pst1
            PR.Done 0 b -> do
                let arr = Array contents next end
                return (b, D.cons arr (D.Stream step s))
            PR.Done n b -> do
                assert (n <= Prelude.length (x:getList backBuf)) (return ())
                let src0 = Prelude.take n (x:getList backBuf)
                    -- XXX create the array in reverse instead
                    arr0 = A.fromListN n (Prelude.reverse src0)
                    arr1 = Array contents next end
                    -- XXX Use StreamK to avoid adding arbitrary layers of
                    -- constructors every time.
                    str = D.cons arr0 (D.cons arr1 (D.Stream step s))
                return (b, str)
            PR.Error err -> throwM $ ParseError err
-}

-- Run an element parser over a stream of arrays. Returns the parse result
-- ('Left' 'ParseError' on failure, 'Right' on success) paired with the stream
-- of arrays that remains after the parse.
--
-- Elements are read from the arrays one at a time and fed to the parser step
-- function. Elements that the parser may still backtrack to are kept, most
-- recent first, in a list ("backBuf"). When the parser asks to backtrack by
-- @n@, the @n@ most recent elements (including the element just fed) are
-- turned back into an array and placed in front of the remaining input.
{-# INLINE_NORMAL parseBreakK #-}
parseBreakK ::
       forall m a b. (MonadIO m, Unbox a)
    => PRD.Parser a m b
    -> K.StreamK m (Array.Array a)
    -> m (Either ParseError b, K.StreamK m (Array.Array a))
parseBreakK (PRD.Parser pstep initial extract) stream = do
    res <- initial
    case res of
        PRD.IPartial s -> go s stream []
        -- The parser finished or failed without consuming anything, the input
        -- stream is returned as it is.
        PRD.IDone b -> return (Right b, stream)
        PRD.IError err -> return (Left (ParseError err), stream)

    where

    -- "backBuf" contains last few items in the stream that we may have to
    -- backtrack to.
    --
    -- XXX currently we are using a dumb list based approach for backtracking
    -- buffer. This can be replaced by a sliding/ring buffer using Data.Array.
    -- That will allow us more efficient random back and forth movement.
    --
    -- Pull the next array from the stream "st" and parse it, or finish the
    -- parse if the stream has ended.
    go !pst st backBuf =
        let stop = goStop pst backBuf -- (, K.nil) <$> extract pst
            single a = yieldk a K.nil
            yieldk arr r = goArray pst backBuf r arr
         in K.foldStream defState yieldk single stop st

    -- Feed the elements of the current array to the parser, one by one. "st"
    -- is the stream following the current array.
    goArray !pst backBuf st (Array _ cur end) | cur == end = go pst st backBuf
    goArray !pst backBuf st (Array contents cur end) = do
        x <- liftIO $ peekWith contents cur
        pRes <- pstep pst x
        let next = INDEX_NEXT(cur,a)
        case pRes of
            -- Partial: nothing before this point can be backtracked to
            -- anymore, so the buffer is dropped.
            PR.Partial 0 s ->
                 goArray s [] st (Array contents next end)
            PR.Partial n s -> do
                assert (n <= Prelude.length (x:backBuf)) (return ())
                let src0 = Prelude.take n (x:backBuf)
                    arr0 = A.fromListN n (Prelude.reverse src0)
                    arr1 = Array contents next end
                    src = arr0 <> arr1
                goArray s [] st src
            -- Continue: the element may be needed again, remember it.
            PR.Continue 0 s ->
                goArray s (x:backBuf) st (Array contents next end)
            PR.Continue n s -> do
                assert (n <= Prelude.length (x:backBuf)) (return ())
                let (src0, buf1) = splitAt n (x:backBuf)
                    arr0 = A.fromListN n (Prelude.reverse src0)
                    arr1 = Array contents next end
                    src = arr0 <> arr1
                goArray s buf1 st src
            PR.Done 0 b -> do
                let arr = Array contents next end
                return (Right b, K.cons arr st)
            PR.Done n b -> do
                assert (n <= Prelude.length (x:backBuf)) (return ())
                let src0 = Prelude.take n (x:backBuf)
                    -- XXX Use fromListRevN once implemented
                    -- arr0 = A.fromListRevN n src0
                    arr0 = A.fromListN n (Prelude.reverse src0)
                    arr1 = Array contents next end
                    str = K.cons arr0 (K.cons arr1 st)
                return (Right b, str)
            PR.Error err -> do
                -- The leftover is the rest of the current array, starting at
                -- the failing element, followed by the rest of the stream
                -- ("st"). Using the original input stream here would replay
                -- input that has already been consumed.
                let str = K.cons (Array contents cur end) st
                return (Left (ParseError err), str)

    -- This is a simplified goArray. It is used after the stream has ended to
    -- feed the parser the elements it asked to backtrack to during extract;
    -- there is no following stream.
    goExtract !pst backBuf (Array _ cur end)
        | cur == end = goStop pst backBuf
    goExtract !pst backBuf (Array contents cur end) = do
        x <- liftIO $ peekWith contents cur
        pRes <- pstep pst x
        let next = INDEX_NEXT(cur,a)
        case pRes of
            PR.Partial 0 s ->
                 goExtract s [] (Array contents next end)
            PR.Partial n s -> do
                assert (n <= Prelude.length (x:backBuf)) (return ())
                let src0 = Prelude.take n (x:backBuf)
                    arr0 = A.fromListN n (Prelude.reverse src0)
                    arr1 = Array contents next end
                    src = arr0 <> arr1
                goExtract s [] src
            -- The consumed element must be remembered, as in goArray,
            -- otherwise a later backtrack would replay the wrong elements.
            PR.Continue 0 s ->
                goExtract s (x:backBuf) (Array contents next end)
            PR.Continue n s -> do
                assert (n <= Prelude.length (x:backBuf)) (return ())
                let (src0, buf1) = splitAt n (x:backBuf)
                    arr0 = A.fromListN n (Prelude.reverse src0)
                    arr1 = Array contents next end
                    src = arr0 <> arr1
                goExtract s buf1 src
            PR.Done 0 b -> do
                let arr = Array contents next end
                return (Right b, K.fromPure arr)
            PR.Done n b -> do
                -- The n unconsumed elements include the element just fed
                -- ("x"), as in goArray.
                assert (n <= Prelude.length (x:backBuf)) (return ())
                let src0 = Prelude.take n (x:backBuf)
                    -- XXX Use fromListRevN once implemented
                    -- arr0 = A.fromListRevN n src0
                    arr0 = A.fromListN n (Prelude.reverse src0)
                    arr1 = Array contents next end
                    str = K.cons arr0 (K.fromPure arr1)
                return (Right b, str)
            PR.Error err -> do
                let str = K.fromPure (Array contents cur end)
                return (Left (ParseError err), str)

    -- This is a simplified goExtract. The input has ended: run the parser's
    -- extract function. No element is being fed here, so backtracking is
    -- served from "backBuf" alone.
    {-# INLINE goStop #-}
    goStop !pst backBuf = do
        pRes <- extract pst
        case pRes of
            PR.Partial _ _ -> error "Bug: parseBreak: Partial in extract"
            PR.Continue 0 s ->
                goStop s backBuf
            PR.Continue n s -> do
                assert (n <= Prelude.length backBuf) (return ())
                let (src0, buf1) = splitAt n backBuf
                    arr = A.fromListN n (Prelude.reverse src0)
                goExtract s buf1 arr
            PR.Done 0 b ->
                return (Right b, K.nil)
            PR.Done n b -> do
                assert (n <= Prelude.length backBuf) (return ())
                let src0 = Prelude.take n backBuf
                    -- XXX Use fromListRevN once implemented
                    -- arr0 = A.fromListRevN n src0
                    arr0 = A.fromListN n (Prelude.reverse src0)
                return (Right b, K.fromPure arr0)
            PR.Error err ->
                return (Left (ParseError err), K.nil)

-- | Parse an array stream using the supplied 'Parser'. Returns the parse
-- result and the unconsumed stream. The function does not throw on a parse
-- failure: the failure is returned as a 'Left' 'ParseError' in the first
-- component of the result, a successful parse is returned as 'Right'.
--
-- /Internal/
--
{-# INLINE_NORMAL parseBreak #-}
parseBreak ::
       (MonadIO m, Unbox a)
    => PR.Parser a m b
    -> StreamK m (A.Array a)
    -> m (Either ParseError b, StreamK m (A.Array a))
{-
parseBreak p s =
    fmap fromStreamD <$> parseBreakD (PRD.fromParserK p) (toStreamD s)
-}
parseBreak = parseBreakK

-------------------------------------------------------------------------------
-- Elimination - Running Array Folds and parsers
-------------------------------------------------------------------------------

-- | Note that this is not the same as using a @Parser (Array a) m b@ with the
-- regular "Streamly.Internal.Data.IsStream.parse" function. The regular parse
-- would consume the input arrays as single unit. This parser parses in the way
-- as described in the ChunkFold module. The input arrays are treated as @n@
-- element units and can be consumed partially. The remaining elements are
-- inserted in the source stream as an array.
--
{-# INLINE_NORMAL runArrayParserDBreak #-}
runArrayParserDBreak ::
       forall m a b. (MonadIO m, Unbox a)
    => PRD.Parser (Array a) m b
    -> D.Stream m (Array.Array a)
    -> m (Either ParseError b, D.Stream m (Array.Array a))
runArrayParserDBreak :: forall (m :: * -> *) a b.
(MonadIO m, Unbox a) =>
Parser (Array a) m b
-> Stream m (Array a)
-> m (Either ParseError b, Stream m (Array a))
runArrayParserDBreak
    (PRD.Parser s -> Array a -> m (Step s b)
pstep m (Initial s b)
initial s -> m (Step s b)
extract)
    stream :: Stream m (Array a)
stream@(D.Stream State StreamK m (Array a) -> s -> m (Step s (Array a))
step s
state) = do

    Initial s b
res <- m (Initial s b)
initial
    case Initial s b
res of
        PRD.IPartial s
s -> SPEC
-> s
-> List (Array a)
-> s
-> m (Either ParseError b, Stream m (Array a))
go SPEC
SPEC s
state (forall a. [a] -> List a
List []) s
s
        PRD.IDone b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, Stream m (Array a)
stream)
        PRD.IError String
err -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left (String -> ParseError
ParseError String
err), Stream m (Array a)
stream)

    where

    -- "backBuf" contains last few items in the stream that we may have to
    -- backtrack to.
    --
    -- XXX currently we are using a dumb list based approach for backtracking
    -- buffer. This can be replaced by a sliding/ring buffer using Data.Array.
    -- That will allow us more efficient random back and forth movement.
    go :: SPEC
-> s
-> List (Array a)
-> s
-> m (Either ParseError b, Stream m (Array a))
go SPEC
_ s
st List (Array a)
backBuf !s
pst = do
        Step s (Array a)
r <- State StreamK m (Array a) -> s -> m (Step s (Array a))
step forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
        case Step s (Array a)
r of
            D.Yield Array a
x s
s -> SPEC
-> [Array a]
-> s
-> List (Array a)
-> s
-> m (Either ParseError b, Stream m (Array a))
gobuf SPEC
SPEC [Array a
x] s
s List (Array a)
backBuf s
pst
            D.Skip s
s -> SPEC
-> s
-> List (Array a)
-> s
-> m (Either ParseError b, Stream m (Array a))
go SPEC
SPEC s
s List (Array a)
backBuf s
pst
            Step s (Array a)
D.Stop -> forall {m :: * -> *}.
Applicative m =>
List (Array a) -> s -> m (Either ParseError b, Stream m (Array a))
goStop List (Array a)
backBuf s
pst

    -- Buffered processing loop: feed the parser from a list of pending arrays
    -- before pulling anything more from the stream.
    --
    -- Arguments, in order: the SPEC token, the pending arrays, the stream
    -- state, the backtracking buffer (arrays already fed to the parser, most
    -- recent first) and the parser state.
    --
    -- Once the pending list is exhausted control returns to 'go', which reads
    -- from the stream.
    gobuf !_ [] s backBuf !pst = go SPEC s backBuf pst
    gobuf !_ (x:xs) s backBuf !pst = do
        pRes <- pstep pst x
        case pRes of
            -- Partial drops the backtracking buffer: the next call always
            -- starts with an empty one.
            PR.Partial 0 pst1 ->
                 gobuf SPEC xs s (List []) pst1
            PR.Partial n pst1 -> do
                -- n must not exceed what has been buffered, including x.
                assert
                    (n <= sum (map Array.length (x:getList backBuf)))
                    (return ())
                -- presumably takeArrayListRev picks the most recent n
                -- elements from the newest-first list; they are put back in
                -- stream order in front of the pending input.
                let src0 = takeArrayListRev n (x:getList backBuf)
                    src  = Prelude.reverse src0 ++ xs
                gobuf SPEC src s (List []) pst1
            -- Continue keeps the buffer: x is pushed on top of it.
            PR.Continue 0 pst1 ->
                gobuf SPEC xs s (List (x:getList backBuf)) pst1
            PR.Continue n pst1 -> do
                assert
                    (n <= sum (map Array.length (x:getList backBuf)))
                    (return ())
                -- The first component is re-fed, the second one stays as the
                -- new backtracking buffer.
                let (src0, buf1) = splitAtArrayListRev n (x:getList backBuf)
                    src  = Prelude.reverse src0 ++ xs
                gobuf SPEC src s (List buf1) pst1
            -- Done: return the result together with the unconsumed input,
            -- i.e. the pending arrays followed by the rest of the stream.
            PR.Done 0 b -> do
                let str = D.append (D.fromList xs) (D.Stream step s)
                return (Right b, str)
            PR.Done n b -> do
                assert
                    (n <= sum (map Array.length (x:getList backBuf)))
                    (return ())
                let src0 = takeArrayListRev n (x:getList backBuf)
                    src = Prelude.reverse src0 ++ xs
                return (Right b, D.append (D.fromList src) (D.Stream step s))
            -- Error: the array being processed and everything after it is
            -- returned as the remaining stream; backBuf is not included.
            PR.Error err -> do
                let strm = D.append (D.fromList (x:xs)) (D.Stream step s)
                return (Left (ParseError err), strm)

    -- This is a simplified gobuf: it carries no stream state, so it only
    -- drains the pending arrays and never reads from the stream again.
    --
    -- Arguments, in order: the SPEC token, the pending arrays, the
    -- backtracking buffer (most recent array first) and the parser state.
    --
    -- When the pending list is exhausted control goes to 'goStop'.
    goExtract _ [] backBuf !pst = goStop backBuf pst
    goExtract _ (x:xs) backBuf !pst = do
        pRes <- pstep pst x
        case pRes of
            -- Partial drops the backtracking buffer.
            PR.Partial 0 pst1 ->
                 goExtract SPEC xs (List []) pst1
            PR.Partial n pst1 -> do
                -- n must not exceed what has been buffered, including x.
                assert
                    (n <= sum (map Array.length (x:getList backBuf)))
                    (return ())
                -- Put the elements selected by takeArrayListRev back in
                -- front of the pending input, in stream order.
                let src0 = takeArrayListRev n (x:getList backBuf)
                    src  = Prelude.reverse src0 ++ xs
                goExtract SPEC src (List []) pst1
            -- Continue keeps the buffer: x is pushed on top of it.
            PR.Continue 0 pst1 ->
                goExtract SPEC xs (List (x:getList backBuf)) pst1
            PR.Continue n pst1 -> do
                assert
                    (n <= sum (map Array.length (x:getList backBuf)))
                    (return ())
                let (src0, buf1) = splitAtArrayListRev n (x:getList backBuf)
                    src  = Prelude.reverse src0 ++ xs
                goExtract SPEC src (List buf1) pst1
            -- Done: the remaining stream is just the unconsumed pending
            -- input.
            PR.Done 0 b ->
                return (Right b, D.fromList xs)
            PR.Done n b -> do
                assert
                    (n <= sum (map Array.length (x:getList backBuf)))
                    (return ())
                let src0 = takeArrayListRev n (x:getList backBuf)
                    src = Prelude.reverse src0 ++ xs
                return (Right b, D.fromList src)
            -- Error: the array being processed and the arrays after it are
            -- returned; backBuf is not included.
            PR.Error err ->
                return (Left (ParseError err), D.fromList (x:xs))

    -- This is a simplified goExtract: there is no pending input left, so the
    -- parser's extract function is run on the current parser state.
    --
    -- Arguments: the backtracking buffer (most recent array first) and the
    -- parser state.
    {-# INLINE goStop #-}
    goStop backBuf pst = do
        pRes <- extract pst
        case pRes of
            -- Partial is treated as a bug when it comes from extract.
            PR.Partial _ _ -> error "Bug: runArrayParserDBreak: Partial in extract"
            -- Nothing to rewind: run extract again on the new state.
            PR.Continue 0 pst1 ->
                goStop backBuf pst1
            PR.Continue n pst1 -> do
                -- n must not exceed what has been buffered.
                assert
                    (n <= sum (map Array.length (getList backBuf)))
                    (return ())
                -- Re-feed the first component through goExtract and keep the
                -- second one as the new backtracking buffer.
                let (src0, buf1) = splitAtArrayListRev n (getList backBuf)
                    src = Prelude.reverse src0
                goExtract SPEC src (List buf1) pst1
            -- Done without leftover: the remaining stream is empty.
            PR.Done 0 b -> return (Right b, D.nil)
            -- Done with leftover: the elements selected from the buffer form
            -- the remaining stream.
            PR.Done n b -> do
                assert
                    (n <= sum (map Array.length (getList backBuf)))
                    (return ())
                let src0 = takeArrayListRev n (getList backBuf)
                    src = Prelude.reverse src0
                return (Right b, D.fromList src)
            -- Error: an empty remaining stream is returned.
            PR.Error err ->
                return (Left (ParseError err), D.nil)

{-
-- | Parse an array stream using the supplied 'Parser'.  Returns the parse
-- result and the unconsumed stream. Throws 'ParseError' if the parse fails.
--
-- /Internal/
--
{-# INLINE parseArr #-}
parseArr ::
       (MonadIO m, MonadThrow m, Unbox a)
    => ASF.Parser a m b
    -> Stream m (A.Array a)
    -> m (b, Stream m (A.Array a))
parseArr p s = fmap fromStreamD <$> parseBreakD p (toStreamD s)
-}

-- | Fold a stream of arrays using the supplied 'ChunkFold'.
--
-- The stream is converted with 'toStream' and run through
-- 'runArrayParserDBreak'; only the first component of its result is kept, so
-- any unconsumed input is discarded. The result is @Left@ with a 'ParseError'
-- when the underlying parser reports an error and @Right@ with the fold
-- result otherwise.
--
-- Use 'runArrayFoldBreak' to also get the remaining stream.
--
-- /Pre-release/
--
{-# INLINE runArrayFold #-}
runArrayFold :: (MonadIO m, Unbox a) =>
    ChunkFold m a b -> StreamK m (A.Array a) -> m (Either ParseError b)
runArrayFold (ChunkFold p) s = fst <$> runArrayParserDBreak p (toStream s)

-- | Like 'runArrayFold' but also returns the remaining stream.
--
-- The input is converted with 'toStream' and run through
-- 'runArrayParserDBreak'; the remaining stream it returns is converted back
-- with 'fromStream'. The first component of the result is @Left@ with a
-- 'ParseError' when the underlying parser reports an error and @Right@ with
-- the fold result otherwise.
--
-- /Pre-release/
--
{-# INLINE runArrayFoldBreak #-}
runArrayFoldBreak :: (MonadIO m, Unbox a) =>
    ChunkFold m a b -> StreamK m (A.Array a) -> m (Either ParseError b, StreamK m (A.Array a))
runArrayFoldBreak (ChunkFold p) s =
    second fromStream <$> runArrayParserDBreak p (toStream s)

-- State of the stream transformer built by runArrayFoldManyD.
--
-- Type parameters: @x@ is the value carried by ParseChunksYield, @inpBuf@ is
-- the buffer type, @st@ is the state of the input stream and @pst@ is the
-- parser state. runArrayFoldManyD uses it with @x = Either ParseError b@ and
-- @inpBuf = [Array a]@.
{-# ANN type ParseChunksState Fuse #-}
data ParseChunksState x inpBuf st pst =
      -- Start a new parse. Fields: pending input, stream state. With an empty
      -- buffer stepOuter first pulls an element from the stream; otherwise it
      -- runs the parser's initial action and moves to ParseChunksBuf.
      ParseChunksInit inpBuf st
      -- Like ParseChunksInit but without a stream state; on IPartial
      -- stepOuter moves to ParseChunksExtract.
    | ParseChunksInitBuf inpBuf
      -- Terminal state: stepOuter ignores the buffer and returns Stop.
    | ParseChunksInitLeftOver inpBuf
      -- Pull input from the stream and feed it to the parser. Fields: stream
      -- state, backtracking buffer, parser state.
    | ParseChunksStream st inpBuf !pst
      -- Entered from ParseChunksStream when the input stream stops. Fields:
      -- backtracking buffer, parser state.
      -- NOTE(review): the handler for this state is outside the visible
      -- code; presumably it runs the parser's extract function -- confirm.
    | ParseChunksStop inpBuf !pst
      -- Feed the parser from buffered input. Fields: pending input, stream
      -- state, backtracking buffer, parser state. When the pending input is
      -- empty stepOuter moves to ParseChunksStream.
    | ParseChunksBuf inpBuf st inpBuf !pst
      -- Counterpart of ParseChunksBuf without a stream state.
      -- ParseChunksInitBuf constructs it as (pending input, empty buffer,
      -- parser state).
      -- NOTE(review): the handler is outside the visible code -- confirm the
      -- role of the second buffer.
    | ParseChunksExtract inpBuf inpBuf !pst
      -- Carries a result together with the state to continue from.
      -- NOTE(review): presumably stepOuter yields the value and switches to
      -- the stored state; the handler is outside the visible code -- confirm.
    | ParseChunksYield x (ParseChunksState x inpBuf st pst)

{-# INLINE_NORMAL runArrayFoldManyD #-}
runArrayFoldManyD
    :: (Monad m, Unbox a)
    => ChunkFold m a b
    -> D.Stream m (Array a)
    -> D.Stream m (Either ParseError b)
runArrayFoldManyD :: forall (m :: * -> *) a b.
(Monad m, Unbox a) =>
ChunkFold m a b
-> Stream m (Array a) -> Stream m (Either ParseError b)
runArrayFoldManyD
    (ChunkFold (PRD.Parser s -> Array a -> m (Step s b)
pstep m (Initial s b)
initial s -> m (Step s b)
extract)) (D.Stream State StreamK m (Array a) -> s -> m (Step s (Array a))
step s
state) =

    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream forall {m :: * -> *} {a}.
State StreamK m a
-> ParseChunksState (Either ParseError b) [Array a] s s
-> m (Step
        (ParseChunksState (Either ParseError b) [Array a] s s)
        (Either ParseError b))
stepOuter (forall x inpBuf st pst.
inpBuf -> st -> ParseChunksState x inpBuf st pst
ParseChunksInit [] s
state)

    where

    {-# INLINE_LATE stepOuter #-}
    -- Buffer is empty, get the first element from the stream, initialize the
    -- fold and then go to stream processing loop.
    stepOuter :: State StreamK m a
-> ParseChunksState (Either ParseError b) [Array a] s s
-> m (Step
        (ParseChunksState (Either ParseError b) [Array a] s s)
        (Either ParseError b))
stepOuter State StreamK m a
gst (ParseChunksInit [] s
st) = do
        Step s (Array a)
r <- State StreamK m (Array a) -> s -> m (Step s (Array a))
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s (Array a)
r of
            D.Yield Array a
x s
s -> do
                Initial s b
res <- m (Initial s b)
initial
                case Initial s b
res of
                    PRD.IPartial s
ps ->
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksBuf [Array a
x] s
s [] s
ps
                    PRD.IDone b
pb -> do
                        let next :: ParseChunksState x [Array a] s pst
next = forall x inpBuf st pst.
inpBuf -> st -> ParseChunksState x inpBuf st pst
ParseChunksInit [Array a
x] s
s
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. b -> Either a b
Right b
pb) forall {x} {pst}. ParseChunksState x [Array a] s pst
next
                    PRD.IError String
err -> do
                        let next :: ParseChunksState x [a] st pst
next = forall x inpBuf st pst. inpBuf -> ParseChunksState x inpBuf st pst
ParseChunksInitLeftOver []
                        forall (m :: * -> *) a. Monad m => a -> m a
return
                            forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip
                            forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. a -> Either a b
Left (String -> ParseError
ParseError String
err)) forall {x} {a} {st} {pst}. ParseChunksState x [a] st pst
next
            D.Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> st -> ParseChunksState x inpBuf st pst
ParseChunksInit [] s
s
            Step s (Array a)
D.Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
D.Stop

    -- Buffer is not empty, go to buffered processing loop
    stepOuter State StreamK m a
_ (ParseChunksInit [Array a]
src s
st) = do
        Initial s b
res <- m (Initial s b)
initial
        case Initial s b
res of
            PRD.IPartial s
ps ->
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksBuf [Array a]
src s
st [] s
ps
            PRD.IDone b
pb ->
                let next :: ParseChunksState x [Array a] s pst
next = forall x inpBuf st pst.
inpBuf -> st -> ParseChunksState x inpBuf st pst
ParseChunksInit [Array a]
src s
st
                 in forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. b -> Either a b
Right b
pb) forall {x} {pst}. ParseChunksState x [Array a] s pst
next
            PRD.IError String
err -> do
                let next :: ParseChunksState x [a] st pst
next = forall x inpBuf st pst. inpBuf -> ParseChunksState x inpBuf st pst
ParseChunksInitLeftOver []
                forall (m :: * -> *) a. Monad m => a -> m a
return
                    forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip
                    forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. a -> Either a b
Left (String -> ParseError
ParseError String
err)) forall {x} {a} {st} {pst}. ParseChunksState x [a] st pst
next

    -- This is a simplified ParseChunksInit
    stepOuter State StreamK m a
_ (ParseChunksInitBuf [Array a]
src) = do
        Initial s b
res <- m (Initial s b)
initial
        case Initial s b
res of
            PRD.IPartial s
ps ->
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksExtract [Array a]
src [] s
ps
            PRD.IDone b
pb ->
                let next :: ParseChunksState x [Array a] st pst
next = forall x inpBuf st pst. inpBuf -> ParseChunksState x inpBuf st pst
ParseChunksInitBuf [Array a]
src
                 in forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. b -> Either a b
Right b
pb) forall {x} {st} {pst}. ParseChunksState x [Array a] st pst
next
            PRD.IError String
err -> do
                let next :: ParseChunksState x [a] st pst
next = forall x inpBuf st pst. inpBuf -> ParseChunksState x inpBuf st pst
ParseChunksInitLeftOver []
                forall (m :: * -> *) a. Monad m => a -> m a
return
                    forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip
                    forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. a -> Either a b
Left (String -> ParseError
ParseError String
err)) forall {x} {a} {st} {pst}. ParseChunksState x [a] st pst
next

    -- XXX we just discard any leftover input at the end
    stepOuter State StreamK m a
_ (ParseChunksInitLeftOver [Array a]
_) = forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
D.Stop

    -- Buffer is empty, process elements from the stream
    stepOuter State StreamK m a
gst (ParseChunksStream s
st [Array a]
backBuf s
pst) = do
        Step s (Array a)
r <- State StreamK m (Array a) -> s -> m (Step s (Array a))
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s (Array a)
r of
            D.Yield Array a
x s
s -> do
                Step s b
pRes <- s -> Array a -> m (Step s b)
pstep s
pst Array a
x
                case Step s b
pRes of
                    PR.Partial Int
0 s
pst1 ->
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksStream s
s [] s
pst1
                    PR.Partial Int
n s
pst1 -> do
                        forall a. HasCallStack => Bool -> a -> a
assert
                            (Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
map forall a. Unbox a => Array a -> Int
Array.length (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf)))
                            (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                        let src0 :: [Array a]
src0 = forall a. Unbox a => Int -> [Array a] -> [Array a]
takeArrayListRev Int
n (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf)
                            src :: [Array a]
src  = forall a. [a] -> [a]
Prelude.reverse [Array a]
src0
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksBuf [Array a]
src s
s [] s
pst1
                    PR.Continue Int
0 s
pst1 ->
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksStream s
s (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf) s
pst1
                    PR.Continue Int
n s
pst1 -> do
                        forall a. HasCallStack => Bool -> a -> a
assert
                            (Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
map forall a. Unbox a => Array a -> Int
Array.length (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf)))
                            (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                        let ([Array a]
src0, [Array a]
buf1) = forall a. Unbox a => Int -> [Array a] -> ([Array a], [Array a])
splitAtArrayListRev Int
n (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf)
                            src :: [Array a]
src  = forall a. [a] -> [a]
Prelude.reverse [Array a]
src0
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksBuf [Array a]
src s
s [Array a]
buf1 s
pst1
                    PR.Done Int
0 b
b -> do
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$
                            forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. b -> Either a b
Right b
b) (forall x inpBuf st pst.
inpBuf -> st -> ParseChunksState x inpBuf st pst
ParseChunksInit [] s
s)
                    PR.Done Int
n b
b -> do
                        forall a. HasCallStack => Bool -> a -> a
assert
                            (Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
map forall a. Unbox a => Array a -> Int
Array.length (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf)))
                            (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                        let src0 :: [Array a]
src0 = forall a. Unbox a => Int -> [Array a] -> [Array a]
takeArrayListRev Int
n (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf)
                            src :: [Array a]
src = forall a. [a] -> [a]
Prelude.reverse [Array a]
src0
                            next :: ParseChunksState x [Array a] s pst
next = forall x inpBuf st pst.
inpBuf -> st -> ParseChunksState x inpBuf st pst
ParseChunksInit [Array a]
src s
s
                        forall (m :: * -> *) a. Monad m => a -> m a
return
                            forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip
                            forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. b -> Either a b
Right b
b) forall {x} {pst}. ParseChunksState x [Array a] s pst
next
                    PR.Error String
err -> do
                        let next :: ParseChunksState x [a] st pst
next = forall x inpBuf st pst. inpBuf -> ParseChunksState x inpBuf st pst
ParseChunksInitLeftOver []
                        forall (m :: * -> *) a. Monad m => a -> m a
return
                            forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip
                            forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. a -> Either a b
Left (String -> ParseError
ParseError String
err)) forall {x} {a} {st} {pst}. ParseChunksState x [a] st pst
next

            D.Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksStream s
s [Array a]
backBuf s
pst
            Step s (Array a)
D.Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksStop [Array a]
backBuf s
pst

    -- go back to stream processing mode
    stepOuter State StreamK m a
_ (ParseChunksBuf [] s
s [Array a]
buf s
pst) =
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksStream s
s [Array a]
buf s
pst

    -- buffered processing loop
    stepOuter State StreamK m a
_ (ParseChunksBuf (Array a
x:[Array a]
xs) s
s [Array a]
backBuf s
pst) = do
        Step s b
pRes <- s -> Array a -> m (Step s b)
pstep s
pst Array a
x
        case Step s b
pRes of
            PR.Partial Int
0 s
pst1 ->
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksBuf [Array a]
xs s
s [] s
pst1
            PR.Partial Int
n s
pst1 -> do
                forall a. HasCallStack => Bool -> a -> a
assert (Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
map forall a. Unbox a => Array a -> Int
Array.length (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf))) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                let src0 :: [Array a]
src0 = forall a. Unbox a => Int -> [Array a] -> [Array a]
takeArrayListRev Int
n (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf)
                    src :: [Array a]
src  = forall a. [a] -> [a]
Prelude.reverse [Array a]
src0 forall a. [a] -> [a] -> [a]
++ [Array a]
xs
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksBuf [Array a]
src s
s [] s
pst1
            PR.Continue Int
0 s
pst1 ->
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksBuf [Array a]
xs s
s (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf) s
pst1
            PR.Continue Int
n s
pst1 -> do
                forall a. HasCallStack => Bool -> a -> a
assert (Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
map forall a. Unbox a => Array a -> Int
Array.length (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf))) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                let ([Array a]
src0, [Array a]
buf1) = forall a. Unbox a => Int -> [Array a] -> ([Array a], [Array a])
splitAtArrayListRev Int
n (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf)
                    src :: [Array a]
src  = forall a. [a] -> [a]
Prelude.reverse [Array a]
src0 forall a. [a] -> [a] -> [a]
++ [Array a]
xs
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
inpBuf -> st -> inpBuf -> pst -> ParseChunksState x inpBuf st pst
ParseChunksBuf [Array a]
src s
s [Array a]
buf1 s
pst1
            PR.Done Int
0 b
b ->
                forall (m :: * -> *) a. Monad m => a -> m a
return
                    forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip
                    forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. b -> Either a b
Right b
b) (forall x inpBuf st pst.
inpBuf -> st -> ParseChunksState x inpBuf st pst
ParseChunksInit [Array a]
xs s
s)
            PR.Done Int
n b
b -> do
                forall a. HasCallStack => Bool -> a -> a
assert (Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
map forall a. Unbox a => Array a -> Int
Array.length (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf))) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                let src0 :: [Array a]
src0 = forall a. Unbox a => Int -> [Array a] -> [Array a]
takeArrayListRev Int
n (Array a
xforall a. a -> [a] -> [a]
:[Array a]
backBuf)
                    src :: [Array a]
src = forall a. [a] -> [a]
Prelude.reverse [Array a]
src0 forall a. [a] -> [a] -> [a]
++ [Array a]
xs
                forall (m :: * -> *) a. Monad m => a -> m a
return
                    forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip
                    forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. b -> Either a b
Right b
b) (forall x inpBuf st pst.
inpBuf -> st -> ParseChunksState x inpBuf st pst
ParseChunksInit [Array a]
src s
s)
            PR.Error String
err -> do
                let next :: ParseChunksState x [a] st pst
next = forall x inpBuf st pst. inpBuf -> ParseChunksState x inpBuf st pst
ParseChunksInitLeftOver []
                forall (m :: * -> *) a. Monad m => a -> m a
return
                    forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip
                    forall a b. (a -> b) -> a -> b
$ forall x inpBuf st pst.
x
-> ParseChunksState x inpBuf st pst
-> ParseChunksState x inpBuf st pst
ParseChunksYield (forall a b. a -> Either a b
Left (String -> ParseError
ParseError String
err)) forall {x} {a} {st} {pst}. ParseChunksState x [a] st pst
next

    -- This is a simplified ParseChunksBuf
    --
    -- The list of pending input arrays is exhausted: hand the back buffer and
    -- the current parser state over to the ParseChunksStop state.
    stepOuter _ (ParseChunksExtract [] buf pst) =
        return $ D.Skip $ ParseChunksStop buf pst

    -- Feed the next pending array @x@ to the parser step and decide the next
    -- driver state from the result. @xs@ are the remaining pending arrays and
    -- @backBuf@ holds the arrays retained so far (most recent first, since
    -- new arrays are consed on the front).
    --
    -- NOTE(review): @n@ in the results below looks like a backtrack count
    -- measured in elements - the asserts bound it by the total length of
    -- @x:backBuf@. Confirm against the parser 'Step' documentation.
    stepOuter _ (ParseChunksExtract (x:xs) backBuf pst) = do
        pRes <- pstep pst x
        case pRes of
            -- Partial: the back buffer is reset to empty in both cases.
            PR.Partial 0 pst1 ->
                return $ D.Skip $ ParseChunksExtract xs [] pst1
            PR.Partial n pst1 -> do
                assert (n <= sum (map Array.length (x:backBuf))) (return ())
                -- Arrays selected from the buffer are put back in front of
                -- the pending input, in forward order.
                let src0 = takeArrayListRev n (x:backBuf)
                    src  = Prelude.reverse src0 ++ xs
                return $ D.Skip $ ParseChunksExtract src [] pst1
            -- Continue: the consumed array is retained in the back buffer.
            PR.Continue 0 pst1 ->
                return $ D.Skip $ ParseChunksExtract xs (x:backBuf) pst1
            PR.Continue n pst1 -> do
                assert (n <= sum (map Array.length (x:backBuf))) (return ())
                -- Split the buffer: src0 is replayed, buf1 stays buffered.
                let (src0, buf1) = splitAtArrayListRev n (x:backBuf)
                    src  = Prelude.reverse src0 ++ xs
                return $ D.Skip $ ParseChunksExtract src buf1 pst1
            -- Done: yield the result and restart from the leftover input.
            PR.Done 0 b ->
                return
                    $ D.Skip
                    $ ParseChunksYield (Right b) (ParseChunksInitBuf xs)
            PR.Done n b -> do
                assert (n <= sum (map Array.length (x:backBuf))) (return ())
                let src0 = takeArrayListRev n (x:backBuf)
                    src = Prelude.reverse src0 ++ xs
                return
                    $ D.Skip
                    $ ParseChunksYield (Right b) (ParseChunksInitBuf src)
            -- Error: yield the failure; the next state carries no leftover
            -- input.
            PR.Error err -> do
                let next = ParseChunksInitLeftOver []
                return
                    $ D.Skip
                    $ ParseChunksYield (Left (ParseError err)) next


    -- This is a simplified ParseChunksExtract
    --
    -- Run the parser's @extract@ action on the current parser state and
    -- decide the next driver state from its result. Only @backBuf@ is
    -- available for replay in this state; there is no pending input list.
    --
    -- NOTE(review): @n@ looks like a backtrack count measured in elements -
    -- the asserts bound it by the total length of @backBuf@. Confirm against
    -- the parser 'Step' documentation.
    stepOuter _ (ParseChunksStop backBuf pst) = do
        pRes <- extract pst
        case pRes of
            -- A Partial result from extract is treated as a fatal error.
            PR.Partial _ _ -> error "runArrayFoldManyD: Partial in extract"
            PR.Continue 0 pst1 ->
                return $ D.Skip $ ParseChunksStop backBuf pst1
            PR.Continue n pst1 -> do
                assert (n <= sum (map Array.length backBuf)) (return ())
                -- Split the buffer: src0 is replayed (in forward order)
                -- through ParseChunksExtract, buf1 stays buffered.
                let (src0, buf1) = splitAtArrayListRev n backBuf
                    src  = Prelude.reverse src0
                return $ D.Skip $ ParseChunksExtract src buf1 pst1
            -- Done with nothing to replay: yield and continue with an empty
            -- leftover.
            PR.Done 0 b ->
                return
                    $ D.Skip
                    $ ParseChunksYield (Right b) (ParseChunksInitLeftOver [])
            -- Done with data to replay: yield and restart from that data.
            PR.Done n b -> do
                assert (n <= sum (map Array.length backBuf)) (return ())
                let src0 = takeArrayListRev n backBuf
                    src = Prelude.reverse src0
                return
                    $ D.Skip
                    $ ParseChunksYield (Right b) (ParseChunksInitBuf src)
            -- Error: yield the failure; the next state carries no leftover
            -- input.
            PR.Error err -> do
                let next = ParseChunksInitLeftOver []
                return
                    $ D.Skip
                    $ ParseChunksYield (Left (ParseError err)) next

    -- Emit the stored value @a@ downstream and continue from the state that
    -- was saved alongside it.
    stepOuter _ (ParseChunksYield a next) = return $ D.Yield a next

-- | Apply a 'ChunkFold' repeatedly on an array stream and emit the
-- fold outputs in the output stream.
--
-- Each output element is either the result of one fold run ('Right') or a
-- 'ParseError' ('Left').
--
-- This is a 'StreamK' wrapper: the input is converted with 'toStream', run
-- through 'runArrayFoldManyD', and converted back with 'fromStream'.
--
-- See 'Streamly.Data.Stream.foldMany' for more details.
--
-- /Pre-release/
{-# INLINE runArrayFoldMany #-}
runArrayFoldMany
    :: (Monad m, Unbox a)
    => ChunkFold m a b
    -> StreamK m (Array a)
    -> StreamK m (Either ParseError b)
runArrayFoldMany p m = fromStream $ runArrayFoldManyD p (toStream m)