Z-IO-0.7.0.0: Simple and high performance IO toolkit for Haskell
Copyright: (c) Dong Han 2017-2020
License: BSD
Maintainer: winterland1989@gmail.com
Stability: experimental
Portability: non-portable
Safe Haskell: None
Language: Haskell2010

Z.IO.BIO

Description

This module provides the BIO (block IO) type to facilitate writing streaming programs. A BIO node usually:

  • Processes input in units of blocks (or items).
  • Runs in constant space, which means memory usage won't accumulate.
  • Keeps some state in IO, which is sealed in the BIO closure.

Some examples of such nodes are:

  • Compressor / decompressor, e.g. zlib, etc.
  • Codec, e.g. utf8 codec, base64 codec.
  • Ciphers.
  • Packet parsers.

We use the BIO inp out type to represent all the objects above, BIO Void out to represent an IO source, and BIO inp Void to represent an IO sink. All of them can be connected with >|> to build a larger BIO node.

import Z.Data.CBytes    (CBytes)
import Z.IO
import Z.IO.BIO
import Z.IO.BIO.Zlib

base64AndCompressFile :: HasCallStack => CBytes -> CBytes -> IO ()
base64AndCompressFile origin target = do
    base64Enc <- newBase64Encoder
    (_, zlibCompressor) <- newCompress defaultCompressConfig{compressWindowBits = 31}

    withResource (initSourceFromFile origin) $ \ src ->
        withResource (initSinkToFile target) $ \ sink ->
            runBIO $ src >|> base64Enc >|> zlibCompressor >|> sink

> base64AndCompressFile "test" "test.gz"
-- running 'zcat "test.gz" | base64 -d' will give you the original file

The BIO type

data BIO inp out

A BIO (blocked IO) node.

A BIO node consists of two functions: push and pull. It can be used to describe different kinds of IO devices:

  • BIO inp out describes an IO state machine (e.g. z_stream in zlib), which takes some input in blocks, then produces output.
  • type Source out = BIO Void out describes an IO source, which never takes input, but gives output until EOF when pulled.
  • type Sink inp = BIO inp Void describes an IO sink, which takes input and performs some IO effects, such as writing to a terminal or files.

You can connect these BIO nodes with >|>, which connects the left node's output to the right node's input and returns a new BIO node with the left node's input type and the right node's output type.

You can run a BIO node in different ways:

  • runBIO will continuously pull values from the source and push them to the sink until the source reaches EOF.
  • runSource will continuously pull values from the source, performing effects along the way.
  • runBlock will supply a single block of input as the whole input, and return output if there is any.
  • runBlocks will supply a list of blocks as the whole input, and return a list of output blocks.

Note that a BIO usually contains some IO state; you can consider it an opaque IORef:

  • You shouldn't use a BIO node across multiple BIO chains unless the state can be reset.
  • You shouldn't use a BIO node across multiple threads unless the documentation states otherwise.

BIO is simply a convenient way to construct single-threaded streaming computations. To use BIO in multiple threads, check the Z.IO.BIO.Concurrent module.
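The warning about reusing stateful nodes can be illustrated with a small sketch. It assumes the z-io package is installed and uses only functions documented on this page; drainTwice is a hypothetical name. Since sourceFromList lives in IO and keeps its remaining items inside the node, a second drain of the same node should find nothing left.

```haskell
import Z.IO.BIO

-- | Hypothetical helper: drain the same source node twice.
-- The node keeps its remaining items as internal state,
-- so the second pass starts from an already exhausted node.
drainTwice :: IO ([Int], [Int])
drainTwice = do
    src    <- sourceFromList [1, 2, 3]
    first  <- runSource src   -- consumes every item held by the node
    second <- runSource src   -- the node is already at EOF here
    return (first, second)
```

If a fresh pass over the same data is needed, creating a new node with sourceFromList is the safer choice.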

Constructors

BIO 

Fields

  • push :: inp -> IO (Maybe out)

    Push a block of input, perform some effect, and return output. If the input is not enough to produce any output yet, return Nothing.

  • pull :: IO (Maybe out)

    When input reaches EOF, there may be a finalization stage that outputs trailing blocks. Return Nothing to indicate that the current node has reached EOF too.
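As an illustration of the two fields, here is a minimal sketch of a hand-written node, assuming the z-io package is installed. newPairSum and pairSums are hypothetical names, not part of the library. The node buffers one item, emits the sum of each pair from push, and uses pull as the finalization stage to emit a leftover item before signalling EOF.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)
import Z.IO.BIO

-- | Hypothetical node: sums its input two items at a time.
newPairSum :: IO (BIO Int Int)
newPairSum = do
    pending <- newIORef Nothing          -- at most one buffered item
    return BIO
        { push = \ x -> do
            held <- readIORef pending
            case held of
                -- Not enough input for an output yet: buffer it.
                Nothing -> writeIORef pending (Just x) >> return Nothing
                -- A pair is complete: emit the sum and clear the buffer.
                Just y  -> writeIORef pending Nothing >> return (Just (y + x))
        , pull = do
            -- Finalization: emit the leftover item once, then Nothing (EOF).
            held <- readIORef pending
            writeIORef pending Nothing
            return held
        }

-- | Feed five items; the last one is flushed by 'pull'.
pairSums :: IO [Int]
pairSums = do
    node <- newPairSum
    runBlocks node [1, 2, 3, 4, 5]       -- [3, 7, 5]
```

The state lives in an IORef captured by the closure, which is why such a node should not be shared between chains.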

Instances

Functor (BIO inp)

Defined in Z.IO.BIO

Methods

fmap :: (a -> b) -> BIO inp a -> BIO inp b

(<$) :: a -> BIO inp b -> BIO inp a

type Source out = BIO Void out

Type alias for a BIO node which never takes input.

push is ruled out by the type system, and pull returns Nothing when it reaches EOF.

type Sink inp = BIO inp Void

Type alias for a BIO node which only takes input and performs effects.

push doesn't produce any meaningful output, and pull usually does a flush.

Basic combinators

(>|>) :: HasCallStack => BIO a b -> BIO b c -> BIO a c infixl 3

Connect two BIO nodes, feeding the left one's output to the right one's input.

(>~>) :: BIO a b -> (b -> c) -> BIO a c infixl 3

Flipped fmap for easier chaining.

(>!>) :: HasCallStack => BIO a b -> (b -> IO c) -> BIO a c

Connect a BIO to an effectful function.
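The three operators above can be combined in one chain. The following is a small sketch, assuming the z-io package is installed; labelLengths is a hypothetical name. Parentheses-free mixing is limited to >|> and >~>, which share the documented infixl 3 fixity, while >!> is applied to a named intermediate node because no fixity is listed for it.

```haskell
import Z.IO.BIO

-- | Hypothetical helper: double each number, render it as text,
-- print it, and return the length of each rendered label.
labelLengths :: IO [Int]
labelLengths = do
    src <- sourceFromList [1, 20, 300 :: Int]
    -- (>|>) feeds the source into a pure node, (>~>) maps a pure function.
    let labelled = src >|> pureBIO (* 2) >~> show
    -- (>!>) attaches an effectful function to the end of the chain.
    runSource (labelled >!> \ s -> putStrLn s >> return (length s))
```

Running labelLengths prints 2, 40 and 600 on separate lines and returns [1, 2, 3].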

appendSource :: HasCallStack => Source a -> Source a -> IO (Source a)

Connect two BIO sources; after the first reaches EOF, draw elements from the second.

concatSource :: HasCallStack => [Source a] -> IO (Source a)

Connect a list of BIO sources; after one reaches EOF, draw elements from the next.

zipSource :: HasCallStack => Source a -> Source b -> IO (Source (a, b))

Zip two BIO sources into one, which reaches EOF when either one reaches EOF.
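A short sketch of the source combinators, assuming the z-io package is installed; chained and zipped are hypothetical names. Both combinators return the combined source in IO, so the result is bound with =<< before draining.

```haskell
import Z.IO.BIO

-- | Drain three in-memory sources one after another.
chained :: IO [Int]
chained = do
    a <- sourceFromList [1, 2]
    b <- sourceFromList [3]
    c <- sourceFromList [4, 5]
    runSource =<< concatSource [a, b, c]   -- [1, 2, 3, 4, 5]

-- | Zip two sources of different lengths; the shorter one ends the stream.
zipped :: IO [(Int, Char)]
zipped = do
    nums  <- sourceFromList [1, 2, 3]
    chars <- sourceFromList "ab"
    runSource =<< zipSource nums chars     -- [(1,'a'), (2,'b')]
```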

zipBIO :: HasCallStack => BIO a b -> BIO a c -> IO (BIO a (b, c))

Zip two BIO nodes into one, which reaches EOF when either one reaches EOF.

The number of output items should match; unmatched output will be discarded.

joinSink :: HasCallStack => Sink out -> Sink out -> Sink out

Fuse two BIO sinks, i.e. everything written to the fused sink will be written to both the left and the right sink.

Flushing the resulting BIO will effectively flush both sinks.

fuseSink :: HasCallStack => [Sink out] -> Sink out

Fuse a list of BIO sinks; everything written to the fused sink will be written to every sink in the list.

Flushing the resulting BIO will effectively flush every sink in the list.
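Fusing sinks behaves like a tee. The sketch below assumes the z-io package is installed and relies on the documented behaviour that runBIO flushes the sink at EOF and that sinkToList restores item order on flush; teeToLists is a hypothetical name.

```haskell
import Data.IORef (readIORef)
import Z.IO.BIO

-- | Hypothetical helper: write one stream into two in-memory sinks.
teeToLists :: IO (String, String)
teeToLists = do
    (refA, sinkA) <- sinkToList
    (refB, sinkB) <- sinkToList
    src <- sourceFromList "abc"
    -- Every item pushed into the fused sink reaches both sinks;
    -- the final flush is forwarded to both as well.
    runBIO (src >|> joinSink sinkA sinkB)
    (,) <$> readIORef refA <*> readIORef refB   -- ("abc", "abc")
```

For more than two sinks, fuseSink [sinkA, sinkB, ...] plays the same role.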

Run BIO chain

runBIO :: HasCallStack => BIO Void Void -> IO ()

Run a BIO loop (source >|> ... >|> sink).

runSource :: HasCallStack => Source x -> IO [x]

Drain a BIO source into a list in memory.

runSource_ :: HasCallStack => Source x -> IO ()

Drain a source without collecting the result.

runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]

Supply a single block of input, then run the BIO node until EOF.

Note that many BIO nodes will be closed or unable to take new input after being drained.

runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()

Supply a single block of input, then run the BIO node until EOF without collecting the result.

Note that many BIO nodes will be closed or unable to take new input after being drained.

unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]

Wrap runBlock into a pure interface.

You can wrap a stateful BIO computation (including the creation of the BIO node) when you can guarantee that the computation is pure, e.g. compressing, decoding, etc.

runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]

Supply blocks of input, then run the BIO node until EOF.

Note that many BIO nodes will be closed or unable to take new input after being drained.

runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()

Supply blocks of input, then run the BIO node until EOF without collecting the result.

Note that many BIO nodes will be closed or unable to take new input after being drained.

unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]

Wrap runBlocks into a pure interface.

Similar to unsafeRunBlock, but with a list of input blocks.
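The pure wrappers take the node's constructor rather than a node, so each evaluation gets fresh state. The following sketch assumes the z-io package is installed; newRunningTotal and runningTotals are hypothetical names. The node is stateful internally, but the overall mapping from input list to output list is deterministic, which is the condition the documentation asks for.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Z.IO.BIO

-- | Hypothetical stateful node: emits the running total of its input.
newRunningTotal :: IO (BIO Int Int)
newRunningTotal = do
    total <- newIORef 0
    return (ioBIO (\ x -> modifyIORef' total (+ x) >> readIORef total))

-- | Pure interface: node creation and the whole run are wrapped together,
-- so no state leaks between calls.
runningTotals :: [Int] -> [Int]
runningTotals = unsafeRunBlocks newRunningTotal
```

For example, runningTotals [1, 2, 3, 4] evaluates to [1, 3, 6, 10].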

Make new BIO

pureBIO :: (a -> b) -> BIO a b

BIO node from a pure function.

BIO nodes made with this function are stateless, and thus can be reused across chains.

ioBIO :: (HasCallStack => a -> IO b) -> BIO a b

BIO node from an IO function.

BIO nodes made with this function may not be stateless; it depends on whether the IO function uses IO state.
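Because a pureBIO node holds no state, one value can safely appear in several chains. A minimal sketch, assuming the z-io package is installed; double and reuseAcrossChains are hypothetical names.

```haskell
import Z.IO.BIO

-- | A stateless node, defined once at top level.
double :: BIO Int Int
double = pureBIO (* 2)

-- | Use the same node in two independent chains.
reuseAcrossChains :: IO ([Int], [Int])
reuseAcrossChains = do
    a  <- sourceFromList [1, 2]
    b  <- sourceFromList [10]
    xs <- runSource (a >|> double)
    ys <- runSource (b >|> double)
    return (xs, ys)                 -- ([2, 4], [20])
```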

Source

sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a

Turn an IO action into a Source.
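A sketch of how an IO action can drive a source, assuming the z-io package is installed; countdown is a hypothetical name. The action returns Just an item while data remains and Nothing to signal EOF.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)
import Z.IO.BIO

-- | Hypothetical helper: a source that counts down from n to 1.
countdown :: Int -> IO [Int]
countdown n = do
    ref <- newIORef n
    let next = do
            k <- readIORef ref
            if k <= 0
                then return Nothing                         -- EOF
                else writeIORef ref (k - 1) >> return (Just k)
    runSource (sourceFromIO next)
```

For example, countdown 3 returns [3, 2, 1].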

sourceFromList :: [a] -> IO (Source a)

Source a list from memory.

initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes)

Turn a file into a Bytes source.

sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes

Turn a BufferedInput into a BIO source, mapping EOF to Nothing.

sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text

Turn a UTF8 encoded BufferedInput into a BIO source, mapping EOF to Nothing.

sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a

Turn a JSON encoded BufferedInput into a BIO source, ignoring any whitespace between JSON objects. If EOF is reached, return Nothing. Throw OtherError with name EJSON if a JSON value cannot be parsed or converted.

sourceParserFromBuffered :: HasCallStack => Parser a -> BufferedInput -> Source a

Turn a buffered input device into a packet source; throw OtherError with name EPARSE if parsing fails.

sourceParseChunksFromBuffered :: (HasCallStack, Print e) => ParseChunks IO Bytes e a -> BufferedInput -> Source a

Turn a buffered input device into a packet source; throw OtherError with name EPARSE if parsing fails.

Sink

sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a

Turn an IO action into a BIO sink.

push will call the IO action with each input chunk; pull has no effect.
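A sketch of a sink built from an IO action, assuming the z-io package is installed; sumViaSink is a hypothetical name. Each pushed item is handed to the action, which here accumulates a total in an IORef.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Z.IO.BIO

-- | Hypothetical helper: sum a list by streaming it into an IO sink.
sumViaSink :: [Int] -> IO Int
sumViaSink xs = do
    total <- newIORef 0
    src   <- sourceFromList xs
    -- The sink's action runs once per item pulled from the source.
    runBIO (src >|> sinkToIO (\ x -> modifyIORef' total (+ x)))
    readIORef total
```

For example, sumViaSink [1, 2, 3] returns 6.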

sinkToList :: IO (IORef [a], Sink a)

Sink to a list in memory.

The list's IORef is not thread safe here, and list items are in reversed order during sinking (they will be reversed when flushed, i.e. pulled). Please don't use it in multiple threads.

initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes)

Turn a file into a Bytes sink.

Note that the file will be opened in O_APPEND .|. O_CREAT .|. O_WRONLY mode; bytes will be written after the end of the original file if there are old bytes.

Bytes specific

newParserNode :: HasCallStack => Parser a -> IO (BIO Bytes a)

Read buffer and parse with Parser.

This function will continuously draw data from input until parsing finishes. Unconsumed bytes will be returned to the buffer.

Return Nothing if EOF is reached before parsing; throw OtherError with name EPARSE if parsing fails.

newReChunk

Arguments

:: Int                      chunk granularity
-> IO (BIO Bytes Bytes)

Make a chunk size divider.

The divider resizes each chunk to the nearest multiple of the granularity; the last trailing chunk is returned as is.
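The re-chunking behaviour can be observed by looking only at output chunk lengths. This is a sketch under assumptions: it needs the z-io and Z-Data packages, and it assumes that Z.Data.Vector exports Bytes, pack and length (these come from Z-Data and are not documented on this page). rechunkedLengths is a hypothetical name.

```haskell
import qualified Z.Data.Vector as V   -- assumed to export Bytes, pack, length
import Z.IO.BIO

-- | Hypothetical helper: feed three 3-byte blocks through a divider
-- with granularity 4 and report the length of every output chunk.
rechunkedLengths :: IO [Int]
rechunkedLengths = do
    node <- newReChunk 4
    let blocks = [V.pack [1, 2, 3], V.pack [4, 5, 6], V.pack [7, 8, 9]] :: [V.Bytes]
    map V.length <$> runBlocks node blocks
```

Going by the description above, every chunk except possibly the last should have a length that is a multiple of 4, and the lengths should add up to the 9 input bytes.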

newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text)

Make a new UTF8 decoder, which decodes byte streams into text streams.

If there are invalid UTF8 bytes, an OtherError with name EINVALIDUTF8 will be thrown.

Note that this node is meant to be used after a preprocessing node such as a decompressor or parser, where byte boundaries cannot be controlled; the UTF8 decoder will concatenate trailing bytes from the last block with the next one. Using this node directly with sourceFromBuffered will not be as efficient as using sourceTextFromBuffered directly, because BufferedInput provides a push-back capability: trailing bytes can be pushed back to the reading buffer and then returned together with the next input block.

newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)

Make a new stream splitter based on magic byte.

newLineSplitter :: IO (BIO Bytes Bytes)

Make a new stream splitter based on linefeed (\r\n or \n).

The resulting bytes don't contain the linefeed.

newBase64Encoder :: IO (BIO Bytes Bytes)

Make a new base64 encoder node.

newBase64Decoder :: HasCallStack => IO (BIO Bytes Bytes)

Make a new base64 decoder node.

hexEncoder

Arguments

:: Bool                 uppercase?
-> BIO Bytes Bytes

Make a hex encoder node.

The hex encoder is stateless; it can be reused across chains.

newHexDecoder :: IO (BIO Bytes Bytes)

Make a new hex decoder node.

Generic BIO

newCounterNode :: IO (Counter, BIO a a)

Make a new BIO node which counts the items flowing through it.

The returned Counter is increased atomically; it's safe to read / reset the counter from other threads.

newSeqNumNode :: IO (Counter, BIO a (Int, a))

Make a new BIO node which counts items and labels each item with a sequence number.

The returned Counter is increased atomically; it's safe to read / reset the counter from other threads.

newGroupingNode :: Int -> IO (BIO a (SmallArray a))

Make a BIO node grouping items into fixed size arrays.