conduino-0.2.0.0: Lightweight composable continuation-based stream processors

Copyright: (c) Justin Le 2019
License: BSD3
Maintainer: justin@jle.im
Stability: experimental
Portability: non-portable
Safe Haskell: None
Language: Haskell2010

Data.Conduino.Combinators

Description

A basic collection of base Pipes that serve as a "prelude" for the package. This module is meant to be imported qualified.

import qualified Data.Conduino.Combinators as C

Sources

Pure

Infinite

unfold :: (s -> (o, s)) -> s -> Pipe i o u m a

Repeatedly apply an "unfolding" function to a given initial state, yielding the first item in the tuple as output and updating the state as the second item in the tuple. Goes on forever. See unfoldMaybe for a version that stops.
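As a minimal sketch of how unfold threads its state, the following builds a Fibonacci source. The name fibs is illustrative only; runPipePure and (.|) are assumed to come from Data.Conduino, as in the examples elsewhere on this page.

```haskell
import           Data.Conduino             (runPipePure, (.|))
import qualified Data.Conduino.Combinators as C

-- | First eight Fibonacci numbers (illustrative name).
-- The state is the pair (current, next): each step yields the current
-- number and shifts the pair forward by one position.
fibs :: [Integer]
fibs = runPipePure $ C.unfold (\(a, b) -> (a, (b, a + b))) (0, 1)
                  .| C.take 8       -- the source is infinite, so cut it off
                  .| C.sinkList
```

Evaluating fibs in GHCi should give [0,1,1,2,3,5,8,13].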

iterate :: (o -> o) -> o -> Pipe i o u m a

Repeatedly apply a function to a given starting value and yield each result forever.

>>> runPipePure $ iterate succ 0
      .| take 5
      .| sinkList
[1,2,3,4,5]

This doesn't yield the original starting value. However, you can yield it yourself before running iterate:

>>> runPipePure $ (yield 0 >> iterate succ 0)
      .| take 5
      .| sinkList
[0,1,2,3,4]

repeat :: o -> Pipe i o u m a

Repeatedly yield a given item forever.

Finite

unfoldMaybe :: (s -> Maybe (o, s)) -> s -> Pipe i o u m ()

A version of unfold that can terminate and end by returning Nothing.
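A small sketch of a terminating unfold: the stepping function returns Nothing once the counter reaches zero, which ends the source. The names countdown and step are hypothetical.

```haskell
import           Data.Conduino             (runPipePure, (.|))
import qualified Data.Conduino.Combinators as C

-- | Count down from 3 and stop (illustrative name).
countdown :: [Int]
countdown = runPipePure $ C.unfoldMaybe step 3 .| C.sinkList
  where
    -- Nothing ends the stream; Just (output, nextState) continues it.
    step :: Int -> Maybe (Int, Int)
    step n
      | n <= 0    = Nothing
      | otherwise = Just (n, n - 1)
```

Evaluating countdown should give [3,2,1].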

unfoldEither :: (s -> Either a (o, s)) -> s -> Pipe i o u m a

A version of unfoldMaybe that can choose the "result" value by passing it in as Left.

iterateMaybe :: (o -> Maybe o) -> o -> Pipe i o u m ()

A version of iterate that can choose to terminate and stop by returning Nothing.

iterateEither :: (o -> Either a o) -> o -> Pipe i o u m a

A version of iterateMaybe that can specify a result value by providing it in the Left.

sourceList :: Foldable t => t a -> Pipe i a u m ()

Yield every item in a foldable container.

replicate :: Int -> o -> Pipe i o u m ()

Yield a given item a certain number of times.
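Because finite sources return (), they can be sequenced with >> to emit one after the other. A brief sketch (the name padded is illustrative):

```haskell
import           Data.Conduino             (runPipePure, (.|))
import qualified Data.Conduino.Combinators as C

-- | Emit the list items first, then two padding zeros (illustrative name).
padded :: [Int]
padded = runPipePure $ (C.sourceList [1, 2] >> C.replicate 2 0)
                    .| C.sinkList
```

Evaluating padded should give [1,2,0,0].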

Monadic

Infinite

repeatM :: Monad m => m o -> Pipe i o u m a

Repeat a monadic action forever, yielding each output.

Remember that each item will only be "executed" when something downstream requests output.
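The demand-driven behaviour can be observed by counting how often the action runs. This is only a sketch: the names demandDriven, counter and tick are hypothetical, and runPipe is assumed to be the effectful runner exported by Data.Conduino.

```haskell
import           Data.Conduino             (runPipe, (.|))
import qualified Data.Conduino.Combinators as C
import           Data.IORef                (atomicModifyIORef', newIORef, readIORef)

-- | Returns the items collected and the number of times the action ran
-- (illustrative name).
demandDriven :: IO ([Int], Int)
demandDriven = do
  counter <- newIORef 0
  -- Each run bumps the counter and returns the new value.
  let tick = atomicModifyIORef' counter (\n -> (n + 1, n + 1))
  xs   <- runPipe $ C.repeatM tick .| C.take 3 .| C.sinkList
  runs <- readIORef counter
  pure (xs, runs)
```

Since take 3 requests only three items, the action should run three times even though repeatM by itself never terminates, so demandDriven is expected to return ([1,2,3],3).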

Finite

repeatMaybeM :: Monad m => m (Maybe o) -> Pipe i o u m ()

Repeat a monadic action, yielding the item in the Just every time. As soon as it sees Nothing, stop producing forever.

Remember that each item will only be "executed" when something downstream requests output.
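One way to picture repeatMaybeM is draining a mutable queue until it is empty. The sketch below uses an IORef holding a list as a stand-in queue; drain, queue and pop are hypothetical names, and runPipe is assumed to come from Data.Conduino.

```haskell
import           Data.Conduino             (runPipe, (.|))
import qualified Data.Conduino.Combinators as C
import           Data.IORef                (atomicModifyIORef', newIORef)

-- | Pop items from a mutable list until it is empty (illustrative name).
drain :: IO [Int]
drain = do
  queue <- newIORef [1, 2, 3]
  -- Returns Just the head while items remain, Nothing once exhausted.
  let pop = atomicModifyIORef' queue $ \xs -> case xs of
        []     -> ([], Nothing)
        y : ys -> (ys, Just y)
  runPipe $ C.repeatMaybeM pop .| C.sinkList
```

Running drain should return [1,2,3]: the first Nothing ends the source.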

repeatEitherM :: Monad m => m (Either a o) -> Pipe i o u m a

Like repeatMaybeM, but allows specifying a final result value by returning it in the Left.

replicateM :: Monad m => Int -> m o -> Pipe i o u m ()

Repeat a monadic action a given number of times, yielding each result, and then stop producing forever.

Remember that each item will only be "executed" when something downstream requests output.

sourceHandleLines :: MonadIO m => Handle -> Pipe i String u m ()

Source from a given I/O handle, yielding each line drawn as a string. To draw raw bytes, use sourceHandle.

This stops as soon as end-of-file is reached, or an empty line is seen.

stdinLines :: MonadIO m => Pipe i String u m ()

Source from each line received from stdin. This stops as soon as end-of-file is reached, or an empty line is seen.

sourceHandle :: MonadIO m => Handle -> Pipe i ByteString u m ()

Source from a given I/O handle, yielding bytestrings as they are pulled. If you want to retrieve each line as a string, see sourceHandleLines.

stdin :: MonadIO m => Pipe i ByteString u m ()

Source from stdin, yielding bytestrings as they are drawn. If you want to retrieve each line as a string, see stdinLines.

Pipes

map :: (i -> o) -> Pipe i o u m u

Process every incoming item with a pure function, and yield its output.

mapM :: Monad m => (i -> m o) -> Pipe i o u m u

Map a monadic function to process every input, and yield its output.
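The monad used with mapM does not have to be IO. As a sketch, running the pipe in Maybe makes the whole pipeline fail as soon as one item fails to parse. The name parseAll is hypothetical, and runPipe is assumed to come from Data.Conduino.

```haskell
import           Data.Conduino             (runPipe, (.|))
import qualified Data.Conduino.Combinators as C
import           Text.Read                 (readMaybe)

-- | Parse every string as an Int, or fail as a whole (illustrative name).
parseAll :: [String] -> Maybe [Int]
parseAll strs = runPipe $ C.sourceList strs
                       .| C.mapM readMaybe   -- effect here is "may fail"
                       .| C.sinkList
```

Here parseAll ["1","2","3"] should give Just [1,2,3], while parseAll ["1","x"] should give Nothing.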

scan :: (o -> i -> o) -> o -> Pipe i o u m u

Like foldl, but yields every accumulator value downstream.

>>> runPipePure $ sourceList [1..10]
      .| scan (+) 0
      .| sinkList
[1,3,6,10,15,21,28,36,45,55]

mapAccum
    :: (i -> s -> (s, o))   -- update state and output
    -> s                    -- initial state
    -> Pipe i o u m u

Map a pure "stateful" function over each incoming item. Give it an initial state and a function that takes an item and the current state and returns the updated state together with an output to yield.
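A short sketch of mapAccum in use: the state is a running index, and each item is paired with the index it arrived at. The name numbered is illustrative.

```haskell
import           Data.Conduino             (runPipePure, (.|))
import qualified Data.Conduino.Combinators as C

-- | Pair every item with its position in the stream (illustrative name).
numbered :: [(Int, Char)]
numbered = runPipePure $ C.sourceList "abc"
                      -- item first, state second; returns (newState, output)
                      .| C.mapAccum (\x n -> (n + 1, (n, x))) 0
                      .| C.sinkList
```

Evaluating numbered should give [(0,'a'),(1,'b'),(2,'c')].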

take :: Int -> Pipe i i u m ()

Let a given number of items pass through the stream uninhibited, and then stop producing forever.

This is most useful if you sequence a second conduit after it.

>>> runPipePure $ sourceList [1..8]
      .| (do take 3 .| map (*2)         -- double the first 3 items
             map negate                 -- negate the rest
         )
      .| sinkList
[2,4,6,-4,-5,-6,-7,-8]

takeWhile :: (i -> Bool) -> Pipe i i u m ()

Let elements pass until an element is received that does not satisfy the predicate, then stop producing forever.

Like take, this is most useful if you sequence a second conduit after it.

filter :: (i -> Bool) -> Pipe i i u m u

Only allow values satisfying a predicate to pass.
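The difference between takeWhile and filter shows up when a failing item is followed by passing ones. A sketch on the same input (the names prefix and kept are illustrative):

```haskell
import           Data.Conduino             (runPipePure, (.|))
import qualified Data.Conduino.Combinators as C

input :: [Int]
input = [1, 2, 5, 3, 1]

-- | takeWhile stops for good at the first item that fails the predicate.
prefix :: [Int]
prefix = runPipePure $ C.sourceList input .| C.takeWhile (< 4) .| C.sinkList

-- | filter skips failing items but keeps processing the rest.
kept :: [Int]
kept = runPipePure $ C.sourceList input .| C.filter (< 4) .| C.sinkList
```

Evaluating prefix should give [1,2], while kept should give [1,2,3,1].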

concatMap :: Foldable t => (i -> t o) -> Pipe i o u m u

Map a function returning a container onto every incoming item, and yield all of the outputs from that function.

concat :: Foldable t => Pipe (t i) i u m u

Take an input of containers and output each of their elements successively.
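A brief sketch of both flattening pipes, using lists as the Foldable container. The names expanded and flattened are illustrative.

```haskell
import           Data.Conduino             (runPipePure, (.|))
import qualified Data.Conduino.Combinators as C

-- | Each input produces two outputs, emitted in order.
expanded :: [Int]
expanded = runPipePure $ C.sourceList [1, 2, 3]
                      .| C.concatMap (\x -> [x, x * 10])
                      .| C.sinkList

-- | A stream of strings becomes a stream of characters.
flattened :: String
flattened = runPipePure $ C.sourceList ["ab", "cd"]
                       .| C.concat
                       .| C.sinkList
```

Evaluating expanded should give [1,10,2,20,3,30], and flattened should give "abcd".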

pairs :: Pipe i (i, i) u m u

Yield consecutive pairs of values.

>>> runPipePure $ sourceList [1..5]
      .| pairs
      .| sinkList
[(1,2),(2,3),(3,4),(4,5)]

consecutive :: Int -> Pipe i (Seq i) u m u

Yield consecutive runs of at most n values, starting with an empty sequence.

To get only "full" sequences, pipe with filter.

>>> runPipePure $ sourceList [1..6]
      .| consecutive 3
      .| map toList
      .| sinkList
[[],[1],[1,2],[1,2,3],[2,3,4],[3,4,5],[4,5,6]]
>>> runPipePure $ sourceList [1..6]
      .| consecutive 3
      .| filter ((== 3) . Seq.length)
      .| map toList
      .| sinkList
[[1,2,3],[2,3,4],[3,4,5],[4,5,6]]

Sinks

Pure

drop :: Int -> Pipe i o u m ()

Ignore a certain number of items from the input stream, and then stop producing forever.

This is most useful if you sequence a second consumer after it:

>>> runPipePure $ sourceList [1..8]
      .| (drop 3 >> sinkList)
[4,5,6,7,8]

dropWhile :: (i -> Bool) -> Pipe i o u m ()

Ignore items from an input stream as long as they match a predicate. Afterwards, stop producing forever.

Like drop, this is most useful if you sequence a second consumer after it.

foldr :: (a -> b -> b) -> b -> Pipe a o u m b

Right-fold every input into an accumulated value.

Essentially this builds up a giant continuation that will be run all at once on the final result.

foldl :: (b -> a -> b) -> b -> Pipe a o u m b

Left-fold every input into an accumulated value.

Essentially this maintains a state and modifies that state with every input, using the given accumulating function.
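The two folds can be compared on a small input. In this sketch, consing with foldr preserves stream order, while consing onto the foldl accumulator reverses it. The names total, inOrder and reversed are illustrative.

```haskell
import           Data.Conduino             (runPipePure, (.|))
import qualified Data.Conduino.Combinators as C

-- | Left fold as a running sum.
total :: Int
total = runPipePure $ C.sourceList [1 .. 10] .| C.foldl (+) 0

-- | Right fold: the first item ends up outermost, so order is preserved.
inOrder :: [Int]
inOrder = runPipePure $ C.sourceList [1, 2, 3] .| C.foldr (:) []

-- | Left fold: each new item is pushed onto the accumulated state.
reversed :: [Int]
reversed = runPipePure $ C.sourceList [1, 2, 3] .| C.foldl (flip (:)) []
```

Evaluating these should give 55, [1,2,3] and [3,2,1] respectively.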

foldMap :: Monoid a => (i -> a) -> Pipe i o u m a

Fold every incoming item according to a monoidal projection, and return the result once finished.

This can be used to implement many useful consumers, like ones that find the sum or the maximum item:

sum :: Num i => Pipe i o u m i
sum = getSum <$> foldMap Sum

maximum :: Ord i => Pipe i o u m (Maybe i)
maximum = fmap getMax <$> foldMap (Just . Max)

fold :: Monoid a => Pipe a o u m a

Fold every incoming item monoidally, and return the result once finished.
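A sketch of the monoidal folds in a full pipeline. Sum comes from Data.Monoid and Max from Data.Semigroup in base; the names sumOf, largest and joined are illustrative.

```haskell
import           Data.Conduino             (runPipePure, (.|))
import qualified Data.Conduino.Combinators as C
import           Data.Monoid               (Sum (..))
import           Data.Semigroup            (Max (..))

-- | Sum via the Sum monoid; fmap unwraps the newtype in the pipe's result.
sumOf :: Int
sumOf = runPipePure $ C.sourceList [1 .. 10] .| (getSum <$> C.foldMap Sum)

-- | Maximum via Maybe (Max i), so that an empty stream gives Nothing.
largest :: Maybe Int
largest = runPipePure $ C.sourceList [3, 1, 4, 1, 5]
                     .| (fmap getMax <$> C.foldMap (Just . Max))

-- | fold uses the items' own Monoid instance; for strings, concatenation.
joined :: String
joined = runPipePure $ C.sourceList ["ab", "cd", "ef"] .| C.fold
```

Evaluating these should give 55, Just 5 and "abcdef" respectively.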

sinkNull :: Pipe i o u m ()

Consume an entire input stream, discarding every item received.

sinkList :: Pipe i o u m [i]

Sink every incoming item into a list.

Note that this keeps the entire list in memory until it is all eventually read.

last :: Pipe i o u m (Maybe i)

Get the last item emitted by a stream.

To get the first item ("head"), use await or awaitSurely.
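A sketch contrasting last with await used as a "head" sink. await is assumed to be exported by Data.Conduino, as the text above suggests; the names lastItem, firstItem and noItem are illustrative.

```haskell
import           Data.Conduino             (await, runPipePure, (.|))
import qualified Data.Conduino.Combinators as C

-- | Consumes the whole stream and keeps only the final item.
lastItem :: Maybe Int
lastItem = runPipePure $ C.sourceList [1 .. 5] .| C.last

-- | A single await acts as "head": it takes one item and finishes.
firstItem :: Maybe Int
firstItem = runPipePure $ C.sourceList [1 .. 5] .| await

-- | On an empty stream there is no last item.
noItem :: Maybe Int
noItem = runPipePure $ C.sourceList ([] :: [Int]) .| C.last
```

Evaluating these should give Just 5, Just 1 and Nothing respectively.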

Monadic

sinkHandle :: MonadIO m => Handle -> Pipe ByteString o u m ()

Sink into a given I/O handle, writing each input to the handle.

stdout :: MonadIO m => Pipe ByteString o u m ()

A sink into stdout.

stderr :: MonadIO m => Pipe ByteString o u m ()

A sink into stderr.