conduit-0.3.0: Streaming data processing library.

Safe Haskell: Safe-Inferred

Data.Conduit

Description

The main module, exporting types, utility functions, and fuse and connect operators.

Types

The three core types of this package are Source (the data producer), Sink (the data consumer), and Conduit (the data transformer). For all three types, a result will provide the next value to be used. For example, the Open constructor includes a new Source in it. This leads to the main invariant for all conduit code: a value of any of these three types may never be reused. While some specific values may work fine with reuse, the result is generally unpredictable and should not be relied upon.

The user-facing API provided by the connect and fuse operators automatically addresses the low level details of pulling, pushing, and closing, and there should rarely be need to perform these actions in user code.

Source

data Source m a

A Source has two operations on it: pull some data, and close the Source. A Source should free any resources it allocated when either it returns Closed or when it is explicitly closed (the second record on either the Open or SourceM constructors).

Since 0.3.0

Constructors

Open (Source m a) (m ()) a

A Source providing more data. Provides records for the next Source in the stream, a close action, and the data provided.

Closed

A Source which has no more data available.

SourceM (m (Source m a)) (m ())

Requires a monadic action to retrieve the next Source in the stream. Second record allows you to close the Source.
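
To make the three constructors concrete, here is a minimal sketch that re-declares a local stand-in for Source using only base, rather than importing the library. The helpers fromList and drain are hypothetical names introduced for illustration; they are not part of this module. The point to notice is that every step continues with the next Source, which is what the no-reuse invariant requires.

```haskell
import Data.Functor.Identity (Identity, runIdentity)

-- Local stand-in mirroring the three documented constructors.
data Source m a
    = Open (Source m a) (m ()) a       -- next Source, close action, value
    | Closed                           -- no more data
    | SourceM (m (Source m a)) (m ())  -- action yielding the next Source, close action

-- Hypothetical helper: a Source that yields each list element in turn.
-- It holds no resources, so its close action does nothing.
fromList :: Monad m => [a] -> Source m a
fromList []       = Closed
fromList (x : xs) = Open (fromList xs) (return ()) x

-- Hypothetical helper: pull until Closed, collecting every value.
-- Each step continues with the next Source, never the one just inspected.
drain :: Monad m => Source m a -> m [a]
drain Closed           = return []
drain (Open next _ x)  = do
    rest <- drain next
    return (x : rest)
drain (SourceM next _) = next >>= drain

example :: [Int]
example = runIdentity (drain (fromList [1, 2, 3]))
```

In real code the connect and fuse operators perform this pulling for you.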

Instances

Monad m => IsSource Source m 
Monad m => Functor (Source m) 
Monad m => Monoid (Source m a) 

Buffering

data BufferedSource m a

When actually interacting with Sources, we sometimes want to be able to buffer the output, in case any intermediate steps return leftover data. A BufferedSource allows for such buffering.

A BufferedSource, unlike a Source, is resumable, meaning it can be passed to multiple Sinks without restarting. Therefore, a BufferedSource relaxes the main invariant of this package: the same value may be used multiple times.

The intention of a BufferedSource is to be used internally by an application or library, not to be part of its user-facing API. For example, the Warp webserver uses a BufferedSource internally for parsing the request headers, but then passes a normal Source to the web application for reading the request body.

One caveat: while the types will allow you to use the buffered source in multiple threads, there is no guarantee that all BufferedSources will handle this correctly.

Since 0.3.0

bufferSource :: MonadIO m => Source m a -> m (BufferedSource m a)

Places the given Source and a buffer into a mutable variable. Note that you should manually call bsourceClose when the BufferedSource is no longer in use.

Since 0.3.0

unbufferSource :: MonadIO m => BufferedSource m a -> Source m a

Turn a BufferedSource into a Source. Note that in general this will mean your original BufferedSource will be closed. Additionally, all leftover data from usage of the returned Source will be discarded. In other words: this is a no-going-back move.

Note: bufferSource . unbufferSource is not the identity function.

Since 0.3.0

bsourceClose :: MonadIO m => BufferedSource m a -> m ()

Close the underlying Source for the given BufferedSource. Note that this function can safely be called multiple times, as it will first check if the Source was previously closed.

Since 0.3.0

Unifying

class IsSource src m

A typeclass allowing us to unify operators for Source and BufferedSource.

Since 0.3.0

Sink

data Sink input m output

In general, a sink will consume data and eventually produce an output when it has consumed "enough" data. There are two caveats to that statement:

  • Some sinks do not actually require any data to produce an output. Such sinks are permitted so that Sink can have a Monad instance, since return must produce an output without consuming any input.
  • Some sinks will consume all available data and only produce a result at the "end" of a data stream (e.g., sum).

Note that you can indicate any leftover data from processing via the Maybe input field of the Done constructor. However, it is a violation of the Sink invariants to return leftover data when no input has been consumed. Concretely, that means that a function like yield is invalid:

 yield :: input -> Sink input m ()
 yield input = Done (Just input) ()

A Sink should clean up any resources it has allocated when it returns a value.

Since 0.3.0

Constructors

Processing (SinkPush input m output) (SinkClose m output)

Awaiting more input.

Done (Maybe input) output

Processing complete.

SinkM (m (Sink input m output))

Perform some monadic action to continue.
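
The sketch below illustrates both caveats and the leftover rule against a local stand-in for Sink (declared here with base only, not imported from the library). sumSink only produces a result when closed, while takeWhileSink finishes early and returns the input it rejected as a leftover, which is legal because that input was consumed first. The names sumSink, takeWhileSink, and feed are hypothetical and exist only for this illustration.

```haskell
import Data.Functor.Identity (Identity, runIdentity)

-- Local stand-in mirroring the three documented constructors.
data Sink input m output
    = Processing (input -> Sink input m output) (m output)  -- push, close
    | Done (Maybe input) output                             -- leftover, result
    | SinkM (m (Sink input m output))                       -- monadic step

-- Consumes everything; the total is only available when the sink is closed.
sumSink :: Monad m => Sink Int m Int
sumSink = go 0
  where
    go acc = Processing (\x -> go (acc + x)) (return acc)

-- Collects inputs while the predicate holds. The first rejected input has
-- already been consumed, so handing it back as a leftover is permitted.
takeWhileSink :: Monad m => (a -> Bool) -> Sink a m [a]
takeWhileSink p = go id
  where
    go front = Processing push (return (front []))
      where
        push x
            | p x       = go (front . (x :))
            | otherwise = Done (Just x) (front [])

-- Hypothetical driver: pushes list elements into a sink and returns the
-- input that was not used (including any leftover) together with the output.
feed :: Monad m => [a] -> Sink a m b -> m ([a], b)
feed xs (Done leftover out)       = return (maybe xs (: xs) leftover, out)
feed xs (SinkM next)              = next >>= feed xs
feed [] (Processing _ close)      = do
    out <- close
    return ([], out)
feed (x : xs) (Processing push _) = feed xs (push x)

total :: Int
total = snd (runIdentity (feed [1, 2, 3] sumSink))

prefix :: ([Int], [Int])
prefix = runIdentity (feed [1, 2, 3, 10, 4] (takeWhileSink (< 5)))
```

Here total evaluates to 6, and prefix evaluates to ([10, 4], [1, 2, 3]): the rejected 10 comes back as the leftover ahead of the untouched 4.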

Instances

MonadBase base m => MonadBase base (Sink input m) 
MonadTrans (Sink input) 
Monad m => Monad (Sink input m) 
Monad m => Functor (Sink input m) 
Monad m => Applicative (Sink input m) 
MonadIO m => MonadIO (Sink input m) 

type SinkPush input m output = input -> Sink input m output

Push a value into a Sink and get a new Sink as a result.

Since 0.3.0

type SinkClose m output = m output

Closing a Sink returns the final output.

Since 0.3.0

Conduit

data Conduit input m output

A Conduit allows data to be pushed to it, and for each new input, can produce a stream of output values (possibly an empty stream). It can be considered a hybrid of a Sink and a Source.

A Conduit has four constructors, corresponding to four distinct states of operation.

Since 0.3.0

Constructors

NeedInput (ConduitPush input m output) (ConduitClose m output)

Indicates that the Conduit needs more input in order to produce output. It also provides an action to close the Conduit early, for cases when there is no more input available, or when no more output is requested. Closing at this point returns a Source to allow for either consuming or ignoring the new stream.

HaveOutput (Conduit input m output) (m ()) output

Indicates that the Conduit has more output available. It has three records: the next Conduit to continue the stream, a close action for early termination, and the output currently available. Note that, unlike NeedInput, the close action here returns () instead of Source. The reasoning is that HaveOutput will only be closed early if no more output is requested, since no input is required.

Finished (Maybe input)

Indicates that no more output is available, and no more input may be sent. It provides an optional leftover input record. Note: It is a violation of Conduit's invariants to return leftover input that was never consumed, similar to the invariants of a Sink.

ConduitM (m (Conduit input m output)) (m ())

Indicates that a monadic action must be taken to determine the next Conduit. It also provides an early close action. Like HaveOutput, this action returns (), since it should only be used when no more output is requested.
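
As a rough illustration of the four states, the following sketch declares local stand-ins for Source and Conduit (base only, not the library's definitions) and builds a conduit that groups inputs into pairs. It shows why closing a NeedInput yields a Source: an unpaired final element can still be emitted at the end of the stream. The names pairs, drain, and runOnList are hypothetical helpers for this example.

```haskell
import Control.Monad (liftM)
import Data.Functor.Identity (Identity, runIdentity)

-- Local stand-ins mirroring the documented constructors.
data Source m a
    = Open (Source m a) (m ()) a
    | Closed
    | SourceM (m (Source m a)) (m ())

data Conduit input m output
    = NeedInput (input -> Conduit input m output) (Source m output)
    | HaveOutput (Conduit input m output) (m ()) output
    | Finished (Maybe input)
    | ConduitM (m (Conduit input m output)) (m ())

-- Groups inputs into pairs. If the stream ends after an odd element,
-- the close Source emits that element on its own.
pairs :: Monad m => Conduit a m [a]
pairs = NeedInput first Closed
  where
    first x    = NeedInput (second x) (Open Closed (return ()) [x])
    second x y = HaveOutput pairs (return ()) [x, y]

-- Hypothetical helper: collect everything a Source produces.
drain :: Monad m => Source m a -> m [a]
drain Closed           = return []
drain (Open next _ x)  = liftM (x :) (drain next)
drain (SourceM next _) = next >>= drain

-- Hypothetical driver: push list elements through a conduit, then close it.
runOnList :: Monad m => [i] -> Conduit i m o -> m [o]
runOnList _        (Finished _)          = return []
runOnList xs       (HaveOutput next _ o) = liftM (o :) (runOnList xs next)
runOnList xs       (ConduitM next _)     = next >>= runOnList xs
runOnList (x : xs) (NeedInput push _)    = runOnList xs (push x)
runOnList []       (NeedInput _ close)   = drain close

grouped :: [[Int]]
grouped = runIdentity (runOnList [1 .. 5] pairs)
```

With five inputs, grouped evaluates to [[1, 2], [3, 4], [5]]; the trailing [5] comes from the close Source.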

Instances

Monad m => Functor (Conduit input m) 

type ConduitPush input m output = input -> Conduit input m output

Pushing new data to a Conduit produces a new Conduit.

Since 0.3.0

type ConduitClose m output = Source m output

When closing a Conduit, it can produce a final stream of values.

Since 0.3.0

Connect/fuse operators

($$) :: IsSource src m => src m a -> Sink a m b -> m b

The connect operator, which pulls data from a source and pushes to a sink. There are two ways this process can terminate:

  1. If the Sink is a Done constructor, the Source is closed.
  2. If the Source is a Closed constructor, the Sink is closed.

This function will automatically close any Sources, but will not close any BufferedSources, allowing them to be reused. Also, leftover data will be discarded when connecting a Source, but will be buffered when using a BufferedSource.
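
The two termination rules can be sketched for a plain Source as follows. This is a simplified model over local stand-in types, not the library's implementation of ($$): it ignores BufferedSource and the IsSource class entirely. connectSketch, sourceCloseSketch, trackedList, and headSink are hypothetical names for this example.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Local stand-ins mirroring the documented constructors.
data Source m a
    = Open (Source m a) (m ()) a
    | Closed
    | SourceM (m (Source m a)) (m ())

data Sink input m output
    = Processing (input -> Sink input m output) (m output)
    | Done (Maybe input) output
    | SinkM (m (Sink input m output))

sourceCloseSketch :: Monad m => Source m a -> m ()
sourceCloseSketch (Open _ close _)  = close
sourceCloseSketch Closed            = return ()
sourceCloseSketch (SourceM _ close) = close

connectSketch :: Monad m => Source m a -> Sink a m b -> m b
-- Rule 1: the Sink is Done, so the Source is closed (leftover discarded).
connectSketch src (Done _leftover out)            = sourceCloseSketch src >> return out
connectSketch src (SinkM next)                    = next >>= connectSketch src
-- Rule 2: the Source is Closed, so the Sink is closed.
connectSketch Closed (Processing _ close)         = close
connectSketch (Open next _ x) (Processing push _) = connectSketch next (push x)
connectSketch (SourceM next _) sink               = next >>= \src -> connectSketch src sink

-- A list-backed Source whose close action records that it ran.
trackedList :: IORef Bool -> [a] -> Source IO a
trackedList closed = go
  where
    go []       = Closed
    go (x : xs) = Open (go xs) (writeIORef closed True) x

-- A Sink that finishes after its first input.
headSink :: Monad m => Sink a m (Maybe a)
headSink = Processing (\x -> Done Nothing (Just x)) (return Nothing)

demo :: IO (Maybe Int, Bool)
demo = do
    closed    <- newIORef False
    firstElem <- connectSketch (trackedList closed [1, 2, 3]) headSink
    wasClosed <- readIORef closed
    return (firstElem, wasClosed)
```

Running demo returns (Just 1, True): the sink finished after one element, and the remaining Source was closed early.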

Since 0.3.0

($=) :: IsSource src m => src m a -> Conduit a m b -> Source m b

Left fuse, combining a source and a conduit together into a new source.

Any Source passed in will be automatically closed, while a BufferedSource will be left open. Leftover input will be discarded for a Source, and buffered for a BufferedSource.

Since 0.3.0

(=$) :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c

Right fuse, combining a conduit and a sink together into a new sink.

Any leftover data returned from the Sink will be discarded.

Since 0.3.0

(=$=) :: Monad m => Conduit a m b -> Conduit b m c -> Conduit a m c

Middle fuse, combining two conduits together into a new conduit.

Any leftovers provided by the inner Conduit will be discarded.

Since 0.3.0

Utility functions

Source

sourceState
    :: Monad m
    => state                                          -- Initial state
    -> (state -> m (SourceStateResult state output))  -- Pull function
    -> Source m output

Construct a Source with some stateful functions. This function addresses threading the state value for you.

Since 0.3.0

sourceStateIO
    :: MonadResource m
    => IO state                                       -- Resource and/or state allocation
    -> (state -> IO ())                               -- Resource and/or state cleanup
    -> (state -> m (SourceStateResult state output))  -- Pull function. Note that this need not explicitly perform any cleanup.
    -> Source m output

A combination of sourceIO and sourceState.

Since 0.3.0

data SourceStateResult state output

The return value when pulling in the sourceState function. Either indicates no more data, or the next value and an updated state.

Since 0.3.0

Constructors

StateOpen state output 
StateClosed 
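
One way a function like sourceState could thread the state is sketched below, over a local stand-in for Source. This is an assumption about the general approach, not the library's actual implementation; sourceStateSketch, countdown, and drain are hypothetical names.

```haskell
import Control.Monad (liftM)
import Data.Functor.Identity (Identity, runIdentity)

-- Local stand-ins mirroring the documented constructors.
data Source m a
    = Open (Source m a) (m ()) a
    | Closed
    | SourceM (m (Source m a)) (m ())

data SourceStateResult state output
    = StateOpen state output
    | StateClosed

-- Each pull runs the user function on the current state and wraps the
-- updated state into the next Source, so callers never see the state.
sourceStateSketch
    :: Monad m
    => state
    -> (state -> m (SourceStateResult state output))
    -> Source m output
sourceStateSketch state0 pull = go state0
  where
    go state = SourceM (liftM step (pull state)) (return ())
    step (StateOpen state' value) = Open (go state') (return ()) value
    step StateClosed              = Closed

-- Example: count down from the given number to 1.
countdown :: Monad m => Int -> Source m Int
countdown start = sourceStateSketch start pull
  where
    pull n
        | n <= 0    = return StateClosed
        | otherwise = return (StateOpen (n - 1) n)

-- Hypothetical helper: collect everything a Source produces.
drain :: Monad m => Source m a -> m [a]
drain Closed           = return []
drain (Open next _ x)  = liftM (x :) (drain next)
drain (SourceM next _) = next >>= drain

counted :: [Int]
counted = runIdentity (drain (countdown 3))
```

Here counted evaluates to [3, 2, 1].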

sourceIO
    :: MonadResource m
    => IO state                               -- Resource and/or state allocation
    -> (state -> IO ())                       -- Resource and/or state cleanup
    -> (state -> m (SourceIOResult output))   -- Pull function. Note that this should not perform any cleanup.
    -> Source m output

Construct a Source based on some IO actions for alloc/release.

Since 0.3.0

data SourceIOResult output

The return value when pulling in the sourceIO function. Either indicates no more data, or the next value.

Since 0.3.0

Constructors

IOOpen output 
IOClosed 

transSource :: Monad m => (forall a. m a -> n a) -> Source m output -> Source n output

Transform the monad a Source lives in.

Note that this will not thread the individual monads together, meaning side effects will be lost. This function is most useful for transformers only providing context and not producing side-effects, such as ReaderT.
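
The loss of side effects can be demonstrated with a small model. The sketch below uses a local stand-in for Source and a hand-rolled State monad (both declared here, base only), and transSourceSketch is a hypothetical reimplementation written for this example rather than the library's code. Because the transformation runs every action from the same initial state, the counter never advances.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad (liftM)
import Data.Functor.Identity (Identity (..))

data Source m a
    = Open (Source m a) (m ()) a
    | Closed
    | SourceM (m (Source m a)) (m ())

-- Applies the monad transformation to each embedded action separately.
transSourceSketch :: Monad m => (forall a. m a -> n a) -> Source m output -> Source n output
transSourceSketch f (Open next close x)  = Open (transSourceSketch f next) (f close) x
transSourceSketch _ Closed               = Closed
transSourceSketch f (SourceM next close) =
    SourceM (f (liftM (transSourceSketch f) next)) (f close)

-- Minimal State monad, defined locally to keep the example self-contained.
newtype State s a = State { runState :: s -> (a, s) }

instance Functor (State s) where
    fmap f (State g) = State $ \s -> let (a, s') = g s in (f a, s')

instance Applicative (State s) where
    pure a = State $ \s -> (a, s)
    State f <*> State g = State $ \s ->
        let (h, s')  = f s
            (a, s'') = g s'
        in (h a, s'')

instance Monad (State s) where
    State g >>= k = State $ \s -> let (a, s') = g s in runState (k a) s'

-- Increment the counter and return its new value.
tick :: State Int Int
tick = State $ \n -> (n + 1, n + 1)

-- A Source that yields n successive counter values.
ticks :: Int -> Source (State Int) Int
ticks n
    | n <= 0    = Closed
    | otherwise = SourceM (liftM (\c -> Open (ticks (n - 1)) (return ()) c) tick) (return ())

drain :: Monad m => Source m a -> m [a]
drain Closed           = return []
drain (Open next _ x)  = liftM (x :) (drain next)
drain (SourceM next _) = next >>= drain

-- Runs each action from state 0 and throws the final state away.
fromZero :: State Int a -> Identity a
fromZero action = Identity (fst (runState action 0))

threaded :: [Int]
threaded = fst (runState (drain (ticks 3)) 0)

unthreaded :: [Int]
unthreaded = runIdentity (drain (transSourceSketch fromZero (ticks 3)))
```

In this model threaded is [1, 2, 3], while unthreaded is [1, 1, 1]: state updates made by one pull are invisible to the next. A context-only transformer such as ReaderT has no such updates to lose.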

Since 0.3.0

sourceClose :: Monad m => Source m a -> m ()

Close a Source, regardless of its current state.

Since 0.3.0

Sink

sinkState
    :: Monad m
    => state                                                       -- Initial state
    -> (state -> input -> m (SinkStateResult state input output))  -- Push
    -> (state -> m output)                                         -- Close. Note that the state is not returned, as it is not needed.
    -> Sink input m output

Construct a Sink with some stateful functions. This function addresses threading the state value for you.

Since 0.3.0

data SinkStateResult state input output

A helper type for sinkState, indicating the result of being pushed to. It can either indicate that processing is done, or to continue with the updated state.

Since 0.3.0

Constructors

StateDone (Maybe input) output 
StateProcessing state 
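
A function shaped like sinkState could be written over the Sink constructors roughly as follows. This is a sketch against local stand-in types, not the library's implementation; sinkStateSketch, sumUntil, and feed are hypothetical names. The example sink adds up inputs and stops as soon as the running total reaches a limit.

```haskell
import Data.Functor.Identity (Identity, runIdentity)

-- Local stand-ins mirroring the documented constructors.
data Sink input m output
    = Processing (input -> Sink input m output) (m output)
    | Done (Maybe input) output
    | SinkM (m (Sink input m output))

data SinkStateResult state input output
    = StateDone (Maybe input) output
    | StateProcessing state

-- Threads the state through successive Processing values.
sinkStateSketch
    :: Monad m
    => state
    -> (state -> input -> m (SinkStateResult state input output))
    -> (state -> m output)
    -> Sink input m output
sinkStateSketch state0 push close = go state0
  where
    go state = Processing (step state) (close state)
    step state input = SinkM $ do
        pushed <- push state input
        return $ case pushed of
            StateProcessing state' -> go state'
            StateDone leftover out -> Done leftover out

-- Sums inputs, finishing once the total reaches the limit.
sumUntil :: Monad m => Int -> Sink Int m Int
sumUntil limit = sinkStateSketch 0 push return
  where
    push acc x
        | acc + x >= limit = return (StateDone Nothing (acc + x))
        | otherwise        = return (StateProcessing (acc + x))

-- Hypothetical driver: returns unused input and the sink's output.
feed :: Monad m => [a] -> Sink a m b -> m ([a], b)
feed xs (Done leftover out)       = return (maybe xs (: xs) leftover, out)
feed xs (SinkM next)              = next >>= feed xs
feed [] (Processing _ close)      = do
    out <- close
    return ([], out)
feed (x : xs) (Processing push _) = feed xs (push x)

outcome :: ([Int], Int)
outcome = runIdentity (feed [4, 5, 6, 7] (sumUntil 9))
```

Here outcome evaluates to ([6, 7], 9): the sink finished after two inputs and the rest were never pushed.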

sinkIO
    :: MonadResource m
    => IO state                                           -- Resource and/or state allocation
    -> (state -> IO ())                                   -- Resource and/or state cleanup
    -> (state -> input -> m (SinkIOResult input output))  -- Push
    -> (state -> m output)                                -- Close
    -> Sink input m output

Construct a Sink. Note that your push and close functions need not explicitly perform any cleanup.

Since 0.3.0

data SinkIOResult input output

A helper type for sinkIO, indicating the result of being pushed to. It can either indicate that processing is done, or to continue.

Since 0.3.0

Constructors

IODone (Maybe input) output 
IOProcessing 

transSink :: Monad m => (forall a. m a -> n a) -> Sink input m output -> Sink input n output

Transform the monad a Sink lives in.

See transSource for more information.

Since 0.3.0

sinkClose :: Monad m => Sink input m output -> m ()

Close a Sink if it is still open, discarding any output it produces.

Since 0.3.0

Conduit

haveMore
    :: Conduit a m b   -- The next Conduit to return after the list has been exhausted.
    -> m ()            -- A close action for early termination.
    -> [b]             -- The values to send down the stream.
    -> Conduit a m b

A helper function for returning a list of values from a Conduit.
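
The behaviour described for the three arguments can be modelled as a chain of HaveOutput values ending in the next Conduit. The sketch below uses local stand-in types and a hypothetical haveMoreSketch; it is an illustration of the idea, not the library's definition. The example conduit twice emits every input two times.

```haskell
import Control.Monad (liftM)
import Data.Functor.Identity (Identity, runIdentity)

-- Local stand-ins mirroring the documented constructors.
data Source m a
    = Open (Source m a) (m ()) a
    | Closed
    | SourceM (m (Source m a)) (m ())

data Conduit input m output
    = NeedInput (input -> Conduit input m output) (Source m output)
    | HaveOutput (Conduit input m output) (m ()) output
    | Finished (Maybe input)
    | ConduitM (m (Conduit input m output)) (m ())

-- One HaveOutput per list element, then continue with the given Conduit.
haveMoreSketch :: Conduit a m b -> m () -> [b] -> Conduit a m b
haveMoreSketch next _     []       = next
haveMoreSketch next close (x : xs) = HaveOutput (haveMoreSketch next close xs) close x

-- Emits each input twice, then waits for more input.
twice :: Monad m => Conduit a m a
twice = NeedInput (\x -> haveMoreSketch twice (return ()) [x, x]) Closed

-- Hypothetical helpers for running the example.
drain :: Monad m => Source m a -> m [a]
drain Closed           = return []
drain (Open next _ x)  = liftM (x :) (drain next)
drain (SourceM next _) = next >>= drain

runOnList :: Monad m => [i] -> Conduit i m o -> m [o]
runOnList _        (Finished _)          = return []
runOnList xs       (HaveOutput next _ o) = liftM (o :) (runOnList xs next)
runOnList xs       (ConduitM next _)     = next >>= runOnList xs
runOnList (x : xs) (NeedInput push _)    = runOnList xs (push x)
runOnList []       (NeedInput _ close)   = drain close

doubled :: [Int]
doubled = runIdentity (runOnList [1, 2] twice)
```

Here doubled evaluates to [1, 1, 2, 2].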

Since 0.3.0

conduitState
    :: Monad m
    => state                                                          -- Initial state
    -> (state -> input -> m (ConduitStateResult state input output))  -- Push function.
    -> (state -> m [output])                                          -- Close function. The state need not be returned, since it will not be used again.
    -> Conduit input m output

Construct a Conduit with some stateful functions. This function addresses threading the state value for you.

Since 0.3.0

data ConduitStateResult state input output

A helper type for conduitState, indicating the result of being pushed to. It can either indicate that processing is done, or to continue with the updated state.

Since 0.3.0

Constructors

StateFinished (Maybe input) [output] 
StateProducing state [output] 

Instances

Functor (ConduitStateResult state input) 

conduitIO
    :: MonadResource m
    => IO state                                              -- Resource and/or state allocation
    -> (state -> IO ())                                      -- Resource and/or state cleanup
    -> (state -> input -> m (ConduitIOResult input output))  -- Push function. Note that this need not explicitly perform any cleanup.
    -> (state -> m [output])                                 -- Close function. Note that this need not explicitly perform any cleanup.
    -> Conduit input m output

Construct a Conduit.

Since 0.3.0

data ConduitIOResult input output

A helper type for conduitIO, indicating the result of being pushed to. It can either indicate that processing is done, or to continue.

Since 0.3.0

Constructors

IOFinished (Maybe input) [output] 
IOProducing [output] 

transConduit :: Monad m => (forall a. m a -> n a) -> Conduit input m output -> Conduit input n output

Transform the monad a Conduit lives in.

See transSource for more information.

Since 0.3.0

conduitClose :: Monad m => Conduit input m output -> m ()

Close a Conduit early, discarding any output.

Since 0.3.0

Sequencing

type SequencedSink state input m output = state -> Sink input m (SequencedSinkResponse state input m output)

Helper type for constructing a Conduit based on Sinks. This allows you to write higher-level code that takes advantage of existing conduits and sinks, and leverages a sink's monadic interface.

Since 0.3.0

sequenceSink
    :: Monad m
    => state                                -- Initial state
    -> SequencedSink state input m output
    -> Conduit input m output

Convert a SequencedSink into a Conduit.

Since 0.3.0

sequence :: Monad m => Sink input m output -> Conduit input m output

Specialised version of sequenceSink.

Note that this function will return an infinite stream if provided a Sink that produces its output without consuming any input, such as a bare Done. In other words, you probably don't want to do sequence . return.

Since 0.3.0

data SequencedSinkResponse state input m output

Return value from a SequencedSink.

Since 0.3.0

Constructors

Emit state [output]

Set a new state, and emit some new output.

Stop

End the conduit.

StartConduit (Conduit input m output)

Pass control to a new conduit.

Flushing

data Flush a

Provide for a stream of data that can be flushed.

A number of Conduits (e.g., zlib compression) need the ability to flush the stream at some point. This provides a single wrapper datatype to be used in all such circumstances.

Since 0.3.0

Constructors

Chunk a 
Flush 
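
To show how a consumer might interpret the two constructors, here is a small list-based sketch. Flush is re-declared locally so the example needs only base, and batches is a hypothetical function: it treats each Flush as a point where everything received so far must be emitted. It is an illustration of the wrapper type, not of how any particular conduit handles flushing.

```haskell
-- Local copy of the documented type, with the listed instances.
data Flush a = Chunk a | Flush
    deriving (Eq, Ord, Show)

instance Functor Flush where
    fmap f (Chunk a) = Chunk (f a)
    fmap _ Flush     = Flush

-- Hypothetical consumer: concatenates chunks and cuts a batch at each Flush.
-- A Flush with nothing pending emits an empty batch; any chunks left at the
-- end of the input form a final batch.
batches :: [Flush String] -> [String]
batches = go []
  where
    go pending []               = [concat (reverse pending) | not (null pending)]
    go pending (Chunk c : rest) = go (c : pending) rest
    go pending (Flush : rest)   = concat (reverse pending) : go [] rest

example :: [String]
example = batches [Chunk "ab", Chunk "cd", Flush, Chunk "ef"]

-- The Functor instance transforms chunk payloads and leaves Flush untouched.
lengths :: [Flush Int]
lengths = map (fmap length) [Chunk "ab", Flush]
```

Here example evaluates to ["abcd", "ef"] and lengths to [Chunk 2, Flush].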

Instances

Functor Flush 
Eq a => Eq (Flush a) 
Ord a => Ord (Flush a) 
Show a => Show (Flush a) 

Convenience re-exports

data ResourceT m a

The Resource transformer. This transformer keeps track of all registered actions, and calls them upon exit (via runResourceT). Actions may be registered via register, or resources may be allocated atomically via allocate. allocate corresponds closely to bracket.

Releasing may be performed before exit via the release function. This is a highly recommended optimization, as it will ensure that scarce resources are freed early. Note that calling release will deregister the action, so that a release action will only ever be called once.

Since 0.3.0

class (MonadThrow m, MonadUnsafeIO m, MonadIO m) => MonadResource m

A Monad which allows for safe resource allocation. In theory, any monad transformer stack including a ResourceT can be an instance of MonadResource.

Note: runResourceT has a requirement for a MonadBaseControl IO m monad, which allows control operations to be lifted. A MonadResource does not have this requirement. This means that transformers such as ContT can be an instance of MonadResource. However, the ContT wrapper will need to be unwrapped before calling runResourceT.

Since 0.3.0

class Monad m => MonadThrow m where

A Monad which can throw exceptions. Note that this does not work in a vanilla ST or Identity monad. Instead, you should use the ExceptionT transformer in your stack if you are dealing with a non-IO base monad.

Since 0.3.0

Methods

monadThrow :: Exception e => e -> m a

class Monad m => MonadUnsafeIO m where

A Monad based on some monad which allows running of some IO actions, via unsafe calls. This applies to IO and ST, for instance.

Since 0.3.0

Methods

unsafeLiftIO :: IO a -> m a

runResourceT :: MonadBaseControl IO m => ResourceT m a -> m a

Unwrap a ResourceT transformer, and call all registered release actions.

Note that there is some reference counting involved due to resourceForkIO. If multiple threads are sharing the same collection of resources, only the last call to runResourceT will deallocate the resources.

Since 0.3.0