conduit-0.2.2: Streaming data processing library.

Safe Haskell: Safe-Inferred

Data.Conduit

Description

The main module, exporting types, utility functions, and fuse and connect operators.

Types

The three core types of this package are Source (the data producer), Sink (the data consumer), and Conduit (the data transformer). For all three types, a result will provide the next value to be used. For example, the Open constructor includes a new Source in it. This leads to the main invariant for all conduit code: these three types may never be reused. While some specific values may work fine with reuse, the result is generally unpredictable and should not be relied upon.

The user-facing API provided by the connect and fuse operators automatically addresses the low level details of pulling, pushing, and closing, and there should rarely be need to perform these actions in user code.

Source

data SourceResult m a

The result of pulling from a source: either a new piece of data (Open) or an indication that the source is now Closed.

The Open constructor returns both a new value, as well as a new Source, which should be used in place of the previous Source.
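The replacement rule can be sketched with a small stand-alone model. The types below are simplified stand-ins written for this example (they run in any Monad rather than ResourceT, and omit the close action), and fromList and drain are hypothetical helpers, not library functions.

```haskell
-- Simplified model for illustration only: the library's Source runs in
-- ResourceT m and also carries a close action.
data SourceResult m a = Open (Source m a) a | Closed

newtype Source m a = Source { sourcePull :: m (SourceResult m a) }

-- Hypothetical helper: a source that yields each list element in turn.
fromList :: Monad m => [a] -> Source m a
fromList []       = Source (return Closed)
fromList (x : xs) = Source (return (Open (fromList xs) x))

-- Drain a source, always continuing with the Source returned by Open
-- and never pulling from an older one again.
drain :: Monad m => Source m a -> m [a]
drain src = do
  res <- sourcePull src
  case res of
    Closed      -> return []
    Open src' x -> fmap (x :) (drain src')

main :: IO ()
main = drain (fromList "abc") >>= putStrLn  -- prints "abc"
```

Note that drain never touches src again once a pull has returned; only src' is used from then on.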

Since 0.2.0

Constructors

Open (Source m a) a 
Closed 

data Source m a

A Source has two operations on it: pull some data, and close the Source. Since Source is built on top of ResourceT, all acquired resources should be automatically released anyway. Closing a Source early is merely an optimization to free scarce resources as soon as possible.

A Source should free any resources it allocated when either sourceClose is called or a Closed is returned. However, because ResourceT is used, this is simply an optimization.

Since 0.2.0

Constructors

Source 

Buffering

data BufferedSource m a

When actually interacting with Sources, we sometimes want to be able to buffer the output, in case any intermediate steps return leftover data. A BufferedSource allows for such buffering.

A BufferedSource, unlike a Source, is resumable, meaning it can be passed to multiple Sinks without restarting. Therefore, a BufferedSource relaxes the main invariant of this package: the same value may be used multiple times.

The intention of a BufferedSource is to be used internally by an application or library, not to be part of its user-facing API. For example, the Warp webserver uses a BufferedSource internally for parsing the request headers, but then passes a normal Source to the web application for reading the request body.

One caveat: while the types will allow you to use the buffered source in multiple threads, there is no guarantee that all BufferedSources will handle this correctly.
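The idea of resuming from a buffer can be sketched in plain IO. This is a toy model, not the library's BufferedSource: the newtype below simply holds the unconsumed input in an IORef, and bufferList, pull, unpull and takeWhileB are hypothetical names chosen for this example.

```haskell
import Data.Char (isAlpha, isDigit)
import Data.IORef

-- Toy model for illustration only: a mutable cell holding unconsumed input.
newtype BufferedSource a = BufferedSource (IORef [a])

bufferList :: [a] -> IO (BufferedSource a)
bufferList xs = fmap BufferedSource (newIORef xs)

-- Pull one value, or Nothing when the input is exhausted.
pull :: BufferedSource a -> IO (Maybe a)
pull (BufferedSource ref) = atomicModifyIORef' ref $ \xs -> case xs of
  []       -> ([], Nothing)
  (y : ys) -> (ys, Just y)

-- Return a leftover value to the front of the buffer.
unpull :: BufferedSource a -> a -> IO ()
unpull (BufferedSource ref) x = modifyIORef' ref (x :)

-- Consume values while the predicate holds; the first value that fails
-- the predicate is pushed back as leftover.
takeWhileB :: (a -> Bool) -> BufferedSource a -> IO [a]
takeWhileB p src = do
  next <- pull src
  case next of
    Nothing -> return []
    Just x
      | p x       -> fmap (x :) (takeWhileB p src)
      | otherwise -> unpull src x >> return []

main :: IO ()
main = do
  src     <- bufferList "abc123def"
  letters <- takeWhileB isAlpha src
  digits  <- takeWhileB isDigit src       -- resumes where the previous consumer stopped
  rest    <- takeWhileB (const True) src
  print (letters, digits, rest)           -- prints ("abc","123","def")
```

The same src value is handed to three consumers in turn, which is the relaxation of the main invariant described above.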

Since 0.2.0

bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a)

Places the given Source and a buffer into a mutable variable. Note that you should manually call bsourceClose when the BufferedSource is no longer in use.

Since 0.2.0

unbufferSource :: Resource m => BufferedSource m a -> Source m a

Turn a BufferedSource into a Source. Note that in general this will mean your original BufferedSource will be closed. Additionally, all leftover data from usage of the returned Source will be discarded. In other words: this is a no-going-back move.

Note: bufferSource . unbufferSource is not the identity function.

Since 0.2.0

bsourceClose :: Resource m => BufferedSource m a -> ResourceT m ()

Close the underlying Source for the given BufferedSource. Note that this function can safely be called multiple times, as it will first check if the Source was previously closed.

Since 0.2.0

Unifying

class IsSource src

A typeclass allowing us to unify operators for Source and BufferedSource.

Since 0.2.0

Sink

data SinkResult input m output

A Sink ultimately returns a single output value. Each time data is pushed to it, a Sink may indicate that it is still processing data, or that it is done, in which case it returns some optional leftover input and an output value.

The Processing constructor provides updated push and close functions to be used in place of the original Sink.
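A minimal sketch of this protocol, including leftover input, might look as follows. The SinkResult type here is a simplified stand-in that runs in any Monad rather than ResourceT, and takeNonNegative and feed are hypothetical helpers written for this example.

```haskell
-- Simplified model for illustration only; the library's version runs in ResourceT m.
data SinkResult input m output
  = Processing (input -> m (SinkResult input m output)) (m output)
  | Done (Maybe input) output

-- Collect inputs until the first negative number, which is handed back as leftover.
takeNonNegative :: Monad m => [Int] -> Int -> m (SinkResult Int m [Int])
takeNonNegative acc x
  | x < 0     = return (Done (Just x) (reverse acc))
  | otherwise = return (Processing (takeNonNegative (x : acc))
                                   (return (reverse (x : acc))))

-- Feed a list to a push/close pair, always switching to the updated functions.
-- Returns the sink's output together with the input it did not consume.
feed :: Monad m => (i -> m (SinkResult i m o)) -> m o -> [i] -> m (o, [i])
feed _    close []       = do { o <- close; return (o, []) }
feed push _     (x : xs) = do
  res <- push x
  case res of
    Done leftover o         -> return (o, maybe xs (: xs) leftover)
    Processing push' close' -> feed push' close' xs

main :: IO ()
main = feed (takeNonNegative []) (return []) [3, 1, -4, 1, 5] >>= print
-- prints ([3,1],[-4,1,5])
```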

Since 0.2.0

Constructors

Processing (SinkPush input m output) (SinkClose m output) 
Done (Maybe input) output 

Instances

Monad m => Functor (SinkResult input m) 

data Sink input m output

In general, a sink will consume data and eventually produce an output when it has consumed "enough" data. There are two caveats to that statement:

  • Some sinks do not actually require any data to produce an output. This is included with a sink in order to allow for a Monad instance.
  • Some sinks will consume all available data and only produce a result at the "end" of a data stream (e.g., sum).

To allow for the first caveat, we have the SinkNoData constructor. For the second, the SinkData constructor has two records: one for receiving more input, and the other to indicate the end of a stream. Note that, at the end of a stream, some output is required. If a specific Sink implementation cannot always produce output, this should be indicated in its return value, using something like a Maybe or Either.

A Sink should clean up any resources it has allocated when it returns a value, whether that be via sinkPush or sinkClose.

Since 0.2.0

Constructors

SinkNoData output 
SinkData 

Fields

sinkPush :: SinkPush input m output
 
sinkClose :: SinkClose m output
 
SinkLift (ResourceT m (Sink input m output))

This constructor is provided to allow us to create an efficient MonadTrans instance.

Instances

(Resource m, Base m ~ base, Applicative base) => MonadBase base (Sink input m) 
MonadTrans (Sink input) 
Resource m => Monad (Sink input m) 
Monad m => Functor (Sink input m) 
Resource m => Applicative (Sink input m) 
(Resource m, MonadIO m) => MonadIO (Sink input m) 

type SinkPush input m output = input -> ResourceT m (SinkResult input m output)

The value of the sinkPush record.

type SinkClose m output = ResourceT m output

The value of the sinkClose record.

Conduit

data ConduitResult input m output

When data is pushed to a Conduit, it may either indicate that it is still producing output and provide some, or indicate that it is finished producing output, in which case it returns optional leftover input and some final output.

The Producing constructor provides a new Conduit to be used in place of the previous one.
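The push and close protocol for a conduit can be sketched with a stand-alone model. The types below are simplified stand-ins (any Monad instead of ResourceT), and pairs and runList are hypothetical helpers written for this example.

```haskell
-- Simplified model for illustration only; the library's versions run in ResourceT m.
data ConduitResult input m output
  = Producing (Conduit input m output) [output]
  | Finished (Maybe input) [output]

data Conduit input m output = Conduit
  { conduitPush  :: input -> m (ConduitResult input m output)
  , conduitClose :: m [output]
  }

-- Pair up consecutive inputs; an unpaired trailing input is emitted on close.
pairs :: Monad m => Maybe a -> Conduit a m [a]
pairs pending = Conduit push close
  where
    push x = case pending of
      Nothing -> return (Producing (pairs (Just x)) [])
      Just y  -> return (Producing (pairs Nothing) [[y, x]])
    close = return (maybe [] (\y -> [[y]]) pending)

-- Push a whole list through a conduit, always using the Conduit
-- returned by Producing for the next push.
runList :: Monad m => Conduit i m o -> [i] -> m [o]
runList c []       = conduitClose c
runList c (x : xs) = do
  res <- conduitPush c x
  case res of
    Finished _ out   -> return out
    Producing c' out -> fmap (out ++) (runList c' xs)

main :: IO ()
main = runList (pairs Nothing) [1, 2, 3, 4, 5 :: Int] >>= print
-- prints [[1,2],[3,4],[5]]
```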

Since 0.2.0

Constructors

Producing (Conduit input m output) [output] 
Finished (Maybe input) [output] 

Instances

Monad m => Functor (ConduitResult input m) 

data Conduit input m output

A conduit has two operations: it can receive new input (a push), and can be closed.

Since 0.2.0

Constructors

Conduit 

Fields

conduitPush :: ConduitPush input m output
 
conduitClose :: ConduitClose m output
 

Instances

Monad m => Functor (Conduit input m) 

type ConduitPush input m output = input -> ResourceT m (ConduitResult input m output)

The value of the conduitPush record.

type ConduitClose m output = ResourceT m [output]

The value of the conduitClose record.

Connect/fuse operators

($$) :: (IsSource src, Resource m) => src m a -> Sink a m b -> ResourceT m b

The connect operator, which pulls data from a source and pushes to a sink. There are three ways this process can terminate:

  1. In the case of a SinkNoData constructor, the source is not opened at all, and the output value is returned immediately.
  2. The sink returns Done. If the input was a BufferedSource, any leftover input is put in the buffer. For a normal Source, the leftover value is discarded, and the source is closed.
  3. The source returns Closed, in which case the sink is closed.

Note that this function will automatically close any Sources, but will not close any BufferedSources, allowing them to be reused.
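The three termination cases can be traced in a small model. Everything below is a simplified stand-in in plain IO, not the library's code: connect is a hypothetical substitute for ($$) on a normal Source, and listSource and takeSink are helpers invented for this example.

```haskell
import Data.IORef

-- Simplified model in plain IO, for illustration only. The library's types
-- run in ResourceT m, and leftovers are only kept for a BufferedSource.
data SourceResult a = Open (Source a) a | Closed
data Source a = Source { sourcePull :: IO (SourceResult a), sourceClose :: IO () }

data SinkResult i o = Processing (i -> IO (SinkResult i o)) (IO o) | Done (Maybe i) o
data Sink i o = SinkNoData o | SinkData (i -> IO (SinkResult i o)) (IO o)

-- Hypothetical stand-in for ($$) on a normal Source.
connect :: Source a -> Sink a b -> IO b
connect _   (SinkNoData out)      = return out  -- case 1: source is never pulled
connect src (SinkData push close) = loop src push close
  where
    loop s p c = do
      res <- sourcePull s
      case res of
        Closed    -> c                          -- case 3: close the sink
        Open s' x -> do
          r <- p x
          case r of
            Done _ out       -> sourceClose s' >> return out  -- case 2: drop leftover
            Processing p' c' -> loop s' p' c'

-- A list-backed source that records each early close in a log.
listSource :: IORef [String] -> [a] -> Source a
listSource logRef xs = Source pull close
  where
    pull = case xs of
      []       -> return Closed
      (y : ys) -> return (Open (listSource logRef ys) y)
    close = modifyIORef logRef ("source closed early" :)

-- A sink that takes the first n inputs.
takeSink :: Int -> Sink a [a]
takeSink n
  | n <= 0    = SinkNoData []
  | otherwise = SinkData (push n []) (return [])
  where
    push k acc x
      | k <= 1    = return (Done Nothing (reverse (x : acc)))
      | otherwise = return (Processing (push (k - 1) (x : acc))
                                       (return (reverse (x : acc))))

main :: IO ()
main = do
  logRef <- newIORef []
  a <- connect (listSource logRef "abcdef") (takeSink 0)  -- case 1
  b <- connect (listSource logRef "abcdef") (takeSink 2)  -- case 2
  c <- connect (listSource logRef "ab") (takeSink 5)      -- case 3
  closes <- readIORef logRef
  print (a, b, c, length closes)  -- prints ("","ab","ab",1)
```

Only the second run closes its source early, which is why the log holds a single entry.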

Since 0.2.0

($=) :: (IsSource src, Resource m) => src m a -> Conduit a m b -> Source m b

Left fuse, combining a source and a conduit together into a new source.

Note that any Source passed in will be automatically closed, while a BufferedSource will be left open.

Since 0.2.0

(=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c

Right fuse, combining a conduit and a sink together into a new sink.

Since 0.2.0

(=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c

Middle fuse, combining two conduits together into a new conduit.

Since 0.2.0

Utility functions

Source

sourceState
    :: Resource m
    => state
       -- ^ Initial state
    -> (state -> ResourceT m (SourceStateResult state output))
       -- ^ Pull function
    -> Source m output

Construct a Source with some stateful functions. This function addresses threading the state value for you.
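What "threading the state" means can be sketched with a stand-alone model. The types are simplified stand-ins (any Monad instead of ResourceT), and sourceStateModel, countdown and drain are hypothetical names; this is an assumption about the general shape, not the library's implementation.

```haskell
-- Simplified model for illustration only; the library's versions run in ResourceT m.
data SourceResult m a = Open (Source m a) a | Closed
newtype Source m a = Source { sourcePull :: m (SourceResult m a) }
data SourceStateResult state output = StateOpen state output | StateClosed

-- Hypothetical model of sourceState: each pull runs the pull function on the
-- current state and stores the updated state inside the next Source.
sourceStateModel
  :: Monad m
  => state
  -> (state -> m (SourceStateResult state output))
  -> Source m output
sourceStateModel st pull = Source $ do
  res <- pull st
  return $ case res of
    StateClosed       -> Closed
    StateOpen st' out -> Open (sourceStateModel st' pull) out

-- Count down from n to 1; the state is the next number to emit.
countdown :: Monad m => Int -> Source m Int
countdown n = sourceStateModel n step
  where
    step k
      | k <= 0    = return StateClosed
      | otherwise = return (StateOpen (k - 1) k)

-- Pull until the source reports Closed, collecting every value.
drain :: Monad m => Source m a -> m [a]
drain src = do
  res <- sourcePull src
  case res of
    Closed      -> return []
    Open src' x -> fmap (x :) (drain src')

main :: IO ()
main = drain (countdown 3) >>= print  -- prints [3,2,1]
```

The pull function in countdown never sees a Source at all; it only maps a state to the next state and output.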

Since 0.2.0

sourceStateIO
    :: ResourceIO m
    => IO state
       -- ^ resource and/or state allocation
    -> (state -> IO ())
       -- ^ resource and/or state cleanup
    -> (state -> m (SourceStateResult state output))
       -- ^ Pull function. Note that this need not explicitly perform any cleanup.
    -> Source m output

A combination of sourceIO and sourceState.

Since 0.2.1

data SourceStateResult state output

The return value when pulling in the sourceState function. Either indicates no more data, or the next value and an updated state.

Since 0.2.0

Constructors

StateOpen state output 
StateClosed 

sourceIO
    :: ResourceIO m
    => IO state
       -- ^ resource and/or state allocation
    -> (state -> IO ())
       -- ^ resource and/or state cleanup
    -> (state -> m (SourceIOResult output))
       -- ^ Pull function. Note that this need not explicitly perform any cleanup.
    -> Source m output

Construct a Source based on some IO actions for alloc/release.

Since 0.2.0

data SourceIOResult output

The return value when pulling in the sourceIO function. Either indicates no more data, or the next value.

Constructors

IOOpen output 
IOClosed 

transSource :: (Base m ~ Base n, Monad m) => (forall a. m a -> n a) -> Source m output -> Source n output

Transform the monad a Source lives in.

Note that this will not thread the individual monads together, meaning side effects will be lost. This function is most useful for transformers only providing context and not producing side-effects, such as ReaderT.

Since 0.2.0

Sink

sinkState
    :: Resource m
    => state
       -- ^ initial state
    -> (state -> input -> ResourceT m (SinkStateResult state input output))
       -- ^ push
    -> (state -> ResourceT m output)
       -- ^ Close. Note that the state is not returned, as it is not needed.
    -> Sink input m output

Construct a Sink with some stateful functions. This function addresses threading the state value for you.
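A stand-alone sketch of the same idea for sinks is given below. The types are simplified stand-ins (any Monad instead of ResourceT, and a push/close pair instead of a full Sink), and sinkStateModel, sumUpTo and feed are hypothetical names used only in this example.

```haskell
-- Simplified model for illustration only; the library's versions run in ResourceT m.
data SinkResult i m o = Processing (i -> m (SinkResult i m o)) (m o) | Done (Maybe i) o
data SinkStateResult state input output = StateDone (Maybe input) output | StateProcessing state

-- Hypothetical model of sinkState, returning a push/close pair. Each push
-- runs on the current state; the updated state is captured by the next pair.
sinkStateModel
  :: Monad m
  => state
  -> (state -> i -> m (SinkStateResult state i o))
  -> (state -> m o)
  -> (i -> m (SinkResult i m o), m o)
sinkStateModel st push close = (push', close st)
  where
    push' x = do
      res <- push st x
      return $ case res of
        StateDone leftover out -> Done leftover out
        StateProcessing st'    -> uncurry Processing (sinkStateModel st' push close)

-- Sum inputs, stopping early once the running total reaches the limit.
sumUpTo :: Monad m => Int -> (Int -> m (SinkResult Int m Int), m Int)
sumUpTo limit = sinkStateModel 0 push return
  where
    push total x
      | total + x >= limit = return (StateDone Nothing (total + x))
      | otherwise          = return (StateProcessing (total + x))

-- Feed a list to a push/close pair, switching to the updated pair each time.
feed :: Monad m => (i -> m (SinkResult i m o), m o) -> [i] -> m o
feed (_, close) []       = close
feed (push, _)  (x : xs) = do
  res <- push x
  case res of
    Done _ out     -> return out
    Processing p c -> feed (p, c) xs

main :: IO ()
main = do
  feed (sumUpTo 10) [1, 2, 3] >>= print     -- stream ends first; prints 6
  feed (sumUpTo 10) [4, 5, 6, 7] >>= print  -- limit reached on the third input; prints 15
```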

Since 0.2.0

data SinkStateResult state input output

A helper type for sinkState, indicating the result of being pushed to. It indicates either that processing is done or that it should continue with the updated state.

Since 0.2.0

Constructors

StateDone (Maybe input) output 
StateProcessing state 

sinkIO
    :: ResourceIO m
    => IO state
       -- ^ resource and/or state allocation
    -> (state -> IO ())
       -- ^ resource and/or state cleanup
    -> (state -> input -> m (SinkIOResult input output))
       -- ^ push
    -> (state -> m output)
       -- ^ close
    -> Sink input m output

Construct a Sink. Note that your push and close functions need not explicitly perform any cleanup.

Since 0.2.0

data SinkIOResult input output

A helper type for sinkIO, indicating the result of being pushed to. It indicates either that processing is done or that it should continue.

Since 0.2.0

Constructors

IODone (Maybe input) output 
IOProcessing 

transSink :: (Base m ~ Base n, Monad m) => (forall a. m a -> n a) -> Sink input m output -> Sink input n output

Transform the monad a Sink lives in.

See transSource for more information.

Since 0.2.0

Conduit

conduitState
    :: Resource m
    => state
       -- ^ initial state
    -> (state -> input -> ResourceT m (ConduitStateResult state input output))
       -- ^ Push function.
    -> (state -> ResourceT m [output])
       -- ^ Close function. The state need not be returned, since it will not be used again.
    -> Conduit input m output

Construct a Conduit with some stateful functions. This function addresses threading the state value for you.

Since 0.2.0

data ConduitStateResult state input output

A helper type for conduitState, indicating the result of being pushed to. It indicates either that processing is done or that it should continue with the updated state.

Since 0.2.0

Constructors

StateFinished (Maybe input) [output] 
StateProducing state [output] 

Instances

Functor (ConduitStateResult state input) 

conduitIO
    :: ResourceIO m
    => IO state
       -- ^ resource and/or state allocation
    -> (state -> IO ())
       -- ^ resource and/or state cleanup
    -> (state -> input -> m (ConduitIOResult input output))
       -- ^ Push function. Note that this need not explicitly perform any cleanup.
    -> (state -> m [output])
       -- ^ Close function. Note that this need not explicitly perform any cleanup.
    -> Conduit input m output

Construct a Conduit.

Since 0.2.0

data ConduitIOResult input output

A helper type for conduitIO, indicating the result of being pushed to. It indicates either that processing is done or that it should continue.

Since 0.2.0

Constructors

IOFinished (Maybe input) [output] 
IOProducing [output] 

transConduit :: (Monad m, Base m ~ Base n) => (forall a. m a -> n a) -> Conduit input m output -> Conduit input n output

Transform the monad a Conduit lives in.

See transSource for more information.

Since 0.2.0

Sequencing

type SequencedSink state input m output = state -> Sink input m (SequencedSinkResponse state input m output)

Helper type for constructing a Conduit based on Sinks. This allows you to write higher-level code that takes advantage of existing conduits and sinks, and leverages a sink's monadic interface.

Since 0.2.0

sequenceSink
    :: Resource m
    => state
       -- ^ initial state
    -> SequencedSink state input m output
    -> Conduit input m output

Convert a SequencedSink into a Conduit.

Since 0.2.0

sequence :: Resource m => Sink input m output -> Conduit input m output

A specialised version of sequenceSink.

Note that this function will return an infinite stream if provided a SinkNoData constructor. In other words, you probably don't want to do sequence . return.

Since 0.2.1

data SequencedSinkResponse state input m output

Return value from a SequencedSink.

Since 0.2.0

Constructors

Emit state [output]

Set a new state, and emit some new output.

Stop

End the conduit.

StartConduit (Conduit input m output)

Pass control to a new conduit.

Flushing

data Flush a

Provide for a stream of data that can be flushed.

A number of Conduits (e.g., zlib compression) need the ability to flush the stream at some point. This provides a single wrapper datatype to be used in all such circumstances.
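As a rough illustration of how a consumer might treat the two constructors, the sketch below works on a plain list of Flush values. The data type is a local copy of the one documented here so that the example needs only base, and batches is a hypothetical helper, not a library function.

```haskell
-- Local copy of the documented type, so the sketch needs only base.
data Flush a = Chunk a | Flush
  deriving (Eq, Ord, Show)

instance Functor Flush where
  fmap f (Chunk a) = Chunk (f a)
  fmap _ Flush     = Flush

-- Hypothetical helper: buffer chunks and release them as one batch at each
-- Flush marker. Chunks after the final Flush are released at end of input.
batches :: [Flush a] -> [[a]]
batches = go []
  where
    go acc []               = [reverse acc | not (null acc)]
    go acc (Flush : rest)   = reverse acc : go [] rest
    go acc (Chunk x : rest) = go (x : acc) rest

main :: IO ()
main = do
  print (fmap (* 2) (Chunk 21 :: Flush Int))
  -- prints Chunk 42
  print (batches [Chunk 'a', Chunk 'b', Flush, Chunk 'c', Flush, Chunk 'd'])
  -- prints ["ab","c","d"]
```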

Since 0.2.0

Constructors

Chunk a 
Flush 

Instances

Functor Flush 
Eq a => Eq (Flush a) 
Ord a => Ord (Flush a) 
Show a => Show (Flush a) 

Convenience re-exports

data ResourceT m a

The Resource transformer. This transformer keeps track of all registered actions, and calls them upon exit (via runResourceT). Actions may be registered via register, or resources may be allocated atomically via with or withIO. The with functions correspond closely to bracket.

Releasing may be performed before exit via the release function. This is a highly recommended optimization, as it will ensure that scarce resources are freed early. Note that calling release will deregister the action, so that a release action will only ever be called once.
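The register, release and run-at-exit behaviour can be sketched with a toy registry in plain IO. This is not ResourceT's implementation: the Registry type and the register, release and runRegistry functions below are hypothetical models written for this example, and they ignore concerns such as asynchronous exceptions and reference counting.

```haskell
import Control.Exception (finally)
import Data.IORef

-- Toy model for illustration only: the next free key plus the actions
-- that are still registered.
type Registry = IORef (Int, [(Int, IO ())])

-- Register a cleanup action and get back a key for early release.
register :: Registry -> IO () -> IO Int
register reg action = atomicModifyIORef' reg $ \(next, acts) ->
  ((next + 1, (next, action) : acts), next)

-- Run a registered action now and deregister it, so it runs at most once.
release :: Registry -> Int -> IO ()
release reg key = do
  found <- atomicModifyIORef' reg $ \(next, acts) ->
    ((next, filter ((/= key) . fst) acts), lookup key acts)
  maybe (return ()) id found

-- Run a body with a fresh registry, then run whatever is still registered.
runRegistry :: (Registry -> IO a) -> IO a
runRegistry body = do
  reg <- newIORef (0, [])
  body reg `finally` (readIORef reg >>= mapM_ snd . snd)

-- Record the order of events in a log and return it oldest first.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  let say msg = modifyIORef logRef (msg :)
  runRegistry $ \reg -> do
    a <- register reg (say "release a")
    _ <- register reg (say "release b")
    release reg a  -- early release: "a" is freed now and deregistered
    say "body done"
  fmap reverse (readIORef logRef)

main :: IO ()
main = demo >>= mapM_ putStrLn
-- prints "release a", "body done", "release b", one per line
```

Because release removes the entry before running it, "release a" appears once in the log even though the registry is also emptied at exit.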

class (HasRef (Base m), Monad m) => Resource m where

A Monad with a base that has mutable references, and allows some way to run base actions and clean up properly.

Associated Types

type Base m :: * -> *

The base monad for the current monad stack. This will usually be IO or ST.

Methods

resourceLiftBase :: Base m a -> m a

Run some action in the Base monad. This function corresponds to liftBase, but due to various type issues, we need to have our own version here.

resourceBracket_
    :: Base m ()
       -- ^ init
    -> Base m ()
       -- ^ cleanup
    -> m c
       -- ^ body
    -> m c

Guarantee that some initialization and cleanup code is called before and after some action. Note that the initialization and cleanup lives in the base monad, while the body is in the top monad.
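The same kind of guarantee exists in plain IO as bracket_ from Control.Exception. The sketch below uses that function, not resourceBracket_ itself, to show cleanup running even when the body throws; it is an analogy only, and the demo function and its log are hypothetical.

```haskell
import Control.Exception (ErrorCall (..), bracket_, throwIO, try)
import Data.IORef

-- Run init, a failing body, and cleanup, recording the order of events.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  let say msg = modifyIORef logRef (msg :)
  r <- try (bracket_ (say "init")
                     (say "cleanup")
                     (say "body" >> throwIO (ErrorCall "boom")))
  case r of
    Left (ErrorCall msg) -> say ("caught: " ++ msg)
    Right ()             -> say "no exception"
  fmap reverse (readIORef logRef)

main :: IO ()
main = demo >>= mapM_ putStrLn
-- prints "init", "body", "cleanup", "caught: boom", one per line
```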

class (ResourceBaseIO (Base m), ResourceUnsafeIO m, ResourceThrow m, MonadIO m, MonadBaseControl IO m) => ResourceIO m

A Resource which can safely run IO calls.

class Resource m => ResourceUnsafeIO m

A Resource based on some monad which allows running of some IO actions, via unsafe calls. This applies to IO and ST, for instance.

runResourceT :: Resource m => ResourceT m a -> m a

Unwrap a ResourceT transformer, and call all registered release actions.

Note that there is some reference counting involved due to resourceForkIO. If multiple threads are sharing the same collection of resources, only the last call to runResourceT will deallocate the resources.

class Resource m => ResourceThrow m where

A Resource which can throw exceptions. Note that this does not work in a vanilla ST monad. Instead, you should use the ExceptionT transformer on top of ST.

Methods

resourceThrow :: Exception e => e -> m a