stm-conduit-2.5.1: Introduces conduits to channels, and promotes using conduits concurrently.

Safe Haskell: None

Data.Conduit.TMChan

Description

  • Introduction

Contains a simple source and sink for linking together conduits in different threads. Usage is so easy, it's best explained with an example:

We first create a channel for communication...

 do chan <- atomically $ newTBMChan 16

Then we fork a new thread loading a wackton of pictures into memory. The data (pictures, in this case) will be streamed down the channel to whatever is on the other side.

    _ <- forkIO . runResourceT $ loadTextures lotsOfPictures $$ sinkTBMChan chan True

Finally, we connect something to the other end of the channel. In this case, we connect a sink which uploads the textures one by one to the graphics card.

    runResourceT $ sourceTBMChan chan $$ Conduit.mapM_ (liftIO . uploadToGraphicsCard)

By running the two tasks in parallel, we no longer have to wait for one texture to upload to the graphics card before reading the next one from disk. This avoids the common switching of bottlenecks (such as between the disk and graphics memory) that most loading processes seem to love.
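For readers who want to try the pattern without texture-loading code, here is a minimal sketch of the same three steps using a list of integers. It assumes the signatures documented on this page (in particular, sinkTBMChan taking a Bool) and a conduit version that still provides $$; the name sumThroughChannel is hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan (newTBMChan, sinkTBMChan, sourceTBMChan)

-- | Hypothetical helper: stream 1..100 through a bounded channel from a
-- producer thread and sum the values in the calling thread.
sumThroughChannel :: IO Int
sumThroughChannel = do
  -- A channel holding at most 16 values at a time.
  chan <- atomically $ newTBMChan 16
  -- Producer thread. Passing True closes the channel once the list is
  -- exhausted, which is what lets the consumer below terminate.
  _ <- forkIO $ CL.sourceList [1 .. 100] $$ sinkTBMChan chan True
  -- Consumer: runs until the channel is closed and drained.
  sourceTBMChan chan $$ CL.fold (+) 0

main :: IO ()
main = sumThroughChannel >>= print  -- prints 5050
```

Because the bound is 16 and the list has 100 elements, the producer blocks whenever it gets 16 values ahead of the consumer.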

Control.Concurrent.STM.TMChan and Control.Concurrent.STM.TBMChan are re-exported for convenience.

  • Caveats

It is recommended to use TBMChan as much as possible, and generally avoid TMChan usage. TMChans are unbounded, and if used, the conduit pipeline will no longer use a bounded amount of space. They will essentially leak memory if the writer is faster than the reader.

Therefore, use bounded channels as much as possible, preferably with a high bound so it will be hit infrequently.
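The difference between the two channel types can be observed directly with the re-exported STM operations. The sketch below is illustrative only; boundedWrites and unboundedWrites are hypothetical names, and it relies on tryWriteTBMChan from Control.Concurrent.STM.TBMChan, which reports a full channel instead of blocking.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Monad (forM_)
import Data.Conduit.TMChan
  (newTBMChan, newTMChan, tryWriteTBMChan, writeTMChan)

-- | Attempt three non-blocking writes into a channel bounded at two.
-- Just True means the value was written, Just False means the channel
-- was full, and Nothing would mean the channel was closed.
boundedWrites :: IO [Maybe Bool]
boundedWrites = do
  chan <- atomically $ newTBMChan 2
  mapM (atomically . tryWriteTBMChan chan) "abc"

-- | Writes to an unbounded channel never block, so every value stays
-- in memory until some reader consumes it.
unboundedWrites :: IO ()
unboundedWrites = do
  chan <- atomically newTMChan
  forM_ [1 .. 1000 :: Int] $ atomically . writeTMChan chan

main :: IO ()
main = do
  boundedWrites >>= print  -- prints [Just True,Just True,Just False]
  unboundedWrites
```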

Synopsis

Bounded Channel Connectors

sourceTBMChan :: MonadIO m => TBMChan a -> Source m a

A simple wrapper around a TBMChan. As data is pushed into the channel, the source will read it and pass it down the conduit pipeline. When the channel is closed, the source will close also.

If the channel fills up, the pipeline will stall until values are read.
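The producer does not have to be a conduit. As a small sketch (drainChannel is a hypothetical name), the following writes to the channel from plain STM code, closes it, and then lets sourceTBMChan drain what is left:

```haskell
import Control.Concurrent.STM (atomically)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan
  (closeTBMChan, newTBMChan, sourceTBMChan, writeTBMChan)

-- | Fill a channel by hand, close it, then read everything back
-- through the source.
drainChannel :: IO [String]
drainChannel = do
  chan <- atomically $ newTBMChan 4
  atomically $ do
    writeTBMChan chan "first"
    writeTBMChan chan "second"
    -- Values already in the channel remain readable after closing.
    closeTBMChan chan
  -- The source ends once the closed channel is empty.
  sourceTBMChan chan $$ CL.consume

main :: IO ()
main = drainChannel >>= print  -- prints ["first","second"]
```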

sinkTBMChan

Arguments

:: MonadIO m 
=> TBMChan a 
-> Bool

Should the channel be closed when the sink is closed?

-> Sink a m () 

A simple wrapper around a TBMChan. As data is pushed into the sink, it will magically begin to appear in the channel. If the channel is full, the sink will block until space frees up.
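The Bool argument matters when several producers share one channel. In this sketch (twoProducers is a hypothetical name), each sink is given False so the channel stays open after the sink finishes, and the channel is closed by hand once both producers are done:

```haskell
import Control.Concurrent.STM (atomically)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan
  (closeTBMChan, newTBMChan, sinkTBMChan, sourceTBMChan)

-- | Feed one channel from two pipelines, then close it manually.
twoProducers :: IO [Int]
twoProducers = do
  chan <- atomically $ newTBMChan 16
  -- Both sinks run in the calling thread. That is only safe here
  -- because the six values fit within the bound of 16; with more data
  -- the sinks would block and should run in their own threads.
  CL.sourceList [1, 2, 3] $$ sinkTBMChan chan False
  CL.sourceList [4, 5, 6] $$ sinkTBMChan chan False
  -- Neither sink closed the channel, so do it explicitly.
  atomically $ closeTBMChan chan
  sourceTBMChan chan $$ CL.consume

main :: IO ()
main = twoProducers >>= print  -- prints [1,2,3,4,5,6]
```

Had the first sink been given True, the channel would already be closed when the second pipeline started.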

Unbounded Channel Connectors

sourceTMChan :: MonadIO m => TMChan a -> Source m a

A simple wrapper around a TMChan. As data is pushed into the channel, the source will read it and pass it down the conduit pipeline. When the channel is closed, the source will close also.

sinkTMChan

Arguments

:: MonadIO m 
=> TMChan a 
-> Bool

Should the channel be closed when the sink is closed?

-> Sink a m () 

A simple wrapper around a TMChan. As data is pushed into this sink, it will magically begin to appear in the channel.
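A quick way to see the unbounded behaviour is to run the whole producer before any consumer exists. The sketch below (bufferThenCount is a hypothetical name) would block forever with a small TBMChan, but completes with a TMChan because the channel simply buffers every value:

```haskell
import Control.Concurrent.STM (atomically)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan (newTMChan, sinkTMChan, sourceTMChan)

-- | Push 10000 values into an unbounded channel with no reader
-- attached, then read them back and count them.
bufferThenCount :: IO Int
bufferThenCount = do
  chan <- atomically newTMChan
  -- All 10000 values are held in memory at this point.
  CL.sourceList [1 .. 10000 :: Int] $$ sinkTMChan chan True
  xs <- sourceTMChan chan $$ CL.consume
  return (length xs)

main :: IO ()
main = bufferThenCount >>= print  -- prints 10000
```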

Parallel Combinators

(>=<) :: (MonadResource mi, MonadIO mo, MonadBaseControl IO mi) => Source mi a -> Source mi a -> mi (Source mo a)

Combines two sources with an unbounded channel, creating a new source which pulls data from a mix of the two sources: whichever produces first.

The order of the new source's data is undefined, but it will be some combination of the two given sources.
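As a rough usage sketch (mergeTwo is a hypothetical name), two list sources can be merged inside runResourceT, which supplies the MonadResource instance the signature asks for. The result is sorted before it is returned because the interleaving is not defined:

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan ((>=<))
import Data.List (sort)

-- | Merge two sources and collect everything they produce.
mergeTwo :: IO [Int]
mergeTwo = runResourceT $ do
  merged <- CL.sourceList [1, 2, 3] >=< CL.sourceList [10, 20, 30]
  -- Arrival order depends on thread scheduling, so sort for a stable result.
  fmap sort (merged $$ CL.consume)

main :: IO ()
main = mergeTwo >>= print  -- prints [1,2,3,10,20,30]
```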

mergeSources

Arguments

:: (MonadResource mi, MonadIO mo, MonadBaseControl IO mi) 
=> [Source mi a]

The sources to merge.

-> Int

The bound of the intermediate channel.

-> mi (Source mo a) 

Merges a list of sources, putting them all into a bounded channel, and returns a source that yields values from all the given sources on a first-come, first-served basis.

The order of the new source's data is undefined, but it will be some combination of the given sources. The monad of the resultant source (mo) is independent of the monads of the input sources (mi).
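A minimal sketch of merging more than two sources follows; mergedSum is a hypothetical name and the bound of 4 is an arbitrary choice. Summing the output gives a result that does not depend on the order in which values arrive:

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import Data.Conduit (($$))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan (mergeSources)

-- | Merge three sources through a channel bounded at 4 and sum the output.
mergedSum :: IO Int
mergedSum = runResourceT $ do
  merged <- mergeSources
    [ CL.sourceList [1 .. 10]
    , CL.sourceList [11 .. 20]
    , CL.sourceList [21 .. 30]
    ]
    4
  -- The merged source ends after all three inputs have finished.
  merged $$ CL.fold (+) 0

main :: IO ()
main = mergedSum >>= print  -- prints 465
```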

(<=>) :: (MonadIO mi, MonadIO mo, MonadBaseControl IO mi) => Show i => Conduit i (ResourceT mi) i -> Conduit i (ResourceT mi) i -> ResourceT mi (Conduit i mo i)

Combines two conduits with unbounded channels, creating a new conduit which pulls data from a mix of the two: whichever produces first.

The order of the new conduit's output is undefined, but it will be some combination of the two given conduits.

mergeConduits

Arguments

:: (MonadIO mi, MonadIO mo, MonadBaseControl IO mi) 
=> [Conduit i (ResourceT mi) o]

The conduits to merge.

-> Int

The bound for the channels.

-> ResourceT mi (Conduit i mo o) 

Provides an input across several conduits, putting them all into a bounded channel. Returns a conduit that yields values from all the given conduits on a first-come, first-served basis.

The order of the new conduit's output is undefined, but it will be some combination of the given conduits. The monad of the resultant conduit (mo) is independent of the monads of the input conduits (mi).