conduit-0.4.0: Streaming data processing library.

Safe Haskell: Safe-Inferred

Data.Conduit

Description

The main module, exporting types, utility functions, and fuse and connect operators.

There are three main types in this package: Source (data producer), Sink (data consumer), and Conduit (data transformer). All three are in fact type synonyms for the underlying Pipe data type.

The typical approach to using this package is:

  • Compose multiple Sinks together using the Sink Monad instance.
  • Left-fuse Sources and Conduits into new Conduits.
  • Right-fuse Conduits and Sinks into new Sinks.
  • Middle-fuse two Conduits into a new Conduit.
  • Connect a Source to a Sink to obtain a result.

Types

data Pipe i o m r

The underlying datatype for all the types in this package. It has four type parameters:

  • i is the type of values for this Pipe's input stream.
  • o is the type of values for this Pipe's output stream.
  • m is the underlying monad.
  • r is the result type.

Note that o and r are inherently different. o is the type of the stream of values this Pipe will produce and send downstream. r is the final output of this Pipe.

Pipes can be composed via the pipe function. To do so, the output type of the left pipe must match the input type of the right pipe, and the result type of the left pipe must be unit (). This is because any result produced by the left pipe must be discarded in favor of the result of the right pipe.

Since 0.4.0

Constructors

HaveOutput (Pipe i o m r) (m r) o

Provide new output to be sent downstream. This constructor has three fields: the next Pipe to be used, an early-close action, and the output value.

NeedInput (i -> Pipe i o m r) (Pipe i o m r)

Request more input from upstream. The first field takes a new input value and provides a new Pipe. The second is for early termination: it gives a new Pipe which takes no input from upstream. This allows a Pipe to provide a final stream of output values after no more input is available from upstream.

Done (Maybe i) r

Processing with this Pipe is complete. Provides an optional leftover input value and a result.

PipeM (m (Pipe i o m r)) (m r)

Requires running a monadic action to get the next Pipe. The second field is an early cleanup action. Technically, this second field could be omitted, but doing so would require extra operations to be performed in some cases. For example, a Pipe pulling data from a file might be forced to pull an extra, unneeded chunk before closing the Handle.
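To make the roles of the four constructors concrete, the following sketch re-declares the type locally and drives a producer into a consumer by hand. It is an illustrative model only: fromList, sumInput, closeEarly and connect are hypothetical helpers written for this example, not functions exported by the package, and the library's own connect logic may differ.

```haskell
module Main where

import Data.Functor.Identity (runIdentity)

-- Local re-declaration mirroring the four documented constructors.
-- An illustrative model only, not the library's source.
data Pipe i o m r
  = HaveOutput (Pipe i o m r) (m r) o
  | NeedInput (i -> Pipe i o m r) (Pipe i o m r)
  | Done (Maybe i) r
  | PipeM (m (Pipe i o m r)) (m r)

-- Hypothetical helper: emit each list element downstream, then finish.
fromList :: Monad m => [o] -> Pipe i o m ()
fromList []       = Done Nothing ()
fromList (x : xs) = HaveOutput (fromList xs) (return ()) x

-- Hypothetical helper: sum all input. The second NeedInput field is the
-- pipe to continue with once upstream has no more input.
sumInput :: Int -> Pipe Int o m Int
sumInput acc = NeedInput (\i -> sumInput (acc + i)) (Done Nothing acc)

-- Hypothetical helper: run a pipe's early-close action, if it carries one.
closeEarly :: Monad m => Pipe i o m r -> m ()
closeEarly (HaveOutput _ close _) = close >> return ()
closeEarly (PipeM _ close)        = close >> return ()
closeEarly _                      = return ()

-- Hypothetical driver: feed the first pipe's output into the second.
connect :: Monad m => Pipe i a m () -> Pipe a o m r -> m r
connect src (Done _ r)            = closeEarly src >> return r
connect src (PipeM next _)        = next >>= connect src
connect src (HaveOutput next _ _) = connect src next -- nothing downstream to receive it
connect src sink@(NeedInput push close) =
  case src of
    HaveOutput src' _ x -> connect src' (push x)
    PipeM next _        -> next >>= \src' -> connect src' sink
    NeedInput _ src'    -> connect src' sink  -- the producer has no upstream
    Done _ ()           -> connect src close  -- upstream exhausted

total :: Int
total = runIdentity (connect (fromList [1, 2, 3, 4]) (sumInput 0))

main :: IO ()
main = print total
```

In this model the consumer's result is the only value returned, which matches the distinction drawn above between the output stream o and the final result r.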

Instances

MonadBase base m => MonadBase base (Pipe i o m) 
MonadTrans (Pipe i o) 
Monad m => Monad (Pipe i o m) 
Monad m => Functor (Pipe i o m) 
Monad m => Applicative (Pipe i o m) 
MonadIO m => MonadIO (Pipe i o m) 
Monad m => Monoid (Pipe i o m ()) 

type Source m a = Pipe Void a m ()

A Pipe which provides a stream of output values, without consuming any input. The input parameter is set to Void to indicate that no input is consumed, although there is no way to statically guarantee that the NeedInput constructor will not be used. A Source is not used to produce a final result, and thus the result parameter is set to ().

Since 0.4.0

type Conduit i m o = Pipe i o m ()

A Pipe which consumes a stream of input values and produces a stream of output values. It does not produce a result value, and thus the result parameter is set to ().

Since 0.4.0

type Sink i m r = Pipe i Void m r

A Pipe which consumes a stream of input values and produces a final result. It cannot produce any output values, and thus the output parameter is set to Void. In other words, it is impossible to create a HaveOutput constructor for a Sink.

Since 0.4.0

Connect/fuse operators

($$) :: Monad m => Source m a -> Sink a m b -> m b

The connect operator, which pulls data from a source and pushes to a sink. There are two ways this process can terminate:

  1. If the Sink is a Done constructor, the Source is closed.
  2. If the Source is a Done constructor, the Sink is closed.

In other words, both the Source and Sink will always be closed. If you would like to keep the Source open to be used for other operations, use the connect-and-resume operator $$+.

Since 0.4.0

($$+) :: Monad m => Source m a -> Sink a m b -> m (Source m a, b)

The connect-and-resume operator. Does not close the Source, but instead returns it to be used again. This allows a Source to be used incrementally in a large program, without forcing the entire program to live in the Sink monad.

Mnemonic: connect + do more.

Since 0.4.0

($=) :: Monad m => Source m a -> Conduit a m b -> Source m b

Left fuse, combining a source and a conduit together into a new source.

Both the Source and Conduit will be closed when the newly-created Source is closed.

Leftover data from the Conduit will be discarded.

Since 0.4.0

(=$) :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c

Right fuse, combining a conduit and a sink together into a new sink.

Both the Conduit and Sink will be closed when the newly-created Sink is closed.

Leftover data returned from the Sink will be discarded.

Since 0.4.0

(=$=) :: Monad m => Pipe a b m () -> Pipe b c m r -> Pipe a c m r

Fusion operator, combining two Pipes together into a new Pipe.

Both Pipes will be closed when the newly-created Pipe is closed.

Leftover data returned from the right Pipe will be discarded.

Note: in previous versions, this operator would only fuse together two Conduits (known as middle fusion). This operator is generalized to work on all Pipes, including Sources and Sinks.

Since 0.4.0
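The shape of fusion (left pipe returns (), fused pipe returns the right pipe's result) can be sketched on a deliberately simplified, pure stand-in for Pipe. Everything here is an assumption made for illustration: the three-constructor type drops the base monad, cleanup actions and leftovers, and fuse, fromList, mapP, collect and runClosed are hypothetical names, not the library's API.

```haskell
module Main where

-- Simplified pure stand-in for Pipe: no monad, no cleanup, no leftovers.
-- Illustrative only; not the library's definition.
data Pipe i o r
  = HaveOutput (Pipe i o r) o
  | NeedInput (i -> Pipe i o r) (Pipe i o r)
  | Done r

-- Hypothetical model of fusion: the left pipe must return (), and the
-- fused pipe's result is the right pipe's result.
fuse :: Pipe a b () -> Pipe b c r -> Pipe a c r
fuse _  (Done r)            = Done r
fuse up (HaveOutput next o) = HaveOutput (fuse up next) o
fuse up down@(NeedInput push close) =
  case up of
    HaveOutput up' x -> fuse up' (push x)
    NeedInput p c    -> NeedInput (\a -> fuse (p a) down) (fuse c down)
    Done ()          -> fuse up close

-- Producer: emit the list elements in order.
fromList :: [o] -> Pipe i o ()
fromList = foldr (\x next -> HaveOutput next x) (Done ())

-- Transformer: apply a function to every value passing through.
mapP :: (a -> b) -> Pipe a b ()
mapP f = NeedInput (\a -> HaveOutput (mapP f) (f a)) (Done ())

-- Consumer: gather all input into a list.
collect :: [i] -> Pipe i o [i]
collect acc = NeedInput (\i -> collect (i : acc)) (Done (reverse acc))

-- Run a pipe that has no upstream: always take the "no more input" branch.
runClosed :: Pipe i o r -> r
runClosed (Done r)            = r
runClosed (HaveOutput next _) = runClosed next
runClosed (NeedInput _ close) = runClosed close

doubled :: [Int]
doubled = runClosed (fromList [1, 2, 3] `fuse` mapP (* 2) `fuse` collect [])

main :: IO ()
main = print doubled
```

The same fuse function joins a producer to a transformer and a transformer to a consumer, which is the generalization described in the note on =$= above.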

Utility functions

General

await :: Pipe i o m (Maybe i)

Wait for a single input value from upstream, and remove it from the stream. Returns Nothing if no more data is available.

Since 0.4.0

yield :: Monad m => o -> Pipe i o m ()

Send a single output value downstream.

Since 0.4.0
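As a rough picture of how await and yield combine in do-notation, the sketch below defines look-alike versions on a simplified pure pipe type. This is a model under stated assumptions, not the library's implementation: the local Pipe has no base monad, cleanup actions or leftovers, and pairSums and runList are hypothetical names introduced for the example.

```haskell
module Main where

-- Simplified pure stand-in for Pipe (no base monad, cleanup, or leftovers).
data Pipe i o r
  = HaveOutput (Pipe i o r) o
  | NeedInput (i -> Pipe i o r) (Pipe i o r)
  | Done r

instance Functor (Pipe i o) where
  fmap f p = p >>= (Done . f)

instance Applicative (Pipe i o) where
  pure = Done
  pf <*> px = pf >>= \f -> fmap f px

instance Monad (Pipe i o) where
  Done r            >>= f = f r
  HaveOutput next o >>= f = HaveOutput (next >>= f) o
  NeedInput p c     >>= f = NeedInput (\i -> p i >>= f) (c >>= f)

-- Model of await: Just a value, or Nothing once upstream is exhausted.
await :: Pipe i o (Maybe i)
await = NeedInput (Done . Just) (Done Nothing)

-- Model of yield: send one value downstream.
yield :: o -> Pipe i o ()
yield = HaveOutput (Done ())

-- Emit the sum of each adjacent pair; a trailing unpaired element is
-- passed through unchanged.
pairSums :: Pipe Int Int ()
pairSums = do
  mx <- await
  case mx of
    Nothing -> return ()
    Just x  -> do
      my <- await
      case my of
        Nothing -> yield x
        Just y  -> yield (x + y) >> pairSums

-- Hypothetical driver: feed a list in, collect the outputs.
runList :: [i] -> Pipe i o () -> [o]
runList _        (Done ())           = []
runList xs       (HaveOutput next o) = o : runList xs next
runList (x : xs) (NeedInput p _)     = runList xs (p x)
runList []       (NeedInput _ c)     = runList [] c

sums :: [Int]
sums = runList [1, 2, 3, 4, 5] pairSums

main :: IO ()
main = print sums
```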

hasInput :: Pipe i o m Bool

Check if input is available from upstream. Will not remove the data from the stream.

Since 0.4.0

transPipe :: Monad m => (forall a. m a -> n a) -> Pipe i o m r -> Pipe i o n r

Transform the monad that a Pipe lives in.

Since 0.4.0

Source

sourceState

Arguments

:: Monad m
=> state                                           Initial state
-> (state -> m (SourceStateResult state output))   Pull function
-> Source m output

Construct a Source with some stateful functions. This function handles threading the state value for you.

Since 0.3.0

sourceStateIO

Arguments

:: MonadResource m
=> IO state                                        Resource and/or state allocation
-> (state -> IO ())                                Resource and/or state cleanup
-> (state -> m (SourceStateResult state output))   Pull function. Note that this need not explicitly perform any cleanup.
-> Source m output

A combination of sourceIO and sourceState.

Since 0.3.0

data SourceStateResult state output

The return value when pulling in the sourceState function. Either indicates no more data, or the next value and an updated state.

Since 0.3.0

Constructors

StateOpen state output 
StateClosed 
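The pull protocol used by sourceState can be sketched without the rest of the library. In the example below, pullAll is a hypothetical stand-in that runs the pull loop directly and collects the outputs in a list instead of building a Source; the result type is re-declared locally, and countdown is an example pull function invented for illustration.

```haskell
module Main where

import Data.Functor.Identity (runIdentity)

-- Local copy of the documented result type.
data SourceStateResult state output
  = StateOpen state output
  | StateClosed

-- Hypothetical stand-in for sourceState: threads the state through
-- repeated pulls and gathers every output until StateClosed.
pullAll
  :: Monad m
  => state
  -> (state -> m (SourceStateResult state output))
  -> m [output]
pullAll st pull = do
  res <- pull st
  case res of
    StateClosed       -> return []
    StateOpen st' out -> do
      rest <- pullAll st' pull
      return (out : rest)

-- Example pull function: count down from the current state to 1.
countdown :: Monad m => Int -> m (SourceStateResult Int Int)
countdown n
  | n <= 0    = return StateClosed
  | otherwise = return (StateOpen (n - 1) n)

countdownFrom3 :: [Int]
countdownFrom3 = runIdentity (pullAll 3 countdown)

main :: IO ()
main = print countdownFrom3
```

The pull function never sees the loop itself; it only maps the current state to the next state and one output, which is the sense in which the state is threaded for you.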

sourceIO

Arguments

:: MonadResource m
=> IO state                               Resource and/or state allocation
-> (state -> IO ())                       Resource and/or state cleanup
-> (state -> m (SourceIOResult output))   Pull function. Note that this should not perform any cleanup.
-> Source m output

Construct a Source based on some IO actions for alloc/release.

Since 0.3.0

data SourceIOResult output

The return value when pulling in the sourceIO function. Either indicates no more data, or the next value.

Since 0.3.0

Constructors

IOOpen output 
IOClosed 

Sink

sinkState

Arguments

:: Monad m
=> state                                                        Initial state
-> (state -> input -> m (SinkStateResult state input output))   Push
-> (state -> m output)                                          Close. Note that the state is not returned, as it is not needed.
-> Sink input m output

Construct a Sink with some stateful functions. This function handles threading the state value for you.

Since 0.3.0

data SinkStateResult state input output

A helper type for sinkState, indicating the result of being pushed to. It can either indicate that processing is done, or to continue with the updated state.

Since 0.3.0

Constructors

StateDone (Maybe input) output 
StateProcessing state 
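The push/close protocol of sinkState, including the optional leftover carried by StateDone, can be modelled over a plain list. This is a sketch under assumptions: pushAll is a hypothetical driver (not part of the package) that returns the sink's output together with the unconsumed input, and sumUpTo is an example push function.

```haskell
module Main where

import Data.Functor.Identity (runIdentity)

-- Local copy of the documented result type.
data SinkStateResult state input output
  = StateDone (Maybe input) output
  | StateProcessing state

-- Hypothetical stand-in for sinkState: pushes list elements one at a time
-- and returns the result plus whatever input was not consumed.
pushAll
  :: Monad m
  => state
  -> (state -> input -> m (SinkStateResult state input output))
  -> (state -> m output)
  -> [input]
  -> m (output, [input])
pushAll st _ close [] = do
  out <- close st              -- input ran out: ask the close function
  return (out, [])
pushAll st push close (x : xs) = do
  res <- push st x
  case res of
    StateProcessing st'    -> pushAll st' push close xs
    StateDone leftover out -> return (out, maybe xs (: xs) leftover)

-- Example push function: sum input until the total would exceed the limit;
-- the element that would overflow is handed back as a leftover.
sumUpTo :: Monad m => Int -> Int -> Int -> m (SinkStateResult Int Int Int)
sumUpTo limit acc x
  | acc + x > limit = return (StateDone (Just x) acc)
  | otherwise       = return (StateProcessing (acc + x))

result :: (Int, [Int])
result = runIdentity (pushAll 0 (sumUpTo 10) return [4, 5, 6, 7])

main :: IO ()
main = print result
```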

sinkIO

Arguments

:: MonadResource m
=> IO state                                               Resource and/or state allocation
-> (state -> IO ())                                       Resource and/or state cleanup
-> (state -> input -> m (SinkIOResult input output))      Push
-> (state -> m output)                                    Close
-> Sink input m output

Construct a Sink. Note that your push and close functions need not explicitly perform any cleanup.

Since 0.3.0

data SinkIOResult input output

A helper type for sinkIO, indicating the result of being pushed to. It can either indicate that processing is done, or to continue.

Since 0.3.0

Constructors

IODone (Maybe input) output 
IOProcessing 

Conduit

haveMore

Arguments

:: Conduit a m b   The next Conduit to return after the list has been exhausted.
-> m ()            A close action for early termination.
-> [b]             The values to send down the stream.
-> Conduit a m b

A helper function for returning a list of values from a Conduit.

Since 0.3.0

conduitState

Arguments

:: Monad m
=> state                                                           Initial state
-> (state -> input -> m (ConduitStateResult state input output))   Push function.
-> (state -> m [output])                                           Close function. The state need not be returned, since it will not be used again.
-> Conduit input m output

Construct a Conduit with some stateful functions. This function handles threading the state value for you.

Since 0.3.0

data ConduitStateResult state input output

A helper type for conduitState, indicating the result of being pushed to. It can either indicate that processing is done, or to continue with the updated state.

Since 0.3.0

Constructors

StateFinished (Maybe input) [output] 
StateProducing state [output] 

Instances

Functor (ConduitStateResult state input) 
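A push function for conduitState returns zero or more outputs per input, and the close function can flush whatever state remains. The sketch below models that over lists; transformAll is a hypothetical driver rather than the library's conduitState, and chunkPush and chunkClose are example functions that group input into fixed-size chunks.

```haskell
module Main where

import Data.Functor.Identity (runIdentity)

-- Local copy of the documented result type.
data ConduitStateResult state input output
  = StateFinished (Maybe input) [output]
  | StateProducing state [output]

-- Hypothetical stand-in for conduitState: pushes each list element,
-- concatenates the produced outputs, and calls close at end of input.
transformAll
  :: Monad m
  => state
  -> (state -> input -> m (ConduitStateResult state input output))
  -> (state -> m [output])
  -> [input]
  -> m [output]
transformAll st _ close [] = close st
transformAll st push close (x : xs) = do
  res <- push st x
  case res of
    StateProducing st' outs -> do
      rest <- transformAll st' push close xs
      return (outs ++ rest)
    StateFinished _ outs -> return outs  -- this model drops the leftover

-- Example push: buffer elements (newest first) and emit a chunk of size n.
chunkPush :: Monad m => Int -> [a] -> a -> m (ConduitStateResult [a] a [a])
chunkPush n buf x
  | length buf + 1 >= n = return (StateProducing [] [reverse (x : buf)])
  | otherwise           = return (StateProducing (x : buf) [])

-- Example close: flush a partial chunk, if any.
chunkClose :: Monad m => [a] -> m [[a]]
chunkClose []  = return []
chunkClose buf = return [reverse buf]

chunks :: [[Int]]
chunks = runIdentity (transformAll [] (chunkPush 2) chunkClose [1, 2, 3, 4, 5])

main :: IO ()
main = print chunks
```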

conduitIO

Arguments

:: MonadResource m
=> IO state                                                  Resource and/or state allocation
-> (state -> IO ())                                          Resource and/or state cleanup
-> (state -> input -> m (ConduitIOResult input output))      Push function. Note that this need not explicitly perform any cleanup.
-> (state -> m [output])                                     Close function. Note that this need not explicitly perform any cleanup.
-> Conduit input m output

Construct a Conduit.

Since 0.3.0

data ConduitIOResult input output

A helper type for conduitIO, indicating the result of being pushed to. It can either indicate that processing is done, or to continue.

Since 0.3.0

Constructors

IOFinished (Maybe input) [output] 
IOProducing [output] 

Instances

Sequencing

type SequencedSink state input m output = state -> Sink input m (SequencedSinkResponse state input m output)

Helper type for constructing a Conduit based on Sinks. This allows you to write higher-level code that takes advantage of existing conduits and sinks, and leverages a sink's monadic interface.

Since 0.3.0

sequenceSink

Arguments

:: Monad m
=> state                                   Initial state
-> SequencedSink state input m output
-> Conduit input m output

Convert a SequencedSink into a Conduit.

Since 0.3.0

sequence :: Monad m => Sink input m output -> Conduit input m output

Specialised version of sequenceSink.

Note that this function will return an infinite stream if provided a Sink that finishes without consuming any input (the SinkNoData case in earlier versions). In other words, you probably don't want to do sequence . return.

Since 0.3.0

data SequencedSinkResponse state input m output

Return value from a SequencedSink.

Since 0.3.0

Constructors

Emit state [output]

Set a new state, and emit some new output.

Stop

End the conduit.

StartConduit (Conduit input m output)

Pass control to a new conduit.

Flushing

data Flush a

Provide for a stream of data that can be flushed.

A number of Conduits (e.g., zlib compression) need the ability to flush the stream at some point. This provides a single wrapper datatype to be used in all such circumstances.

Since 0.3.0

Constructors

Chunk a 
Flush 

Instances

Functor Flush 
Eq a => Eq (Flush a) 
Ord a => Ord (Flush a) 
Show a => Show (Flush a) 
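One way a consumer might interpret a Flush-wrapped stream is to buffer Chunk values and emit the buffered data whenever a Flush arrives. The sketch below shows that idea on plain lists; the Flush type is re-declared locally, and batches is a hypothetical function written for this example, not something the package provides.

```haskell
module Main where

-- Local copy of the documented type.
data Flush a = Chunk a | Flush
  deriving (Eq, Show)

-- Hypothetical consumer: buffer chunks, emit the concatenated buffer at
-- every Flush, and emit any remaining data at end of stream.
batches :: [Flush String] -> [String]
batches = go []
  where
    -- The buffer is kept newest-first and reversed when emitted.
    go buf []             = [concat (reverse buf) | not (null buf)]
    go buf (Chunk c : xs) = go (c : buf) xs
    go buf (Flush : xs)   = concat (reverse buf) : go [] xs

example :: [String]
example = batches [Chunk "ab", Chunk "cd", Flush, Chunk "ef"]

main :: IO ()
main = print example
```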

Convenience re-exports

data ResourceT m a

The Resource transformer. This transformer keeps track of all registered actions, and calls them upon exit (via runResourceT). Actions may be registered via register, or resources may be allocated atomically via allocate. allocate corresponds closely to bracket.

Releasing may be performed before exit via the release function. This is a highly recommended optimization, as it will ensure that scarce resources are freed early. Note that calling release will deregister the action, so that a release action will only ever be called once.

Since 0.3.0

class (MonadThrow m, MonadUnsafeIO m, MonadIO m, Applicative m) => MonadResource m

A Monad which allows for safe resource allocation. In theory, any monad transformer stack including a ResourceT can be an instance of MonadResource.

Note: runResourceT has a requirement for a MonadBaseControl IO m monad, which allows control operations to be lifted. A MonadResource does not have this requirement. This means that transformers such as ContT can be an instance of MonadResource. However, the ContT wrapper will need to be unwrapped before calling runResourceT.

Since 0.3.0

class Monad m => MonadThrow m where

A Monad which can throw exceptions. Note that this does not work in a vanilla ST or Identity monad. Instead, you should use the ExceptionT transformer in your stack if you are dealing with a non-IO base monad.

Since 0.3.0

Methods

monadThrow :: Exception e => e -> m a

class Monad m => MonadUnsafeIO m where

A Monad based on some monad which allows running of some IO actions, via unsafe calls. This applies to IO and ST, for instance.

Since 0.3.0

Methods

unsafeLiftIO :: IO a -> m a

runResourceT :: MonadBaseControl IO m => ResourceT m a -> m a

Unwrap a ResourceT transformer, and call all registered release actions.

Note that there is some reference counting involved due to resourceForkIO. If multiple threads are sharing the same collection of resources, only the last call to runResourceT will deallocate the resources.

Since 0.3.0