stm-conduit-3.0.0: Introduces conduits to channels, and promotes using conduits concurrently.

Data.Conduit.TMChan

Description

• Introduction

Contains a simple source and sink for linking together conduits in different threads. Usage is so easy, it's best explained with an example:

We first create a channel for communication...

    do chan <- atomically $ newTBMChan 16

Then we fork a new thread loading a wackton of pictures into memory. The data (pictures, in this case) will be streamed down the channel to whatever is on the other side.

       _ <- forkIO . runResourceT $ loadTextures lotsOfPictures $$ sinkTBMChan chan True

Finally, we connect something to the other end of the channel. In this case, we connect a sink which uploads the textures one by one to the graphics card.

       runResourceT $ sourceTBMChan chan $$ Conduit.mapM_ (liftIO . uploadToGraphicsCard)

By running the two tasks in parallel, we no longer have to wait for one texture to upload to the graphics card before reading the next one from disk. This avoids the common switching of bottlenecks (such as between the disk and graphics memory) that most loading processes seem to love.
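The example above depends on application-specific functions (loadTextures, uploadToGraphicsCard). A minimal self-contained sketch of the same pattern, assuming stm-conduit 3.0.0 together with a conduit release that still provides the `$$` operator, could stream plain integers instead. The name `demo` and the channel bound of 4 are illustrative choices, not part of the library.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan (newTBMChan, sinkTBMChan, sourceTBMChan)

-- | Stream the numbers 1..10 from a forked producer thread through a
-- bounded channel and collect them in the calling thread.
-- ('demo' is a hypothetical name used only for this sketch.)
demo :: IO [Int]
demo = do
  chan <- atomically $ newTBMChan 4
  -- Producer: passing True asks the sink to close the channel when it
  -- finishes, which lets the consuming source terminate.
  _ <- forkIO $ CL.sourceList [1 .. 10] $$ sinkTBMChan chan True
  -- Consumer: reads until the channel is closed and drained.
  sourceTBMChan chan $$ CL.consume

main :: IO ()
main = demo >>= print
```

Because the bound (4) is smaller than the number of elements, the producer blocks from time to time until the consumer catches up, which is the behaviour the bounded connectors are meant to provide.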

Control.Concurrent.STM.TMChan and Control.Concurrent.STM.TBMChan are re-exported for convenience.

• Caveats

It is recommended to use TBMChan as much as possible, and generally avoid TMChan usage. TMChans are unbounded, and if used, the conduit pipeline will no longer use a bounded amount of space. They will essentially leak memory if the writer is faster than the reader.

Therefore, use bounded channels as much as possible, preferably with a high bound so it will be hit infrequently.
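To make the difference concrete, the following sketch probes a bounded channel directly, without any conduit involved. It assumes the re-exported stm-chans function `tryWriteTBMChan`, which reports whether a write succeeded instead of blocking; `demo` and the bound of 2 are illustrative.

```haskell
import Control.Concurrent.STM (atomically)
import Data.Conduit.TMChan (newTBMChan, tryWriteTBMChan)

-- | Attempt three non-blocking writes into a channel with room for two.
-- 'tryWriteTBMChan' gives Just True on success, Just False when the
-- channel is full, and Nothing when the channel is closed.
-- ('demo' is a hypothetical name used only for this sketch.)
demo :: IO [Maybe Bool]
demo = do
  chan <- atomically $ newTBMChan 2
  mapM (atomically . tryWriteTBMChan chan) "abc"

main :: IO ()
main = demo >>= print
```

The third write is refused because nobody has read from the channel yet. A blocking write, as used by the bounded sink, would wait at that point, whereas a TMChan would simply keep growing.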

# Bounded Channel Connectors

sourceTBMChan :: MonadIO m => TBMChan a -> Source m a

A simple wrapper around a TBMChan. As data is pushed into the channel, the source will read it and pass it down the conduit pipeline. When the channel is closed, the source will close also.

If the channel fills up, the pipeline will stall until values are read.

sinkTBMChan

Arguments

 :: MonadIO m
 => TBMChan a
 -> Bool         -- Should the channel be closed when the sink is closed?
 -> Sink a m ()

A simple wrapper around a TBMChan. As data is pushed into the sink, it will magically begin to appear in the channel. If the channel is full, the sink will block until space frees up.
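One situation where the Bool argument plausibly matters is several producers sharing a single channel: if each sink closed the channel on completion, the first producer to finish would cut off the others. The sketch below passes False and closes the channel explicitly once both producers have signalled completion. The MVar-based bookkeeping and the names `demo` and `producer` are assumptions made for this illustration, not something the library prescribes.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan (closeTBMChan, newTBMChan, sinkTBMChan, sourceTBMChan)
import Data.List (sort)

-- | Two producers write into one channel without closing it; a helper
-- thread closes the channel after both have finished.
-- ('demo' and 'producer' are hypothetical names for this sketch.)
demo :: IO [Int]
demo = do
  chan <- atomically $ newTBMChan 8
  done <- newEmptyMVar
  let producer xs = forkIO $ do
        -- False: leave the channel open when this sink finishes.
        CL.sourceList xs $$ sinkTBMChan chan False
        putMVar done ()
  _ <- producer [1, 2, 3]
  _ <- producer [10, 20, 30]
  _ <- forkIO $ do
    -- Wait for both producers, then close the channel by hand.
    takeMVar done
    takeMVar done
    atomically $ closeTBMChan chan
  -- Interleaving between producers is not fixed, so sort the result.
  fmap sort (sourceTBMChan chan $$ CL.consume)

main :: IO ()
main = demo >>= print
```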

# Unbounded Channel Connectors

sourceTMChan :: MonadIO m => TMChan a -> Source m a

A simple wrapper around a TMChan. As data is pushed into the channel, the source will read it and pass it down the conduit pipeline. When the channel is closed, the source will close also.

sinkTMChan

Arguments

 :: MonadIO m
 => TMChan a
 -> Bool         -- Should the channel be closed when the sink is closed?
 -> Sink a m ()

A simple wrapper around a TMChan. As data is pushed into this sink, it will magically begin to appear in the channel.

# Parallel Combinators

(>=<) :: (MonadResource mi, MonadIO mo, MonadBaseControl IO mi) => Source mi a -> Source mi a -> mi (Source mo a) infixl 5

Combines two sources with an unbounded channel, creating a new source which pulls data from a mix of the two sources: whichever produces first.

The order of the new source's data is undefined, but it will be some combination of the two given sources.
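As a rough illustration, two list-backed sources can be merged inside a ResourceT block and drained with an ordinary sink. This sketch assumes stm-conduit 3.0.0 with a resourcet version in which ResourceT has a MonadBaseControl instance; `demo` is an illustrative name. Because the interleaving is undefined, the result is sorted before it is returned.

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan ((>=<))
import Data.List (sort)

-- | Merge two sources and collect everything they produce.
-- ('demo' is a hypothetical name used only for this sketch.)
demo :: IO [Int]
demo = runResourceT $ do
  merged <- CL.sourceList [1, 3, 5] >=< CL.sourceList [2, 4, 6]
  xs <- merged $$ CL.consume
  -- The arrival order depends on thread scheduling, so normalise it.
  return (sort xs)

main :: IO ()
main = demo >>= print
```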

mergeSources

Arguments

 :: (MonadResource mi, MonadIO mo, MonadBaseControl IO mi)
 => [Source mi a]     -- The sources to merge.
 -> Int               -- The bound of the intermediate channel.
 -> mi (Source mo a)

Merges a list of sources, putting them all into a bounded channel, and returns a source which pulls from all the given sources on a first-come, first-served basis.

The order of the new source's data is undefined, but it will be some combination of the given sources. The monad of the resultant source (mo) is independent of the monads of the input sources (mi).

All spawned threads are killed when the source is closed or when execution leaves the ResourceT region. This means that the result can only be used within a runResourceT scope.

Before 3.0, spawned threads were not guaranteed to be closed. This could happen if the source was closed before all of its inputs were closed.

Since: 3.0
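A sketch of the list-based variant, under the same assumptions as the rest of this page (stm-conduit 3.0.0, a conduit release providing `$$`): three sources are merged through a channel with a bound of 4, and the merged source is consumed before the runResourceT scope ends. The name `demo` and the concrete values are illustrative only.

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan (mergeSources)
import Data.List (sort)

-- | Merge three sources through a bounded intermediate channel.
-- ('demo' is a hypothetical name used only for this sketch.)
demo :: IO [Int]
demo = runResourceT $ do
  merged <- mergeSources
    [ CL.sourceList [1, 2, 3]
    , CL.sourceList [10, 20, 30]
    , CL.sourceList [100, 200, 300]
    ]
    4
  -- The merged source must be consumed inside this ResourceT scope.
  xs <- merged $$ CL.consume
  -- Element order across sources is undefined, so sort before returning.
  return (sort xs)

main :: IO ()
main = demo >>= print
```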

(<=>) :: (MonadIO mi, MonadThrow mi, MonadIO mo, MonadBaseControl IO mi) => Show i => Conduit i (ResourceT mi) i -> Conduit i (ResourceT mi) i -> ResourceT mi (Conduit i mo i)

Combines two conduits with unbounded channels, creating a new conduit which pulls data from a mix of the two: whichever produces first.

The order of the new conduit's output is undefined, but it will be some combination of the two given conduits.

mergeConduits

Arguments

 :: (MonadIO mi, MonadIO mi, MonadThrow mi, MonadIO mo, MonadBaseControl IO mi)
 => [Conduit i (ResourceT mi) o]   -- The conduits to merge.
 -> Int                            -- The bound for the channels.
 -> ResourceT mi (Conduit i mo o)

Provides an input across several conduits, putting them all into a bounded channel. Returns a conduit which pulls from all the given conduits on a first-come, first-served basis.

The order of the new conduit's outputs is undefined, but it will be some combination of the given conduits. The monad of the resultant conduit (mo) is independent of the monads of the input conduits (mi).

Closes all worker threads when the resulting conduit is closed or when execution leaves the ResourceT context. This means that the conduit is only valid inside a runResourceT scope.

Before 3.0, spawned threads were not guaranteed to be closed. This could happen if the conduit was closed before all threads had finished execution.

Since: 3.0