streamly-core-0.2.1: Streaming, parsers, arrays, serialization and more
Copyright(c) 2019 Composewell Technologies
LicenseBSD3-3-Clause
Maintainerstreamly@composewell.com
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Array.Stream

Description

Combinators to efficiently manipulate streams of immutable arrays.

We can either push these in the MutArray module with a "chunks" prefix or keep this as a separate module and release it.

Synopsis

Creation

chunksOf :: forall m a. (MonadIO m, Unbox a) => Int -> Stream m a -> Stream m (Array a) Source #

chunksOf n stream groups the elements in the input stream into arrays of n elements each.

Same as the following but may be more efficient:

>>> chunksOf n = Stream.foldMany (Array.writeN n)

Pre-release

pinnedChunksOf :: forall m a. (MonadIO m, Unbox a) => Int -> Stream m a -> Stream m (Array a) Source #

Like chunksOf but creates pinned arrays.

bufferChunks :: (MonadIO m, Unbox a) => Stream m a -> m (StreamK m (Array a)) Source #

Flattening to elements

concat :: (Monad m, Unbox a) => Stream m (Array a) -> Stream m a Source #

Convert a stream of arrays into a stream of their elements.

Same as the following:

concat = Stream.unfoldMany Array.read

Since: 0.7.0

flattenArrays :: forall m a. (MonadIO m, Unbox a) => Stream m (Array a) -> Stream m a Source #

Use the "read" unfold instead.

flattenArrays = unfoldMany read

We can try this if there are any fusion issues in the unfold.

concatRev :: (Monad m, Unbox a) => Stream m (Array a) -> Stream m a Source #

Convert a stream of arrays into a stream of their elements reversing the contents of each array before flattening.

concatRev = Stream.unfoldMany Array.readerRev

Since: 0.7.0

flattenArraysRev :: forall m a. (MonadIO m, Unbox a) => Stream m (Array a) -> Stream m a Source #

Use the "readRev" unfold instead.

flattenArrays = unfoldMany readRev

We can try this if there are any fusion issues in the unfold.

interpose :: (Monad m, Unbox a) => a -> Stream m (Array a) -> Stream m a Source #

Flatten a stream of arrays after inserting the given element between arrays.

Pre-release

interposeSuffix :: (Monad m, Unbox a) => a -> Stream m (Array a) -> Stream m a Source #

Flatten a stream of arrays appending the given element after each array.

Since: 0.7.0

intercalateSuffix :: (Monad m, Unbox a) => Array a -> Stream m (Array a) -> Stream m a Source #

unlines :: forall m a. (MonadIO m, Unbox a) => a -> Stream m (Array a) -> Stream m a Source #

Elimination

Element Folds

foldBreak :: (MonadIO m, Unbox a) => Fold m a b -> StreamK m (Array a) -> m (b, StreamK m (Array a)) Source #

Fold an array stream using the supplied Fold. Returns the fold result and the unconsumed stream.

foldBreak f = runArrayFoldBreak (ChunkFold.fromFold f)

Internal

foldBreakD :: forall m a b. (MonadIO m, Unbox a) => Fold m a b -> Stream m (Array a) -> m (b, Stream m (Array a)) Source #

parseBreak :: (MonadIO m, Unbox a) => Parser a m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

Parse an array stream using the supplied Parser. Returns the parse result and the unconsumed stream. Throws ParseError if the parse fails.

Internal

parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

Run a ParserK over a chunked StreamK and return the parse result and the remaining Stream.

parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #

Array Folds

runArrayFold :: (MonadIO m, Unbox a) => ChunkFold m a b -> StreamK m (Array a) -> m (Either ParseError b) Source #

Fold an array stream using the supplied array stream Fold.

Pre-release

runArrayFoldBreak :: (MonadIO m, Unbox a) => ChunkFold m a b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

Like fold but also returns the remaining stream.

Pre-release

runArrayParserDBreak :: forall m a b. (MonadIO m, Unbox a) => Parser (Array a) m b -> Stream m (Array a) -> m (Either ParseError b, Stream m (Array a)) Source #

Note that this is not the same as using a Parser (Array a) m b with the regular "Streamly.Internal.Data.IsStream.parse" function. The regular parse would consume the input arrays as single unit. This parser parses in the way as described in the ChunkFold module. The input arrays are treated as n element units and can be consumed partially. The remaining elements are inserted in the source stream as an array.

runArrayFoldMany :: (Monad m, Unbox a) => ChunkFold m a b -> StreamK m (Array a) -> StreamK m (Either ParseError b) Source #

Apply an ChunkFold repeatedly on an array stream and emit the fold outputs in the output stream.

See "Streamly.Data.Stream.foldMany" for more details.

Pre-release

toArray :: (MonadIO m, Unbox a) => Stream m (Array a) -> m (Array a) Source #

Given a stream of arrays, splice them all together to generate a single array. The stream must be finite.

Since: 0.7.0

Compaction

lpackArraysChunksOf :: (MonadIO m, Unbox a) => Int -> Fold m (Array a) () -> Fold m (Array a) () Source #

compact :: (MonadIO m, Unbox a) => Int -> Stream m (Array a) -> Stream m (Array a) Source #

Coalesce adjacent arrays in incoming stream to form bigger arrays of a maximum specified size in bytes.

Since: 0.7.0

Splitting

splitOn :: MonadIO m => Word8 -> Stream m (Array Word8) -> Stream m (Array Word8) Source #

Split a stream of arrays on a given separator byte, dropping the separator and coalescing all the arrays between two separators into a single array.

Since: 0.7.0