Z-IO-2.0.0.0: Simple and high performance IO toolkit for Haskell
Copyright: (c) Dong Han 2017-2020
License: BSD
Maintainer: winterland1989@gmail.com
Stability: experimental
Portability: non-portable
Safe Haskell: Safe-Inferred
Language: Haskell2010

Z.IO.BIO.Base

Description

This module provides the BIO (block IO) type to facilitate writing streaming programs. A BIO node usually:

  • Processes input in units of blocks (or items).
  • Runs in constant space, which means memory usage does not accumulate.
  • Keeps some state in IO, which is sealed in the BIO closure.

The BIO type

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

Arguments

  • Maybe out -> IO (): the downstream continuation; pass EOF to indicate that the current node has reached EOF.
  • Maybe inp: the input; EOF indicates that upstream has reached EOF.

A BIO (block IO) node.

A BIO node is a push-based stream transformer. It can be used to describe different kinds of IO devices:

  • BIO inp out describes an IO state machine (e.g. z_stream in zlib), which takes input in blocks and then produces output.
  • type Source out = BIO Void out describes an IO source, which never takes input, but produces output by looping until EOF.
  • type Sink inp = BIO inp () describes an IO sink, which takes input and performs some IO effects, such as writing to a terminal or to files.

You can connect these BIO nodes with (.), which connects the left node's output to the right node's input and returns a new BIO node with the left node's input type and the right node's output type.
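To make the composition rule concrete, here is a minimal sketch that uses only base. It redefines the BIO type locally from the signature documented above instead of importing Z.IO.BIO.Base; mapNode and demoCompose are hypothetical names introduced for illustration, not library functions.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

-- Local copy of the documented type; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Hypothetical stateless node built from a pure function.
mapNode :: (a -> b) -> BIO a b
mapNode f k mx = k (fmap f mx)

-- Ordinary function composition chains nodes left to right:
-- the Int output of the left node feeds the right node.
doubleThenShow :: BIO Int String
doubleThenShow = mapNode (* 2) . mapNode show

-- Push three items and then EOF, collecting what reaches the end.
demoCompose :: IO [String]
demoCompose = do
  acc <- newIORef []
  let k (Just x) = modifyIORef' acc (x :)
      k Nothing  = pure ()
  mapM_ (doubleThenShow k . Just) [1, 2, 3]
  doubleThenShow k Nothing
  reverse <$> readIORef acc
```

Evaluating demoCompose in GHCi yields ["2","4","6"]: each item is doubled by the left node before the right node renders it.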

You can run a BIO node in different ways:

  • step/step_ supply a single chunk of input and step the BIO node.
  • run/run_ supply EOF directly, which effectively pulls all values from the source and pushes them to the sink until the source reaches EOF.
  • runBlock/runBlock_ supply a single block of input as the whole input and run the BIO node.
  • runBlocks/runBlocks_ supply a list of blocks as the whole input and run the BIO node.
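The difference between these runners comes down to which inputs are fed to the node. The sketch below illustrates that idea with base only and a locally redefined BIO type; collectWith, stepSketch, runSketch, runBlocksSketch and newTotal are hypothetical names, and this is not claimed to be how the library implements its runners.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

-- Local copy of the documented type; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Feed a scripted sequence of inputs (Nothing stands for EOF) and
-- collect everything the node pushes downstream.
collectWith :: BIO inp out -> [Maybe inp] -> IO [out]
collectWith node inputs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' acc (y :))
  mapM_ (node k) inputs
  reverse <$> readIORef acc

-- One chunk, no EOF.
stepSketch :: BIO inp out -> inp -> IO [out]
stepSketch node x = collectWith node [Just x]

-- EOF only.
runSketch :: BIO inp out -> IO [out]
runSketch node = collectWith node [Nothing]

-- All blocks, then EOF.
runBlocksSketch :: BIO inp out -> [inp] -> IO [out]
runBlocksSketch node xs = collectWith node (map Just xs ++ [Nothing])

-- Hypothetical stateful node: sums its input, emits the total at EOF.
newTotal :: IO (BIO Int Int)
newTotal = do
  ref <- newIORef 0
  pure $ \k mx -> case mx of
    Just x  -> modifyIORef' ref (+ x)
    Nothing -> do
      total <- readIORef ref
      k (Just total)
      k Nothing

demoRun :: IO ([Int], [Int])
demoRun = do
  a <- newTotal
  stepped <- stepSketch a 5
  b <- newTotal
  total <- runBlocksSketch b [1, 2, 3]
  pure (stepped, total)
```

demoRun returns ([],[6]): stepping alone produces nothing because this node only emits at EOF, while the blocks-then-EOF run yields the total.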

Note that a BIO usually contains some IO state; you can consider it an opaque IORef:

  • You shouldn't use a BIO node across multiple BIO chains unless its state can be reset.
  • You shouldn't use a BIO node across multiple threads unless the documentation states otherwise.

BIO is simply a convenient way to construct single-threaded streaming computations. To use BIO in multiple threads, check the Z.IO.BIO.Concurrent module.
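The warning about sharing nodes can be seen in a small sketch. It uses base only and a local copy of the BIO type; newIndexer and feed are hypothetical helpers written for this illustration.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

-- Local copy of the documented type; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Hypothetical stateful node: tags each item with a running index
-- kept in a private IORef that is never reset.
newIndexer :: IO (BIO a (Int, a))
newIndexer = do
  ref <- newIORef (0 :: Int)
  pure $ \k mx -> case mx of
    Nothing -> k Nothing
    Just x  -> do
      i <- readIORef ref
      writeIORef ref (i + 1)
      k (Just (i, x))

-- Push all items and then EOF, collecting the output.
feed :: BIO inp out -> [inp] -> IO [out]
feed node xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' acc (y :))
  mapM_ (node k . Just) xs
  node k Nothing
  reverse <$> readIORef acc

demoSharedState :: IO ([(Int, Char)], [(Int, Char)])
demoSharedState = do
  node   <- newIndexer
  first  <- feed node "ab"
  second <- feed node "cd"  -- same node reused in a second run
  pure (first, second)
```

The second run starts at index 2 rather than 0, because the state left behind by the first run is still sealed inside the node. Creating a fresh node per chain avoids this.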

pattern EOF :: Maybe a

Pattern for more meaningful pattern matching.

type Source x = BIO Void x

Type alias for a BIO node which never takes input.

Note that when implementing a Source, you should assume the EOF argument is supplied only once. You should loop, calling the downstream continuation with all available chunks, and then write a final EOF to indicate the end of the stream.
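A minimal sketch of a source that follows this contract, using base only and locally redefined types. listSourceSketch and drainSketch are hypothetical names, and the library's own sourceFromList may be implemented differently.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Void (Void)

-- Local copies of the documented types; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Source x = BIO Void x

-- The single EOF argument is ignored; the source loops over all
-- available chunks and then writes a final EOF downstream.
listSourceSketch :: [a] -> Source a
listSourceSketch xs k _eof = do
  mapM_ (k . Just) xs
  k Nothing

-- Start the source by supplying EOF once, and collect its output.
drainSketch :: Source a -> IO [a]
drainSketch src = do
  acc <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' acc (y :))
  src k Nothing
  reverse <$> readIORef acc

demoSource :: IO [Int]
demoSource = drainSketch (listSourceSketch [1, 2, 3])
```

demoSource returns [1,2,3].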

type Sink x = BIO x ()

Type alias for a BIO node which only takes input and performs effects.

Note that when implementing a Sink, you should assume the EOF argument is supplied only once (when upstream reaches EOF). You do not need to call the downstream continuation before EOF; when upstream reaches EOF, perform a flush and also write a final EOF.
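The sink contract can be sketched in the same way, with base only and local type definitions. newBufferedSinkSketch is a hypothetical sink that buffers items and hands them to a caller-supplied flush action at EOF.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

-- Local copies of the documented types; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Sink x = BIO x ()

-- Buffers items silently; at EOF it flushes and then writes EOF.
newBufferedSinkSketch :: ([a] -> IO ()) -> IO (Sink a)
newBufferedSinkSketch flush = do
  buf <- newIORef []
  pure $ \k mx -> case mx of
    Just x  -> modifyIORef' buf (x :)  -- no downstream call before EOF
    Nothing -> do
      xs <- reverse <$> readIORef buf
      flush xs
      k Nothing

demoSink :: IO ([[Int]], [[Int]])
demoSink = do
  out  <- newIORef []
  sink <- newBufferedSinkSketch (\xs -> modifyIORef' out (xs :))
  let done _ = pure ()
  mapM_ (sink done . Just) [1, 2, 3]
  before <- readIORef out  -- nothing has been flushed yet
  sink done Nothing
  after <- readIORef out
  pure (before, after)
```

demoSink returns ([],[[1,2,3]]): the flush action only runs once EOF arrives.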

Basic combinators

appendSource :: HasCallStack => Source a -> Source a -> Source a

Connect two BIO sources: after the first reaches EOF, draw elements from the second.
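One way such a combinator could work is sketched below with base only and local type definitions. appendSketch, listSourceSketch and drainSketch are hypothetical names; the real appendSource may differ in details such as error handling.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Void (Void)

-- Local copies of the documented types; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Source x = BIO Void x

listSourceSketch :: [a] -> Source a
listSourceSketch xs k _eof = mapM_ (k . Just) xs >> k Nothing

-- Drain the first source while swallowing its EOF, then drain the
-- second and let its EOF reach the downstream continuation.
appendSketch :: Source a -> Source a -> Source a
appendSketch first second k _eof = do
  first forward Nothing
  second k Nothing
  where
    forward (Just x) = k (Just x)
    forward Nothing  = pure ()

drainSketch :: Source a -> IO [a]
drainSketch src = do
  acc <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' acc (y :))
  src k Nothing
  reverse <$> readIORef acc

demoAppend :: IO [Int]
demoAppend =
  drainSketch (appendSketch (listSourceSketch [1, 2]) (listSourceSketch [3]))
```

demoAppend returns [1,2,3], and downstream sees exactly one EOF.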

concatSource :: HasCallStack => [Source a] -> Source a

Connect a list of BIO sources: after one reaches EOF, draw elements from the next.

concatSource' :: HasCallStack => Source (Source a) -> Source a

Connect a source of BIO sources: after one reaches EOF, draw elements from the next.

joinSink :: HasCallStack => Sink out -> Sink out -> Sink out

Fuse two BIO sinks, i.e. everything written to the fused sink will be written to both the left and the right sink.

Flushing the resulting BIO will effectively flush both sinks.
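The fusing behaviour described above can be sketched with base only and local type definitions. joinSketch and refSink are hypothetical names, and the library's joinSink is not claimed to be written this way.

```haskell
import Data.IORef (IORef, newIORef, readIORef, modifyIORef')

-- Local copies of the documented types; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Sink x = BIO x ()

-- Forward every input (including EOF) to both sinks; once both have
-- seen EOF, pass EOF to the fused sink's own continuation.
joinSketch :: Sink a -> Sink a -> Sink a
joinSketch left right k mx = do
  left ignore mx
  right ignore mx
  case mx of
    Nothing -> k Nothing
    Just _  -> pure ()
  where
    ignore _ = pure ()

-- Hypothetical sink that records items in an IORef.
refSink :: IORef [a] -> Sink a
refSink ref k mx = case mx of
  Just x  -> modifyIORef' ref (x :)
  Nothing -> k Nothing

demoJoin :: IO ([Int], [Int])
demoJoin = do
  r1 <- newIORef []
  r2 <- newIORef []
  let sink = joinSketch (refSink r1) (refSink r2)
      done _ = pure ()
  mapM_ (sink done . Just) [1, 2, 3]
  sink done Nothing
  (,) <$> (reverse <$> readIORef r1) <*> (reverse <$> readIORef r2)
```

demoJoin returns ([1,2,3],[1,2,3]): both sinks receive every item.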

fuseSink :: HasCallStack => [Sink out] -> Sink out

Fuse a list of BIO sinks: everything written to the fused sink will be written to every sink in the list.

Flushing the resulting BIO will effectively flush every sink in the list.

Run BIO chain

discard :: a -> IO ()

Discards a value.

step :: HasCallStack => BIO inp out -> inp -> IO [out]

Supply a single chunk of input to a BIO and collect the result.

step_ :: HasCallStack => BIO inp out -> inp -> IO ()

Supply a single chunk of input to a BIO without collecting the result.

run :: HasCallStack => BIO inp out -> IO [out]

Run a BIO loop without providing input, and collect the result.

When used on a Source, it will collect all the chunks the source produces.

run_ :: HasCallStack => BIO inp out -> IO ()

Run a BIO loop without providing input.

When used on a Source, it starts the streaming loop. When used on a Sink, it performs a flush.

runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]

Run a BIO loop with a single chunk of input followed by EOF, and collect the result.

runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()

Run a BIO loop with a single chunk of input followed by EOF, without collecting the result.

unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]

Wrap runBlock into a pure interface.

You can wrap a stateful BIO computation (including the creation of the BIO node) this way when you can guarantee that the computation is pure, e.g. compressing, decoding, etc.
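The idea of hiding node creation and execution behind a pure function can be sketched with unsafePerformIO from base. This is an illustration under assumptions, not the library's implementation: pureBlockSketch, newBlockTotal and totalOf are hypothetical names, and the BIO type is redefined locally.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')
import System.IO.Unsafe (unsafePerformIO)

-- Local copy of the documented type; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Create a fresh node on every call, feed one block and EOF, and
-- return the collected output as a pure value. This is only sound
-- if the node's observable behaviour depends on its input alone.
pureBlockSketch :: IO (BIO inp out) -> inp -> [out]
pureBlockSketch new x = unsafePerformIO $ do
  node <- new
  acc  <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' acc (y :))
  node k (Just x)
  node k Nothing
  reverse <$> readIORef acc

-- Hypothetical stateful node: sums each block, emits the total at EOF.
newBlockTotal :: IO (BIO [Int] Int)
newBlockTotal = do
  ref <- newIORef 0
  pure $ \k mblock -> case mblock of
    Just block -> modifyIORef' ref (+ sum block)
    Nothing    -> do
      total <- readIORef ref
      k (Just total)
      k Nothing

totalOf :: [Int] -> [Int]
totalOf = pureBlockSketch newBlockTotal
```

totalOf [1, 2, 3] evaluates to [6]. Because the node is created inside the wrapper, no state leaks from one call to the next.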

runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]

Supply blocks of input followed by EOF to a BIO, and collect the results.

Note that many BIO nodes will be closed or unable to take new input after being drained.

runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()

Supply blocks of input followed by EOF to a BIO, without collecting the results.

Note that many BIO nodes will be closed or unable to take new input after being drained.

unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]

Wrap runBlocks into a pure interface.

Similar to unsafeRunBlock, but with a list of input blocks.

Make new BIO

fromPure :: (a -> b) -> BIO a b

BIO node from a pure function.

BIO nodes made with this function are stateless, and thus can be reused across chains.

fromIO :: HasCallStack => (a -> IO b) -> BIO a b

BIO node from an IO function.

BIO nodes made with this function may not be stateless; it depends on whether the IO function uses IO state.

filter :: (a -> Bool) -> BIO a a

BIO node from a pure filter.

BIO nodes made with this function are stateless, and thus can be reused across chains.
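A stateless filter node can be sketched directly from the BIO type, using base only. filterSketch and feed are hypothetical names, and the library's filter may be written differently.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

-- Local copy of the documented type; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Forward items that satisfy the predicate, drop the rest, and
-- always forward EOF. No IORef is involved, so the node is stateless.
filterSketch :: (a -> Bool) -> BIO a a
filterSketch p k mx = case mx of
  Just x
    | p x       -> k (Just x)
    | otherwise -> pure ()
  Nothing       -> k Nothing

-- Push all items and then EOF, collecting the output.
feed :: BIO inp out -> [inp] -> IO [out]
feed node xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' acc (y :))
  mapM_ (node k . Just) xs
  node k Nothing
  reverse <$> readIORef acc

demoFilter :: IO [Int]
demoFilter = feed (filterSketch even) [1 .. 6]
```

demoFilter returns [2,4,6]. Since the node holds no state, the same filterSketch value can appear in any number of chains.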

filterIO :: (a -> IO Bool) -> BIO a a

BIO node from an impure filter.

BIO nodes made with this function may not be stateless; it depends on whether the IO function uses IO state.

Use with fold

fold' :: Fold a b -> Source a -> IO b

Run a strict fold over a source with Fold.

foldIO' :: FoldM IO a b -> Source a -> IO b

Run a strict fold over a source with FoldM.

Source

initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes)

Turn a file into a Bytes source.

initSourceFromFile' :: HasCallStack => CBytes -> Int -> Resource (Source Bytes)

Turn a file into a Bytes source with the given chunk size.

sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a

Turn an IO action into a Source.

sourceFromList :: Foldable f => f a -> Source a

Source a list (or any Foldable) from memory.

sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes

Turn a BufferedInput into a BIO source, mapping EOF to EOF.

sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text

Turn a UTF8-encoded BufferedInput into a BIO source, mapping EOF to EOF.

sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a

Turn a JSON-encoded BufferedInput into a BIO source, ignoring any whitespace between JSON objects. If EOF is reached, return EOF. Throws OtherError with name EJSON if a JSON value cannot be parsed or converted.

sourceParserFromBuffered :: HasCallStack => Parser a -> BufferedInput -> Source a

Turn a buffered input device into a packet source; throws OtherError with name EPARSE if parsing fails.

sourceParseChunkFromBuffered :: (HasCallStack, Print e) => (Bytes -> Result e a) -> BufferedInput -> Source a

Turn a buffered input device into a packet source; throws OtherError with name EPARSE if parsing fails.

Sink

sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a

Turn an IO action into a BIO sink.

sinkToList :: IO (MVar [a], Sink a)

Sink to a list in memory.

The MVar will be empty during sinking, and will be filled after the sink receives an EOF.

initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes)

Turn a file into a Bytes sink.

Note that the file will be opened in O_APPEND .|. O_CREAT .|. O_WRONLY mode; bytes will be written after the end of the original file if it already contains data.

Bytes specific

newReChunk :: Int -> IO (BIO Bytes Bytes)

Arguments

  • Int: chunk granularity

Make a chunk size divider.

A divider resizes each chunk to the nearest multiple of the granularity; the last trailing chunk is returned directly.
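A rough sketch of such a divider follows, using base only and plain lists in place of Bytes. newReChunkSketch and feed are hypothetical names, the granularity is assumed to be positive, and the exact buffering strategy of the real newReChunk is an assumption here.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

-- Local copy of the documented type; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Emit the longest prefix whose length is a multiple of n and keep
-- the remainder for the next chunk; at EOF emit whatever is left.
-- Assumes n > 0.
newReChunkSketch :: Int -> IO (BIO [a] [a])
newReChunkSketch n = do
  pending <- newIORef []
  pure $ \k mchunk -> case mchunk of
    Just chunk -> do
      old <- readIORef pending
      let buf  = old ++ chunk
          keep = length buf `rem` n
          (out, rest) = splitAt (length buf - keep) buf
      writeIORef pending rest
      if null out then pure () else k (Just out)
    Nothing -> do
      rest <- readIORef pending
      writeIORef pending []
      if null rest then pure () else k (Just rest)
      k Nothing

-- Push all chunks and then EOF, collecting the output.
feed :: BIO inp out -> [inp] -> IO [out]
feed node xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' acc (y :))
  mapM_ (node k . Just) xs
  node k Nothing
  reverse <$> readIORef acc

demoReChunk :: IO [String]
demoReChunk = do
  node <- newReChunkSketch 4
  feed node ["abc", "defgh", "ij"]
```

With granularity 4, demoReChunk returns ["abcdefgh","ij"]: the first output has a length that is a multiple of 4, and the two trailing characters are emitted as-is at EOF.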

newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text)

Make a new UTF8 decoder, which decodes byte streams into text streams.

If there are invalid UTF8 bytes, an OtherError with name EINVALIDUTF8 will be thrown.

Note that this node is meant to be used after a preprocessing node such as a decompressor or parser, where byte boundaries cannot be controlled; the UTF8 decoder will prepend trailing bytes from the last block to the next one. Using this node directly with sourceFromBuffered is not as efficient as using sourceTextFromBuffered directly, because BufferedInput provides a push-back capability: trailing bytes can be pushed back to the reading buffer and then returned together with the next block of input.

newParser :: HasCallStack => Parser a -> IO (BIO Bytes a)

Read buffer and parse with Parser.

This function turns a Parser into a BIO; it throws OtherError with name EPARSE if parsing fails.

newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)

Make a new stream splitter based on a magic byte.

newLineSplitter :: IO (BIO Bytes Bytes)

Make a new stream splitter based on linefeed (\r\n or \n).

The resulting bytes do not contain the linefeed.
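A line splitter has to carry a partial line across block boundaries, which is why it is created in IO. The sketch below shows the idea with base only, using String in place of Bytes. newLineSplitterSketch, splitLines, stripCR and feed are hypothetical names, and emitting an unterminated last line at EOF is an assumption about the desired behaviour.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

-- Local copy of the documented type; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Split into complete lines plus the trailing partial line.
splitLines :: String -> ([String], String)
splitLines s = case break (== '\n') s of
  (line, '\n' : more) ->
    let (ls, rest) = splitLines more
    in  (stripCR line : ls, rest)
  (partial, _) -> ([], partial)

-- Drop a trailing carriage return so \r\n and \n behave alike.
stripCR :: String -> String
stripCR l
  | not (null l) && last l == '\r' = init l
  | otherwise                      = l

newLineSplitterSketch :: IO (BIO String String)
newLineSplitterSketch = do
  pending <- newIORef ""
  pure $ \k mchunk -> case mchunk of
    Just chunk -> do
      buffered <- readIORef pending
      let (complete, rest) = splitLines (buffered ++ chunk)
      writeIORef pending rest
      mapM_ (k . Just) complete
    Nothing -> do
      rest <- readIORef pending
      writeIORef pending ""
      if null rest then pure () else k (Just rest)
      k Nothing

-- Push all chunks and then EOF, collecting the output.
feed :: BIO inp out -> [inp] -> IO [out]
feed node xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' acc (y :))
  mapM_ (node k . Just) xs
  node k Nothing
  reverse <$> readIORef acc

demoLines :: IO [String]
demoLines = do
  node <- newLineSplitterSketch
  feed node ["he", "llo\r\nwor", "ld\nend"]
```

demoLines returns ["hello","world","end"], even though no input chunk boundary coincides with a line boundary.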

newBase64Encoder :: IO (BIO Bytes Bytes)

Make a new base64 encoder node.

newBase64Decoder :: HasCallStack => IO (BIO Bytes Bytes)

Make a new base64 decoder node.

hexEncode :: Bool -> BIO Bytes Bytes

Arguments

  • Bool: uppercase?

Make a hex encoder node.

The hex encoder is stateless, so it can be reused across chains.

newHexDecoder :: IO (BIO Bytes Bytes)

Make a new hex decoder node.

Generic BIO

counter :: Counter -> BIO a a

Make a new BIO node which counts the items flowing through it.

The Counter is increased atomically, so it is safe to read or reset the counter from other threads.

seqNum :: Counter -> BIO a (Int, a)

Make a new BIO node which counts items and labels each item with a sequence number.

The Counter is increased atomically, so it is safe to read or reset the counter from other threads.

newGrouping :: Vec v a => Int -> IO (BIO a (v a))

Make a BIO node that groups items into fixed-size arrays.

Trailing items are returned directly.
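The grouping behaviour, including the handling of trailing items, can be sketched with base only, using lists in place of the Vec arrays. newGroupingSketch and feed are hypothetical names, and a group size of at least 1 is assumed.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

-- Local copy of the documented type; not imported from Z-IO.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Buffer items until n have arrived, then emit them as one group.
-- At EOF, emit any trailing items as a final, shorter group.
newGroupingSketch :: Int -> IO (BIO a [a])
newGroupingSketch n = do
  buf <- newIORef (0 :: Int, [])
  pure $ \k mx -> case mx of
    Just x -> do
      (len, xs) <- readIORef buf
      if len + 1 >= n
        then do
          writeIORef buf (0, [])
          k (Just (reverse (x : xs)))
        else writeIORef buf (len + 1, x : xs)
    Nothing -> do
      (len, xs) <- readIORef buf
      writeIORef buf (0, [])
      if len > 0 then k (Just (reverse xs)) else pure ()
      k Nothing

-- Push all items and then EOF, collecting the output.
feed :: BIO inp out -> [inp] -> IO [out]
feed node xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' acc (y :))
  mapM_ (node k . Just) xs
  node k Nothing
  reverse <$> readIORef acc

demoGrouping :: IO [[Int]]
demoGrouping = do
  node <- newGroupingSketch 2
  feed node [1 .. 5]
```

demoGrouping returns [[1,2],[3,4],[5]]: the last group is shorter because only one trailing item remained at EOF.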

ungrouping :: BIO (Vector a) a

A BIO node that flattens items.

consumed :: TVar Bool -> BIO a a

A BIO node which writes True to the TVar when EOF is reached.