-- |
-- Module      : Streamly.Internal.Data.Array.Stream.Mut.Foreign
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD3-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Combinators to efficiently manipulate streams of mutable arrays.
--
module Streamly.Internal.Data.Array.Stream.Mut.Foreign
    (
    -- * Generation
      arraysOf

    -- * Compaction
    , packArraysChunksOf
    , SpliceState (..)
    , lpackArraysChunksOf
    , compact
    , compactLE
    , compactEQ
    , compactGE
    )
where

#include "inline.hs"
#include "ArrayMacros.h"

import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow)
import Data.Bifunctor (first)
import Foreign.Storable (Storable(..))
import Streamly.Internal.Data.Array.Foreign.Mut.Type (Array(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))

import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Parser.ParserD as ParserD

-- | @arraysOf n stream@ groups the elements in the input stream into arrays of
-- @n@ elements each.
--
-- Same as the following but may be more efficient:
--
-- > arraysOf n = Stream.foldMany (MArray.writeN n)
--
-- /Pre-release/
{-# INLINE arraysOf #-}
arraysOf :: (MonadIO m, Storable a)
    => Int -> SerialT m a -> SerialT m (Array a)
arraysOf :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> SerialT m a -> SerialT m (Array a)
arraysOf Int
n (SerialT Stream m a
xs) =
    forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
D.toStreamK forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m a -> Stream m (Array a)
MArray.arraysOf Int
n forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Applicative m => Stream m a -> Stream m a
D.fromStreamK Stream m a
xs

-------------------------------------------------------------------------------
-- Compact
-------------------------------------------------------------------------------

data SpliceState s arr
    = SpliceInitial s
    | SpliceBuffering s arr
    | SpliceYielding arr (SpliceState s arr)
    | SpliceFinish

-- XXX This can be removed once compactLEFold/compactLE are implemented.
--
-- | This mutates the first array (if it has space) to append values from the
-- second one. This would work for immutable arrays as well because an
-- immutable array never has space so a new array is allocated instead of
-- mutating it.
--
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size. Note that if a single array is bigger than the
-- specified size we do not split it to fit. When we coalesce multiple arrays
-- if the size would exceed the specified size we do not coalesce therefore the
-- actual array size may be less than the specified chunk size.
--
-- @since 0.7.0
{-# INLINE_NORMAL packArraysChunksOf #-}
packArraysChunksOf :: (MonadIO m, Storable a)
    => Int -> D.Stream m (Array a) -> D.Stream m (Array a)
packArraysChunksOf :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m (Array a) -> Stream m (Array a)
packArraysChunksOf Int
n (D.Stream State Stream m (Array a) -> s -> m (Step s (Array a))
step s
state) =
    forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m (Array a)
-> SpliceState s (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
step' (forall s arr. s -> SpliceState s arr
SpliceInitial s
state)

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m (Array a)
-> SpliceState s (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
step' State Stream m (Array a)
gst (SpliceInitial s
st) = do
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n forall a. Ord a => a -> a -> Bool
<= Int
0) forall a b. (a -> b) -> a -> b
$
            -- XXX we can pass the module string from the higher level API
            forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"Streamly.Internal.Data.Array.Foreign.Mut.Type.packArraysChunksOf: the size of "
                 forall a. [a] -> [a] -> [a]
++ [Char]
"arrays [" forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Int
n forall a. [a] -> [a] -> [a]
++ [Char]
"] must be a natural number"
        Step s (Array a)
r <- State Stream m (Array a) -> s -> m (Step s (Array a))
step State Stream m (Array a)
gst s
st
        case Step s (Array a)
r of
            D.Yield Array a
arr s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
                let len :: Int
len = forall a. Array a -> Int
MArray.byteLength Array a
arr
                 in if Int
len forall a. Ord a => a -> a -> Bool
>= Int
n
                    then forall s a. s -> Step s a
D.Skip (forall s arr. arr -> SpliceState s arr -> SpliceState s arr
SpliceYielding Array a
arr (forall s arr. s -> SpliceState s arr
SpliceInitial s
s))
                    else forall s a. s -> Step s a
D.Skip (forall s arr. s -> arr -> SpliceState s arr
SpliceBuffering s
s Array a
arr)
            D.Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall s arr. s -> SpliceState s arr
SpliceInitial s
s)
            Step s (Array a)
D.Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
D.Stop

    step' State Stream m (Array a)
gst (SpliceBuffering s
st Array a
buf) = do
        Step s (Array a)
r <- State Stream m (Array a) -> s -> m (Step s (Array a))
step State Stream m (Array a)
gst s
st
        case Step s (Array a)
r of
            D.Yield Array a
arr s
s -> do
                let len :: Int
len = forall a. Array a -> Int
MArray.byteLength Array a
buf forall a. Num a => a -> a -> a
+ forall a. Array a -> Int
MArray.byteLength Array a
arr
                if Int
len forall a. Ord a => a -> a -> Bool
> Int
n
                then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
                    forall s a. s -> Step s a
D.Skip (forall s arr. arr -> SpliceState s arr -> SpliceState s arr
SpliceYielding Array a
buf (forall s arr. s -> arr -> SpliceState s arr
SpliceBuffering s
s Array a
arr))
                else do
                    Array a
buf' <- if forall a. Array a -> Int
MArray.byteCapacity Array a
buf forall a. Ord a => a -> a -> Bool
< Int
n
                            then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Array a -> m (Array a)
MArray.realloc Int
n Array a
buf
                            else forall (m :: * -> *) a. Monad m => a -> m a
return Array a
buf
                    Array a
buf'' <- forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
MArray.splice Array a
buf' Array a
arr
                    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall s arr. s -> arr -> SpliceState s arr
SpliceBuffering s
s Array a
buf'')
            D.Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall s arr. s -> arr -> SpliceState s arr
SpliceBuffering s
s Array a
buf)
            Step s (Array a)
D.Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall s arr. arr -> SpliceState s arr -> SpliceState s arr
SpliceYielding Array a
buf forall s arr. SpliceState s arr
SpliceFinish)

    step' State Stream m (Array a)
_ SpliceState s (Array a)
SpliceFinish = forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
D.Stop

    step' State Stream m (Array a)
_ (SpliceYielding Array a
arr SpliceState s (Array a)
next) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
D.Yield Array a
arr SpliceState s (Array a)
next

-- XXX Remove this once compactLEFold is implemented
-- lpackArraysChunksOf = Fold.many compactLEFold
--
{-# INLINE_NORMAL lpackArraysChunksOf #-}
lpackArraysChunksOf :: (MonadIO m, Storable a)
    => Int -> Fold m (Array a) () -> Fold m (Array a) ()
lpackArraysChunksOf :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Fold m (Array a) () -> Fold m (Array a) ()
lpackArraysChunksOf Int
n (Fold s -> Array a -> m (Step s ())
step1 m (Step s ())
initial1 s -> m ()
extract1) =
    forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold Tuple' (Maybe (Array a)) s
-> Array a -> m (Step (Tuple' (Maybe (Array a)) s) ())
step forall {a}. m (Step (Tuple' (Maybe a) s) ())
initial Tuple' (Maybe (Array a)) s -> m ()
extract

    where

    initial :: m (Step (Tuple' (Maybe a) s) ())
initial = do
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n forall a. Ord a => a -> a -> Bool
<= Int
0) forall a b. (a -> b) -> a -> b
$
            -- XXX we can pass the module string from the higher level API
            forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"Streamly.Internal.Data.Array.Foreign.Mut.Type.packArraysChunksOf: the size of "
                 forall a. [a] -> [a] -> [a]
++ [Char]
"arrays [" forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Int
n forall a. [a] -> [a] -> [a]
++ [Char]
"] must be a natural number"

        Step s ()
r <- m (Step s ())
initial1
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (forall a b. a -> b -> Tuple' a b
Tuple' forall a. Maybe a
Nothing) Step s ()
r

    extract :: Tuple' (Maybe (Array a)) s -> m ()
extract (Tuple' Maybe (Array a)
Nothing s
r1) = s -> m ()
extract1 s
r1
    extract (Tuple' (Just Array a
buf) s
r1) = do
        Step s ()
r <- s -> Array a -> m (Step s ())
step1 s
r1 Array a
buf
        case Step s ()
r of
            FL.Partial s
rr -> s -> m ()
extract1 s
rr
            FL.Done ()
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()

    step :: Tuple' (Maybe (Array a)) s
-> Array a -> m (Step (Tuple' (Maybe (Array a)) s) ())
step (Tuple' Maybe (Array a)
Nothing s
r1) Array a
arr =
            let len :: Int
len = forall a. Array a -> Int
MArray.byteLength Array a
arr
             in if Int
len forall a. Ord a => a -> a -> Bool
>= Int
n
                then do
                    Step s ()
r <- s -> Array a -> m (Step s ())
step1 s
r1 Array a
arr
                    case Step s ()
r of
                        FL.Done ()
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. b -> Step s b
FL.Done ()
                        FL.Partial s
s -> do
                            s -> m ()
extract1 s
s
                            Step s ()
res <- m (Step s ())
initial1
                            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (forall a b. a -> b -> Tuple' a b
Tuple' forall a. Maybe a
Nothing) Step s ()
res
                else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial forall a b. (a -> b) -> a -> b
$ forall a b. a -> b -> Tuple' a b
Tuple' (forall a. a -> Maybe a
Just Array a
arr) s
r1

    step (Tuple' (Just Array a
buf) s
r1) Array a
arr = do
            let len :: Int
len = forall a. Array a -> Int
MArray.byteLength Array a
buf forall a. Num a => a -> a -> a
+ forall a. Array a -> Int
MArray.byteLength Array a
arr
            Array a
buf' <- if forall a. Array a -> Int
MArray.byteCapacity Array a
buf forall a. Ord a => a -> a -> Bool
< Int
len
                    then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Array a -> m (Array a)
MArray.realloc (forall a. Ord a => a -> a -> a
max Int
n Int
len) Array a
buf
                    else forall (m :: * -> *) a. Monad m => a -> m a
return Array a
buf
            Array a
buf'' <- forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
MArray.splice Array a
buf' Array a
arr

            -- XXX this is common in both the equations of step
            if Int
len forall a. Ord a => a -> a -> Bool
>= Int
n
            then do
                Step s ()
r <- s -> Array a -> m (Step s ())
step1 s
r1 Array a
buf''
                case Step s ()
r of
                    FL.Done ()
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. b -> Step s b
FL.Done ()
                    FL.Partial s
s -> do
                        s -> m ()
extract1 s
s
                        Step s ()
res <- m (Step s ())
initial1
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (forall a b. a -> b -> Tuple' a b
Tuple' forall a. Maybe a
Nothing) Step s ()
res
            else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial forall a b. (a -> b) -> a -> b
$ forall a b. a -> b -> Tuple' a b
Tuple' (forall a. a -> Maybe a
Just Array a
buf'') s
r1

-- XXX Same as compactLE, to be removed once that is implemented.
--
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes.
--
-- /Internal/
{-# INLINE compact #-}
compact :: (MonadIO m, Storable a)
    => Int -> SerialT m (Array a) -> SerialT m (Array a)
compact :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> SerialT m (Array a) -> SerialT m (Array a)
compact Int
n (SerialT Stream m (Array a)
xs) =
    forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
D.toStreamK forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m (Array a) -> Stream m (Array a)
packArraysChunksOf Int
n (forall (m :: * -> *) a. Applicative m => Stream m a -> Stream m a
D.fromStreamK Stream m (Array a)
xs)

-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size. Note that if a single array is bigger than the
-- specified size we do not split it to fit. When we coalesce multiple arrays
-- if the size would exceed the specified size we do not coalesce therefore the
-- actual array size may be less than the specified chunk size.
--
-- /Internal/
{-# INLINE_NORMAL compactLEParserD #-}
compactLEParserD ::
       forall m a. (MonadThrow m, MonadIO m, Storable a)
    => Int -> ParserD.Parser m (Array a) (Array a)
compactLEParserD :: forall (m :: * -> *) a.
(MonadThrow m, MonadIO m, Storable a) =>
Int -> Parser m (Array a) (Array a)
compactLEParserD Int
n = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Initial s b) -> (s -> m b) -> Parser m a b
ParserD.Parser forall {m :: * -> *} {a}.
(MonadIO m, Storable a) =>
Maybe (Array a) -> Array a -> m (Step (Maybe (Array a)) (Array a))
step forall {a} {b}. m (Initial (Maybe a) b)
initial forall {m :: * -> *} {a}. Monad m => Maybe (Array a) -> m (Array a)
extract

    where

    nBytes :: Int
nBytes = Int
n forall a. Num a => a -> a -> a
* SIZE_OF(a)

    initial :: m (Initial (Maybe a) b)
initial =
        forall (m :: * -> *) a. Monad m => a -> m a
return
            forall a b. (a -> b) -> a -> b
$ if Int
n forall a. Ord a => a -> a -> Bool
<= Int
0
              then forall a. HasCallStack => [Char] -> a
error
                       forall a b. (a -> b) -> a -> b
$ [Char]
functionPath
                       forall a. [a] -> [a] -> [a]
++ [Char]
": the size of arrays ["
                       forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Int
n forall a. [a] -> [a] -> [a]
++ [Char]
"] must be a natural number"
              else forall s b. s -> Initial s b
ParserD.IPartial forall a. Maybe a
Nothing

    step :: Maybe (Array a) -> Array a -> m (Step (Maybe (Array a)) (Array a))
step Maybe (Array a)
Nothing Array a
arr =
        forall (m :: * -> *) a. Monad m => a -> m a
return
            forall a b. (a -> b) -> a -> b
$ let len :: Int
len = forall a. Array a -> Int
MArray.byteLength Array a
arr
               in if Int
len forall a. Ord a => a -> a -> Bool
>= Int
nBytes
                  then forall s b. Int -> b -> Step s b
ParserD.Done Int
0 Array a
arr
                  else forall s b. Int -> s -> Step s b
ParserD.Partial Int
0 (forall a. a -> Maybe a
Just Array a
arr)
    step (Just Array a
buf) Array a
arr =
        let len :: Int
len = forall a. Array a -> Int
MArray.byteLength Array a
buf forall a. Num a => a -> a -> a
+ forall a. Array a -> Int
MArray.byteLength Array a
arr
         in if Int
len forall a. Ord a => a -> a -> Bool
> Int
nBytes
            then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. Int -> b -> Step s b
ParserD.Done Int
1 Array a
buf
            else do
                Array a
buf1 <-
                    if forall a. Array a -> Int
MArray.byteCapacity Array a
buf forall a. Ord a => a -> a -> Bool
< Int
nBytes
                    then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Array a -> m (Array a)
MArray.realloc Int
nBytes Array a
buf
                    else forall (m :: * -> *) a. Monad m => a -> m a
return Array a
buf
                Array a
buf2 <- forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
MArray.splice Array a
buf1 Array a
arr
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. Int -> s -> Step s b
ParserD.Partial Int
0 (forall a. a -> Maybe a
Just Array a
buf2)

    extract :: Maybe (Array a) -> m (Array a)
extract Maybe (Array a)
Nothing = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Array a
MArray.nil
    extract (Just Array a
buf) = forall (m :: * -> *) a. Monad m => a -> m a
return Array a
buf

    functionPath :: [Char]
functionPath =
        [Char]
"Streamly.Internal.Data.Array.Stream.Mut.Foreign.compactLEParserD"

-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- minimum specified size. Note that if all the arrays in the stream together
-- are smaller than the specified size the resulting array will be smaller than
-- the specified size. When we coalesce multiple arrays if the size would exceed
-- the specified size we stop coalescing further.
--
-- /Internal/
{-# INLINE_NORMAL compactGEFold #-}
compactGEFold ::
       forall m a. (MonadIO m, Storable a)
    => Int -> FL.Fold m (Array a) (Array a)
compactGEFold :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Fold m (Array a) (Array a)
compactGEFold Int
n = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold forall {m :: * -> *} {a}.
(MonadIO m, Storable a) =>
Maybe (Array a) -> Array a -> m (Step (Maybe (Array a)) (Array a))
step forall {a} {b}. m (Step (Maybe a) b)
initial forall {m :: * -> *} {a}. Monad m => Maybe (Array a) -> m (Array a)
extract

    where

    nBytes :: Int
nBytes = Int
n forall a. Num a => a -> a -> a
* SIZE_OF(a)

    initial :: m (Step (Maybe a) b)
initial =
        forall (m :: * -> *) a. Monad m => a -> m a
return
            forall a b. (a -> b) -> a -> b
$ if Int
n forall a. Ord a => a -> a -> Bool
< Int
0
              then forall a. HasCallStack => [Char] -> a
error
                       forall a b. (a -> b) -> a -> b
$ [Char]
functionPath
                       forall a. [a] -> [a] -> [a]
++ [Char]
": the size of arrays ["
                       forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Int
n forall a. [a] -> [a] -> [a]
++ [Char]
"] must be a natural number"
              else forall s b. s -> Step s b
FL.Partial forall a. Maybe a
Nothing

    step :: Maybe (Array a) -> Array a -> m (Step (Maybe (Array a)) (Array a))
step Maybe (Array a)
Nothing Array a
arr =
        forall (m :: * -> *) a. Monad m => a -> m a
return
            forall a b. (a -> b) -> a -> b
$ let len :: Int
len = forall a. Array a -> Int
MArray.byteLength Array a
arr
               in if Int
len forall a. Ord a => a -> a -> Bool
>= Int
nBytes
                  then forall s b. b -> Step s b
FL.Done Array a
arr
                  else forall s b. s -> Step s b
FL.Partial (forall a. a -> Maybe a
Just Array a
arr)
    step (Just Array a
buf) Array a
arr = do
        let len :: Int
len = forall a. Array a -> Int
MArray.byteLength Array a
buf forall a. Num a => a -> a -> a
+ forall a. Array a -> Int
MArray.byteLength Array a
arr
        Array a
buf1 <-
            if forall a. Array a -> Int
MArray.byteCapacity Array a
buf forall a. Ord a => a -> a -> Bool
< Int
len
            then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Array a -> m (Array a)
MArray.realloc (forall a. Ord a => a -> a -> a
max Int
len Int
nBytes) Array a
buf
            else forall (m :: * -> *) a. Monad m => a -> m a
return Array a
buf
        Array a
buf2 <- forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
MArray.splice Array a
buf1 Array a
arr
        if Int
len forall a. Ord a => a -> a -> Bool
>= Int
n
        then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. b -> Step s b
FL.Done Array a
buf2
        else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial (forall a. a -> Maybe a
Just Array a
buf2)

    extract :: Maybe (Array a) -> m (Array a)
extract Maybe (Array a)
Nothing = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Array a
MArray.nil
    extract (Just Array a
buf) = forall (m :: * -> *) a. Monad m => a -> m a
return Array a
buf

    functionPath :: [Char]
functionPath =
        [Char]
"Streamly.Internal.Data.Array.Stream.Mut.Foreign.compactGEFold"

-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes.
--
-- /Internal/
compactLE :: (MonadThrow m, MonadIO m, Storable a) =>
    Int -> SerialT m (Array a) -> SerialT m (Array a)
compactLE :: forall (m :: * -> *) a.
(MonadThrow m, MonadIO m, Storable a) =>
Int -> SerialT m (Array a) -> SerialT m (Array a)
compactLE Int
n (SerialT Stream m (Array a)
xs) =
    forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
D.toStreamK forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Stream m a -> Stream m b
D.parseMany (forall (m :: * -> *) a.
(MonadThrow m, MonadIO m, Storable a) =>
Int -> Parser m (Array a) (Array a)
compactLEParserD Int
n) (forall (m :: * -> *) a. Applicative m => Stream m a -> Stream m a
D.fromStreamK Stream m (Array a)
xs)

-- | Like 'compactLE' but generates arrays of exactly equal to the size
-- specified except for the last array in the stream which could be shorter.
--
-- /Unimplemented/
{-# INLINE compactEQ #-}
compactEQ :: -- (MonadIO m, Storable a) =>
    Int -> SerialT m (Array a) -> SerialT m (Array a)
compactEQ :: forall (m :: * -> *) a.
Int -> SerialT m (Array a) -> SerialT m (Array a)
compactEQ Int
_n SerialT m (Array a)
_xs = forall a. HasCallStack => a
undefined
    -- IsStream.fromStreamD $ D.foldMany (compactEQFold n) (IsStream.toStreamD xs)

-- | Like 'compactLE' but generates arrays of size greater than or equal to the
-- specified except for the last array in the stream which could be shorter.
--
-- /Internal/
{-# INLINE compactGE #-}
compactGE ::
       (MonadIO m, Storable a)
    => Int -> SerialT m (Array a) -> SerialT m (Array a)
compactGE :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> SerialT m (Array a) -> SerialT m (Array a)
compactGE Int
n (SerialT Stream m (Array a)
xs) =
     forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
D.toStreamK forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.foldMany (forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Fold m (Array a) (Array a)
compactGEFold Int
n) (forall (m :: * -> *) a. Applicative m => Stream m a -> Stream m a
D.fromStreamK Stream m (Array a)
xs)