conduit-0.0.0.1: A pull-based approach to streaming data.

Data.Conduit

Description

The main module, exporting types, utility functions, and fuse and connect operators.

Types

Source

data SourceResult a

Result of pulling from a source: either a new piece of data (Open), or an indication that the source is now Closed.

Constructors

Open a 
Closed 

data PreparedSource m a

A PreparedSource has two operations on it: pull some data, and close the PreparedSource. Since PreparedSource is built on top of ResourceT, all acquired resources should be automatically released anyway. Closing a PreparedSource early is merely an optimization to free scarce resources as soon as possible.

A PreparedSource has three invariants:

newtype Source m a

All but the simplest of PreparedSources (e.g., repeat) require some type of state to track their current status. This may be in the form of a mutable variable (e.g., IORef), or via opening a resource like a Handle. A PreparedSource is given no opportunity to acquire such resources; a Source is.

A Source is simply a monadic action that returns a PreparedSource. One nice consequence of this is that an efficient Monoid instance is possible: it acquires only one resource at a time, instead of acquiring all resources in bulk when the Source starts running.

Note that each time you "call" a Source, it is started from scratch. If you want a resumable source (e.g., one which can be passed to multiple Sinks), you likely want to use a BufferedSource.
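The split between Source and PreparedSource, and the one-resource-at-a-time Monoid, can be sketched with a small standalone model. This is not the library's code: plain IO stands in for ResourceT m, and the field names sourcePull and sourceClose as well as the helpers sourceList and drain are hypothetical.

```haskell
import Data.IORef

-- Simplified model (assumption): plain IO stands in for ResourceT m, and the
-- field names sourcePull/sourceClose are hypothetical.
data SourceResult a = Open a | Closed deriving (Show, Eq)

data PreparedSource a = PreparedSource
  { sourcePull  :: IO (SourceResult a)
  , sourceClose :: IO ()
  }

-- A Source only describes how to prepare a fresh PreparedSource.
newtype Source a = Source { prepareSource :: IO (PreparedSource a) }

-- Each preparation allocates a new IORef, so every run starts from scratch.
sourceList :: [a] -> Source a
sourceList xs0 = Source $ do
  ref <- newIORef xs0
  return PreparedSource
    { sourcePull = do
        xs <- readIORef ref
        case xs of
          []       -> return Closed
          (y : ys) -> writeIORef ref ys >> return (Open y)
    , sourceClose = writeIORef ref []
    }

-- Appending prepares the second source only after the first reports Closed,
-- so at most one underlying resource is held at any time.
instance Semigroup (Source a) where
  Source p1 <> Source p2 = Source $ do
    first <- p1
    current <- newIORef (Left first)
    let pull = do
          st <- readIORef current
          case st of
            Right ps -> sourcePull ps
            Left ps -> do
              r <- sourcePull ps
              case r of
                Open x -> return (Open x)
                Closed -> do
                  second <- p2
                  writeIORef current (Right second)
                  sourcePull second
        close = readIORef current >>= either sourceClose sourceClose
    return (PreparedSource pull close)

instance Monoid (Source a) where
  mempty = Source (return (PreparedSource (return Closed) (return ())))

-- Prepare a Source and pull until Closed.
drain :: Source a -> IO [a]
drain (Source prep) = do
  ps <- prep
  let go acc = do
        r <- sourcePull ps
        case r of
          Open x -> go (x : acc)
          Closed -> return (reverse acc)
  go []
```

In this model, draining sourceList [1, 2] <> sourceList [3] twice yields [1, 2, 3] both times, because each drain prepares the Source anew.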

Constructors

Source 

data BufferedSource m a

When actually interacting with Sources, we usually want to be able to buffer the output, in case any intermediate steps return leftover data. A BufferedSource allows for such buffering, via the bsourceUnpull function.

A BufferedSource, unlike a Source, is resumable, meaning it can be passed to multiple Sinks without restarting.

Finally, a BufferedSource relaxes one of the invariants of a PreparedSource: calling bsourcePull after it has returned Closed will simply return Closed again.

A BufferedSource is also known as a resumable source, in that it can be called multiple times, and each time will provide new data. One caveat: while the types will allow you to use the buffered source in multiple threads, there is no guarantee that all BufferedSources will handle this correctly.
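The buffering and EOF behaviour described above can be modelled as follows. This is a sketch, not the library's implementation: plain IO replaces ResourceT m, bsourceClose is left out, and bufferPull and listPull are hypothetical helpers.

```haskell
import Data.IORef

-- Simplified model (assumption): plain IO stands in for ResourceT m, and
-- bsourceClose is omitted.
data SourceResult a = Open a | Closed deriving (Show, Eq)

data BufferedSource a = BufferedSource
  { bsourcePull   :: IO (SourceResult a)
  , bsourceUnpull :: a -> IO ()
  }

-- Wrap a raw pull action: unpulled items are served first, and once the raw
-- pull has reported Closed it is never called again; later pulls just
-- return Closed.
bufferPull :: IO (SourceResult a) -> IO (BufferedSource a)
bufferPull rawPull = do
  buffer <- newIORef []     -- leftovers pushed back via bsourceUnpull
  eof    <- newIORef False  -- has the raw source reported Closed?
  let pull = do
        buf <- readIORef buffer
        case buf of
          (x : xs) -> writeIORef buffer xs >> return (Open x)
          [] -> do
            done <- readIORef eof
            if done
              then return Closed
              else do
                r <- rawPull
                case r of
                  Closed -> writeIORef eof True >> return Closed
                  Open x -> return (Open x)
      unpull x = modifyIORef buffer (x :)
  return (BufferedSource pull unpull)

-- A raw pull action over a list (helper for the example).
listPull :: [a] -> IO (IO (SourceResult a))
listPull xs0 = do
  ref <- newIORef xs0
  return $ do
    xs <- readIORef ref
    case xs of
      []       -> return Closed
      (y : ys) -> writeIORef ref ys >> return (Open y)
```

Keeping an explicit eof flag is what lets the wrapper answer repeated pulls with Closed without ever violating the raw source's own invariant.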

Instances

BufferSource BufferedSource

Note that this instance hides the bsourceClose record, so that a BufferedSource remains resumable. The correct way to handle closing of a resumable source would be to call bsourceClose on the original BufferedSource, e.g.:

 bsrc <- bufferSource $ sourceFile "myfile.txt"
 bsrc $$ drop 5
 rest <- bsrc $$ consume
 bsourceClose bsrc

Note that the call to the $$ operator allocates a new BufferedSource internally, so that when $$ calls bsourceClose the first time, it does not close the actual file, thereby allowing us to pass the same bsrc to the consume function. Afterwards, we should call bsourceClose manually (though runResourceT will handle it for us eventually).

class BufferSource s where

This typeclass allows us to unify operators on Source and BufferedSource.

Methods

bufferSource :: Resource m => s m a -> ResourceT m (BufferedSource m a)

Instances

BufferSource BufferedSource
BufferSource Source 
BufferSource PreparedSource 

Sink

data SinkResult input output

A Sink ultimately returns a single output value. Each time data is pushed to it, a Sink may indicate that it is still processing data, or that it is done, in which case it returns some optional leftover input and an output value.

Constructors

Processing 
Done (Maybe input) output 

data PreparedSink input m output

In general, a sink will consume data and eventually produce an output when it has consumed "enough" data. There are two caveats to that statement:

  • Some sinks do not actually require any data to produce an output. Such sinks are supported in order to allow for a Monad instance.
  • Some sinks will consume all available data and only produce a result at the "end" of a data stream (e.g., sum).

To allow for the first caveat, we have the SinkNoData constructor. For the second, the SinkData constructor has two records: one for receiving more input, and the other to indicate the end of a stream. Note that, at the end of a stream, some output is required. If a specific Sink implementation cannot always produce output, this should be indicated in its return value, using something like a Maybe or Either.

Invariants:

  • After a PreparedSink produces a result (either via sinkPush or sinkClose), neither of those two functions may be called on the PreparedSink again.
  • If a Sink needs to clean up any resources (e.g., close a file handle), it must do so whenever it returns a result, either via sinkPush or sinkClose. Note that, due to usage of ResourceT, this is merely an optimization.
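The two constructors and the invariants above can be illustrated with a standalone model. It is only a sketch: plain IO replaces ResourceT m, SinkData's fields are positional here, and sumSink, sumUntilNegative and feedList are hypothetical names.

```haskell
import Data.IORef

-- Simplified model (assumption): plain IO stands in for ResourceT m.
data SinkResult input output = Processing | Done (Maybe input) output
  deriving (Show, Eq)

data PreparedSink input output
  = SinkNoData output
  | SinkData (input -> IO (SinkResult input output)) (IO output)  -- push, close

-- Needs the whole stream: the result only appears on close.
sumSink :: IO (PreparedSink Int Int)
sumSink = do
  total <- newIORef 0
  return $ SinkData
    (\x -> modifyIORef' total (+ x) >> return Processing)
    (readIORef total)

-- Stops at the first negative number and returns it as leftover input.
sumUntilNegative :: IO (PreparedSink Int Int)
sumUntilNegative = do
  total <- newIORef 0
  return $ SinkData
    (\x -> if x < 0
             then Done (Just x) <$> readIORef total
             else modifyIORef' total (+ x) >> return Processing)
    (readIORef total)

-- Feed a list to a sink, never calling push or close again once it has
-- produced a result. Returns the output and the unconsumed input.
feedList :: [input] -> PreparedSink input output -> IO (output, [input])
feedList xs (SinkNoData out) = return (out, xs)
feedList xs0 (SinkData push close) = go xs0
  where
    go [] = do
      out <- close
      return (out, [])
    go (x : xs) = do
      r <- push x
      case r of
        Processing        -> go xs
        Done leftover out -> return (out, maybe xs (: xs) leftover)
```

sumSink corresponds to the "only at the end of the stream" caveat, while sumUntilNegative shows a sink that returns Done early together with leftover input.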

Constructors

SinkNoData output 
SinkData 

Fields

sinkPush :: input -> ResourceT m (SinkResult input output)
 
sinkClose :: ResourceT m output
 

Instances

Monad m => Functor (PreparedSink input m) 

newtype Sink input m output

Most PreparedSinks require some type of state, similar to PreparedSources. Like a Source for a PreparedSource, a Sink is a simple monadic wrapper around a PreparedSink which allows initialization of such state. See Source for further caveats.

Note that this type provides a Monad instance, allowing you to easily compose Sinks together.

Constructors

Sink 

Fields

prepareSink :: ResourceT m (PreparedSink input m output)
 

Instances

(Base m ~ base, Resource m, Applicative base) => MonadBase base (Sink input m) 
MonadTrans (Sink input) 
Resource m => Monad (Sink input m) 
Monad m => Functor (Sink input m) 
Resource m => Applicative (Sink input m) 
(Resource m, MonadIO m) => MonadIO (Sink input m) 

Conduit

data ConduitResult input output

When data is pushed to a Conduit, it may either indicate that it is still producing output and provide some, or indicate that it is finished producing output, in which case it returns optional leftover input and some final output.

Constructors

Producing [output] 
Finished (Maybe input) [output] 

data PreparedConduit input m output

A conduit has two operations: it can receive new input (a push), and can be closed.

Invariants:

  • Neither a push nor close may be performed after a conduit returns a Finished from a push, or after a close is performed.
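The push/close protocol and its invariant can be sketched with a standalone model. Plain IO stands in for ResourceT m, and chunksOf2, limitConduit and runConduitList are hypothetical examples rather than library functions.

```haskell
import Data.IORef

-- Simplified model (assumption): plain IO stands in for ResourceT m.
data ConduitResult input output
  = Producing [output]
  | Finished (Maybe input) [output]
  deriving (Show, Eq)

data PreparedConduit input output = PreparedConduit
  { conduitPush  :: input -> IO (ConduitResult input output)
  , conduitClose :: IO [output]
  }

-- Groups inputs into pairs; on close, a dangling odd element is flushed
-- as a one-element group.
chunksOf2 :: IO (PreparedConduit a [a])
chunksOf2 = do
  pending <- newIORef Nothing
  return PreparedConduit
    { conduitPush = \x -> do
        p <- readIORef pending
        case p of
          Nothing -> writeIORef pending (Just x) >> return (Producing [])
          Just y  -> writeIORef pending Nothing >> return (Producing [[y, x]])
    , conduitClose = maybe [] (\y -> [[y]]) <$> readIORef pending
    }

-- Passes through at most n inputs, then reports Finished and hands back
-- the first input beyond the limit as leftover.
limitConduit :: Int -> IO (PreparedConduit a a)
limitConduit n = do
  count <- newIORef 0
  return PreparedConduit
    { conduitPush = \x -> do
        c <- readIORef count
        if c >= n
          then return (Finished (Just x) [])
          else writeIORef count (c + 1) >> return (Producing [x])
    , conduitClose = return []
    }

-- Run a conduit over a list; nothing is called after Finished.
-- Returns all output plus the unconsumed input.
runConduitList :: [i] -> PreparedConduit i o -> IO ([o], [i])
runConduitList xs0 c = go xs0 []
  where
    go [] acc = do
      final <- conduitClose c
      return (concat (reverse acc) ++ final, [])
    go (x : xs) acc = do
      r <- conduitPush c x
      case r of
        Producing os     -> go xs (os : acc)
        Finished left os -> return (concat (reverse (os : acc)), maybe xs (: xs) left)
```

chunksOf2 needs conduitClose to emit its final output, while limitConduit ends on its own via Finished, after which the runner neither pushes nor closes again.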

Constructors

PreparedConduit 

Fields

conduitPush :: input -> ResourceT m (ConduitResult input output)
 
conduitClose :: ResourceT m [output]
 

Instances

Monad m => Functor (PreparedConduit input m) 

newtype Conduit input m output

A monadic action generating a PreparedConduit. See Source and Sink for more motivation.

Constructors

Conduit 

Fields

prepareConduit :: ResourceT m (PreparedConduit input m output)
 

Instances

Monad m => Functor (Conduit input m) 

Connect/fuse operators

($$) :: (BufferSource bsrc, Resource m) => bsrc m a -> Sink a m b -> ResourceT m b

The connect operator, which pulls data from a source and pushes to a sink. There are three ways this process can terminate:

  1. In the case of a SinkNoData constructor, the source is not opened at all, and the output value is returned immediately.
  2. The sink returns Done, in which case any leftover input is returned via bsourceUnpull and the source is closed.
  3. The source returns Closed, in which case the sink is closed.

Note that the input source is converted to a BufferedSource via bufferSource. As such, if the input to this function is itself a BufferedSource, the call to bsourceClose will have no effect, as described in the comments on that instance.
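The three termination cases of the connect loop can be modelled as below. This is an illustrative sketch only: plain IO stands in for ResourceT m, source closing is not modelled, and bufferedList and connect are hypothetical names rather than the library's ($$).

```haskell
import Data.IORef

-- Simplified model (assumption): plain IO stands in for ResourceT m, and the
-- close operations of the source are not modelled.
data SourceResult a = Open a | Closed
data SinkResult i o = Processing | Done (Maybe i) o

data BufferedSource a = BufferedSource
  { bsourcePull   :: IO (SourceResult a)
  , bsourceUnpull :: a -> IO ()
  }

data PreparedSink i o
  = SinkNoData o
  | SinkData (i -> IO (SinkResult i o)) (IO o)  -- push, close

-- A buffered source over a list; unpulled items go back on the front.
bufferedList :: [a] -> IO (BufferedSource a)
bufferedList xs0 = do
  ref <- newIORef xs0
  let pull = do
        xs <- readIORef ref
        case xs of
          []       -> return Closed
          (y : ys) -> writeIORef ref ys >> return (Open y)
  return (BufferedSource pull (\x -> modifyIORef ref (x :)))

-- The connect loop, following the three termination cases listed above.
connect :: BufferedSource a -> PreparedSink a b -> IO b
connect _ (SinkNoData out) = return out         -- case 1: the source is never pulled
connect bsrc (SinkData push close) = loop
  where
    loop = do
      r <- bsourcePull bsrc
      case r of
        Closed -> close                         -- case 3: source exhausted, close the sink
        Open x -> do
          res <- push x
          case res of
            Processing -> loop
            Done leftover out -> do             -- case 2: sink finished early
              mapM_ (bsourceUnpull bsrc) leftover
              return out
```

Because leftovers are unpulled back into the same buffered source, a second connect on that source resumes exactly where the first sink stopped.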

($=) :: (Resource m, BufferSource bsrc) => bsrc m a -> Conduit a m b -> Source m b

Left fuse, combining a source and a conduit together into a new source.

(=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c

Right fuse, combining a conduit and a sink together into a new sink.
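One plausible way a right fuse can work is sketched below on prepared values. The library's actual (=$) may differ: this model uses plain IO instead of ResourceT m, omits SinkNoData, drops any leftover the sink returns, and fuseRight and pushAll are hypothetical names.

```haskell
import Data.IORef

-- Simplified model (assumption): plain IO stands in for ResourceT m, and only
-- SinkData-style sinks are modelled (SinkNoData is omitted).
data SinkResult i o = Processing | Done (Maybe i) o
data ConduitResult i o = Producing [o] | Finished (Maybe i) [o]

data PreparedSink i o = PreparedSink
  { sinkPush  :: i -> IO (SinkResult i o)
  , sinkClose :: IO o
  }

data PreparedConduit i o = PreparedConduit
  { conduitPush  :: i -> IO (ConduitResult i o)
  , conduitClose :: IO [o]
  }

-- Push conduit output into the sink, stopping at the first Done. In this
-- sketch any leftover the sink returns (of the middle type b) is dropped.
pushAll :: PreparedSink b c -> [b] -> IO (Maybe c)
pushAll _ [] = return Nothing
pushAll sink (b : bs) = do
  r <- sinkPush sink b
  case r of
    Processing -> pushAll sink bs
    Done _ out -> return (Just out)

-- Right fuse: the result is a sink over the conduit's input type a.
fuseRight :: PreparedConduit a b -> PreparedSink b c -> PreparedSink a c
fuseRight conduit sink = PreparedSink push close
  where
    push a = do
      r <- conduitPush conduit a
      case r of
        Producing bs -> do
          m <- pushAll sink bs
          case m of
            Nothing  -> return Processing
            Just out -> do
              _ <- conduitClose conduit   -- sink is finished, so release the conduit
              return (Done Nothing out)
        Finished leftover bs -> do
          m <- pushAll sink bs
          out <- maybe (sinkClose sink) return m  -- close the sink if it is still open
          return (Done leftover out)
    close = do
      bs <- conduitClose conduit
      m <- pushAll sink bs
      maybe (sinkClose sink) return m
```

The fused value is itself an ordinary sink over the conduit's input, which is what lets (=$) return a Sink a m c.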

(=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c

Middle fuse, combining two conduits together into a new conduit.

Utility functions

Source

sourceState
 :: Resource m
 => state                                                -- ^ Initial state
 -> (state -> ResourceT m (state, SourceResult output))  -- ^ Pull function
 -> Source m output

Construct a Source with some stateful functions. This function manages all mutable state for you.
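The idea of threading a pure state value while hiding the mutable variable can be sketched in a simplified model. It assumes plain IO instead of ResourceT m, and sourceStateSketch, countFromTo and toList are hypothetical names, not the library's sourceState.

```haskell
import Data.IORef

-- Simplified model (assumption): plain IO stands in for ResourceT m.
data SourceResult a = Open a | Closed deriving (Show, Eq)

data PreparedSource a = PreparedSource
  { sourcePull  :: IO (SourceResult a)
  , sourceClose :: IO ()
  }

newtype Source a = Source { prepareSource :: IO (PreparedSource a) }

-- The state lives in a fresh IORef per preparation, so the user's pull
-- function only passes state around and each run starts from the initial state.
sourceStateSketch :: state -> (state -> IO (state, SourceResult output)) -> Source output
sourceStateSketch initial pull = Source $ do
  ref <- newIORef initial
  return PreparedSource
    { sourcePull = do
        s <- readIORef ref
        (s', r) <- pull s
        writeIORef ref s'
        return r
    , sourceClose = return ()
    }

-- Example: count from lo up to hi.
countFromTo :: Int -> Int -> Source Int
countFromTo lo hi = sourceStateSketch lo $ \n ->
  return (n + 1, if n > hi then Closed else Open n)

-- Prepare a Source and pull until Closed.
toList :: Source a -> IO [a]
toList (Source prep) = do
  ps <- prep
  let go acc = do
        r <- sourcePull ps
        case r of
          Open x -> go (x : acc)
          Closed -> return (reverse acc)
  go []
```

With this model, toList (countFromTo 1 3) produces [1, 2, 3] on every run.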

sourceIO
 :: ResourceIO m
 => IO state                              -- ^ resource and/or state allocation
 -> (state -> IO ())                      -- ^ resource and/or state cleanup
 -> (state -> m (SourceResult output))    -- ^ Pull function. Note that this need not explicitly perform any cleanup.
 -> Source m output

Construct a Source based on some IO actions for alloc/release.

transSource :: (Base m ~ Base n, Monad m) => (forall a. m a -> n a) -> Source m output -> Source n output

Transform the monad a Source lives in.

Sink

sinkState
 :: Resource m
 => state                                                             -- ^ initial state
 -> (state -> input -> ResourceT m (state, SinkResult input output))  -- ^ push
 -> (state -> ResourceT m output)                                     -- ^ Close. Note that the state is not returned, as it is not needed.
 -> Sink input m output

Construct a Sink with some stateful functions. This function manages all mutable state for you.
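A simplified model of the same pattern for sinks is sketched below. It assumes plain IO in place of ResourceT m and omits SinkNoData; sinkStateSketch and takeSink are hypothetical names, not the library's sinkState.

```haskell
import Data.IORef

-- Simplified model (assumption): plain IO stands in for ResourceT m, and
-- SinkNoData is omitted.
data SinkResult i o = Processing | Done (Maybe i) o deriving (Show, Eq)

data PreparedSink i o = PreparedSink
  { sinkPush  :: i -> IO (SinkResult i o)
  , sinkClose :: IO o
  }

newtype Sink i o = Sink { prepareSink :: IO (PreparedSink i o) }

-- Hide the state in an IORef; the close function receives the final state
-- but, as described above, does not need to return it.
sinkStateSketch :: state -> (state -> i -> IO (state, SinkResult i o)) -> (state -> IO o) -> Sink i o
sinkStateSketch initial push close = Sink $ do
  ref <- newIORef initial
  return PreparedSink
    { sinkPush = \i -> do
        s <- readIORef ref
        (s', r) <- push s i
        writeIORef ref s'
        return r
    , sinkClose = readIORef ref >>= close
    }

-- Example: collect the first n inputs (state: items still wanted, items so far).
takeSink :: Int -> Sink a [a]
takeSink n = sinkStateSketch (n, []) push close
  where
    push (k, acc) x
      | k <= 0    = return ((0, acc), Done (Just x) (reverse acc))
      | k == 1    = return ((0, []), Done Nothing (reverse (x : acc)))
      | otherwise = return ((k - 1, x : acc), Processing)
    close (_, acc) = return (reverse acc)
```

If the stream ends before n items arrive, close returns whatever was collected, which matches the requirement that a sink always produces output at the end of a stream.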

sinkIO
 :: ResourceIO m
 => IO state                                         -- ^ resource and/or state allocation
 -> (state -> IO ())                                 -- ^ resource and/or state cleanup
 -> (state -> input -> m (SinkResult input output))  -- ^ push
 -> (state -> m output)                              -- ^ close
 -> Sink input m output

Construct a Sink. Note that your push and close functions need not explicitly perform any cleanup.

transSink :: (Base m ~ Base n, Monad m) => (forall a. m a -> n a) -> Sink input m output -> Sink input n output

Transform the monad a Sink lives in.

Conduit

conduitState
 :: Resource m
 => state                                                                -- ^ initial state
 -> (state -> input -> ResourceT m (state, ConduitResult input output))  -- ^ Push function.
 -> (state -> ResourceT m [output])                                      -- ^ Close function. The state need not be returned, since it will not be used again.
 -> Conduit input m output

Construct a Conduit with some stateful functions. This function manages all mutable state for you.

conduitIO
 :: ResourceIO m
 => IO state                                            -- ^ resource and/or state allocation
 -> (state -> IO ())                                    -- ^ resource and/or state cleanup
 -> (state -> input -> m (ConduitResult input output))  -- ^ Push function. Note that this need not explicitly perform any cleanup.
 -> (state -> m [output])                               -- ^ Close function. Note that this need not explicitly perform any cleanup.
 -> Conduit input m output

Construct a Conduit.

transConduit :: (Monad m, Base m ~ Base n) => (forall a. m a -> n a) -> Conduit input m output -> Conduit input n output

Transform the monad a Conduit lives in.

Sequencing

type SequencedSink state input m output = state -> Sink input m (SequencedSinkResponse state input m output)

Helper type for constructing a Conduit based on Sinks. This allows you to write higher-level code that takes advantage of existing conduits and sinks, and leverages a sink's monadic interface.

sequenceSink
 :: Resource m
 => state                                -- ^ initial state
 -> SequencedSink state input m output
 -> Conduit input m output

Convert a SequencedSink into a Conduit.

data SequencedSinkResponse state input m output

Return value from a SequencedSink.

Constructors

Emit state [output]

Set a new state, and emit some new output.

Stop

End the conduit.

StartConduit (Conduit input m output)

Pass control to a new conduit.

Convenience re-exports

data ResourceT m a

The Resource transformer. This transformer keeps track of all registered actions, and calls them upon exit (via runResourceT). Actions may be registered via register, or resources may be allocated atomically via with or withIO. The with functions correspond closely to bracket.

Releasing may be performed before exit via the release function. This is a highly recommended optimization, as it will ensure that scarce resources are freed early. Note that calling release will deregister the action, so that a release action will only ever be called once.
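The register/release/run-at-exit bookkeeping described above can be sketched with a toy registry. This is not ResourceT's implementation: Registry, ReleaseKey's representation and withRegistry are hypothetical, only register and release echo names from the text, and the newest-first cleanup order at exit is a choice made for this model.

```haskell
import Control.Exception (finally)
import Data.IORef

-- Toy registry standing in for ResourceT's internal state (assumption: not
-- the library's implementation). Keys identify registered cleanups.
newtype ReleaseKey = ReleaseKey Int deriving (Eq, Show)

data Registry = Registry
  { nextKey  :: IORef Int
  , cleanups :: IORef [(Int, IO ())]   -- most recently registered first
  }

newRegistry :: IO Registry
newRegistry = Registry <$> newIORef 0 <*> newIORef []

register :: Registry -> IO () -> IO ReleaseKey
register reg action = do
  k <- atomicModifyIORef' (nextKey reg) (\n -> (n + 1, n))
  atomicModifyIORef' (cleanups reg) (\cs -> ((k, action) : cs, ()))
  return (ReleaseKey k)

-- Run one cleanup early and deregister it, so it can never run twice.
release :: Registry -> ReleaseKey -> IO ()
release reg (ReleaseKey k) = do
  found <- atomicModifyIORef' (cleanups reg) $ \cs ->
    (filter ((/= k) . fst) cs, lookup k cs)
  sequence_ found

-- Run a body, then every cleanup still registered (newest first), even if
-- the body throws.
withRegistry :: (Registry -> IO a) -> IO a
withRegistry body = do
  reg <- newRegistry
  body reg `finally` (readIORef (cleanups reg) >>= mapM_ snd)
```

Releasing removes the entry before running it, so a second release of the same key, or the final sweep at exit, cannot run that cleanup again.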

class (HasRef (Base m), Monad m) => Resource m where

A Monad with a base that has mutable references, and allows some way to run base actions and clean up properly.

Associated Types

type Base m :: * -> *

The base monad for the current monad stack. This will usually be IO or ST.

Methods

resourceLiftBase :: Base m a -> m a

Run some action in the Base monad. This function corresponds to liftBase, but due to various type issues, we need to have our own version here.

resourceBracket_
 :: Base m ()   -- ^ init
 -> Base m ()   -- ^ cleanup
 -> m c         -- ^ body
 -> m c

Guarantee that some initialization and cleanup code is called before and after some action. Note that the initialization and cleanup lives in the base monad, while the body is in the top monad.
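For the case where the top monad is IO itself (so Base m is also IO), the described contract matches Control.Exception.bracket_. The sketch below only illustrates that contract; resourceBracketIO_ and demo are hypothetical names, not the library's instance code.

```haskell
import Control.Exception (ErrorCall (..), bracket_, throwIO, try)
import Data.IORef

-- Sketch for m = IO, where Base m is IO itself (assumption: this mirrors the
-- described contract, not the library's instance code).
resourceBracketIO_ :: IO () -> IO () -> IO c -> IO c
resourceBracketIO_ = bracket_

-- Demonstration: the cleanup action still runs when the body throws,
-- and the exception is re-thrown afterwards.
demo :: IO ([String], Either ErrorCall ())
demo = do
  events <- newIORef []
  let logEvent e = modifyIORef events (++ [e])
  result <- try $ resourceBracketIO_ (logEvent "init") (logEvent "cleanup") $ do
    logEvent "body"
    throwIO (ErrorCall "boom")
  logs <- readIORef events
  return (logs, result)
```

Running demo records "init", "body" and "cleanup" in that order and returns the ErrorCall as a Left value.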

class (ResourceBaseIO (Base m), ResourceUnsafeIO m, ResourceThrow m, MonadIO m, MonadBaseControl IO m) => ResourceIO m

A Resource which can safely run IO calls.

class Resource m => ResourceUnsafeIO m

A Resource based on some monad which allows running of some IO actions, via unsafe calls. This applies to IO and ST, for instance.

runResourceT :: Resource m => ResourceT m a -> m a

Unwrap a ResourceT transformer, and call all registered release actions.

Note that there is some reference counting involved due to resourceForkIO. If multiple threads are sharing the same collection of resources, only the last call to runResourceT will deallocate the resources.

class Resource m => ResourceThrow m where

A Resource which can throw exceptions. Note that this does not work in a vanilla ST monad. Instead, you should use the ExceptionT transformer on top of ST.

Methods

resourceThrow :: Exception e => e -> m a