pipes-concurrency-2.0.1: Concurrency for the pipes ecosystem

Safe Haskell: Trustworthy




Asynchronous communication between pipes


Inputs and Outputs

newtype Input a

An exhaustible source of values

recv returns Nothing if the source is exhausted




recv :: STM (Maybe a)

newtype Output a

An exhaustible sink of values

send returns False if the sink is exhausted




send :: a -> STM Bool
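Because Input and Output are newtypes around STM actions, they can also be built by hand. The following is a minimal sketch, not something the library itself provides: listInput, cappedOutput, and demo are hypothetical names, and it assumes the Input and Output constructors are exported with their recv and send fields and that the stm package is available for TVar.

```haskell
import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVar, writeTVar)
import Control.Monad (replicateM)
import Pipes.Concurrent (Input (..), Output (..))

-- Hypothetical helper: an Input that pops values from a list held in a TVar
-- and reports exhaustion (Nothing) once the list is empty.
listInput :: TVar [a] -> Input a
listInput var = Input $ do
  xs <- readTVar var
  case xs of
    [] -> return Nothing
    (y : ys) -> do
      writeTVar var ys
      return (Just y)

-- Hypothetical helper: an Output that accepts values until the TVar holds
-- `limit` of them, then reports exhaustion (False).
cappedOutput :: Int -> TVar [a] -> Output a
cappedOutput limit var = Output $ \x -> do
  xs <- readTVar var
  if length xs >= limit
    then return False
    else do
      writeTVar var (xs ++ [x])
      return True

-- Send three values into a sink capped at two, then receive three times.
demo :: IO ([Bool], [Maybe Char])
demo = do
  var <- newTVarIO []
  let output = cappedOutput 2 var
      input = listInput var
  accepted <- mapM (atomically . send output) "abc"
  received <- replicateM 3 (atomically (recv input))
  return (accepted, received)

main :: IO ()
main = demo >>= print
-- prints ([True,True,False],[Just 'a',Just 'b',Nothing])
```

The third send is rejected with False, and the third recv returns Nothing because the backing list has been drained.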


Pipe utilities

fromInput :: MonadIO m => Input a -> Producer' a m ()

Convert an Input to a Producer

fromInput terminates when the Input is exhausted.

toOutput :: MonadIO m => Output a -> Consumer' a m ()

Convert an Output to a Consumer

toOutput terminates when the Output is exhausted.
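As a rough illustration of the two conversions, the sketch below pushes a list into a mailbox through toOutput and reads it back through fromInput in a single thread. The name roundTrip is hypothetical, and the example assumes the pipes package (Pipes and Pipes.Prelude) is installed alongside this one. It uses an Unbounded buffer so the writes never block, and P.take so the reader stops without waiting for the mailbox to be sealed.

```haskell
import Pipes (each, runEffect, (>->))
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Hypothetical helper: write every element to a fresh mailbox, then read
-- the same number of elements back out.
roundTrip :: [Int] -> IO [Int]
roundTrip xs = do
  (output, input) <- spawn Unbounded
  -- toOutput sends each value it receives from upstream.
  runEffect $ each xs >-> toOutput output
  -- fromInput yields each value it receives from the mailbox.
  P.toListM (fromInput input >-> P.take (length xs))

main :: IO ()
main = roundTrip [1, 2, 3] >>= print
-- prints [1,2,3]
```

In a real program the writer and reader would usually run in separate threads; the single-threaded form here only keeps the example deterministic.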


spawn :: Buffer a -> IO (Output a, Input a)

Spawn a mailbox using the specified Buffer to store messages

Using send on the Output:

  • fails and returns False if the mailbox is sealed, otherwise it:
  • retries if the mailbox is full, or:
  • adds a message to the mailbox and returns True.

Using recv on the Input:

  • retrieves a message from the mailbox wrapped in Just if the mailbox is not empty, otherwise it:
  • retries if the mailbox is not sealed, or:
  • fails and returns Nothing.

If either the Input or the Output is garbage collected, the mailbox becomes sealed.
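The send and recv rules above can be exercised with a small two-thread sketch. The helpers sendAll, recvN, and demo are hypothetical names, not part of the library. The example assumes forkIO and atomically are in scope through this module's re-exports, and it uses a Bounded 2 buffer so that the writer has to retry whenever the mailbox is full.

```haskell
import Pipes.Concurrent

-- Hypothetical helper: send values in order. Each send retries (blocks)
-- while the mailbox is full and stops early if the mailbox is sealed.
sendAll :: Output a -> [a] -> IO ()
sendAll _ [] = return ()
sendAll output (x : xs) = do
  ok <- atomically (send output x)
  if ok then sendAll output xs else return ()

-- Hypothetical helper: receive up to n values. Each recv retries while the
-- mailbox is empty but not sealed, and returns Nothing once it is sealed
-- and drained.
recvN :: Int -> Input a -> IO [a]
recvN n input
  | n <= 0 = return []
  | otherwise = do
      m <- atomically (recv input)
      case m of
        Nothing -> return []
        Just x -> fmap (x :) (recvN (n - 1) input)

demo :: IO [Int]
demo = do
  (output, input) <- spawn (Bounded 2)
  _ <- forkIO (sendAll output [1 .. 5])
  recvN 5 input

main :: IO ()
main = demo >>= print
```

With a single writer and a FIFO buffer, the reader should see the values in the order they were sent.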

spawn' :: Buffer a -> IO (Output a, Input a, STM ())

Like spawn, but also returns an action to manually seal the mailbox early:

 (output, input, seal) <- spawn' buffer

Use the seal action to allow early cleanup of readers and writers to the mailbox without waiting for the next garbage collection cycle.
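A minimal sketch of manual sealing, assuming the pipes package is available for each, runEffect, and Pipes.Prelude; demo is a hypothetical name. After the seal action runs, fromInput drains the messages that are already buffered and then terminates, and any further send reports failure.

```haskell
import Pipes (each, runEffect, (>->))
import Pipes.Concurrent
import qualified Pipes.Prelude as P

demo :: IO ([Int], Bool)
demo = do
  (output, input, seal) <- spawn' Unbounded
  -- Buffer three messages, then seal the mailbox by hand.
  runEffect $ each [1, 2, 3] >-> toOutput output
  atomically seal
  -- Buffered messages are still delivered; recv then returns Nothing,
  -- which ends the Producer.
  received <- P.toListM (fromInput input)
  -- Sending to a sealed mailbox fails and returns False.
  lateSend <- atomically (send output 4)
  return (received, lateSend)

main :: IO ()
main = demo >>= print
-- prints ([1,2,3],False)
```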

data Buffer a

Buffer specifies how to buffer messages stored within the mailbox



Unbounded

Store an Unbounded number of messages in a FIFO queue

Bounded Int

Store a Bounded number of messages, specified by the Int argument


Single

Store a Single message (like Bounded 1, but more efficient)

Latest a

Only store the Latest message, beginning with an initial value

A Latest buffer is never empty and never full, so send and recv on it never retry.
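To illustrate how Latest differs from the queue-like buffers, the sketch below reads the initial value, overwrites it twice, and then reads twice more. The name demo is hypothetical; the constructor is used as documented for this version.

```haskell
import Pipes.Concurrent

demo :: IO [Maybe Int]
demo = do
  (output, input) <- spawn (Latest 0)
  -- The mailbox starts out holding the initial value.
  initial <- atomically (recv input)
  -- Each send overwrites the stored message instead of queueing it.
  _ <- atomically (send output 1)
  _ <- atomically (send output 2)
  -- Reading does not remove the message, so both reads see the same value.
  first <- atomically (recv input)
  second <- atomically (recv input)
  return [initial, first, second]

main :: IO ()
main = demo >>= print
-- prints [Just 0,Just 2,Just 2]
```

This makes Latest a reasonable fit for values where only the most recent state matters, such as a status or configuration snapshot.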


Re-exports

Control.Concurrent re-exports forkIO, although I recommend using the async library instead.

Control.Concurrent.STM re-exports atomically and STM.

System.Mem re-exports performGC.

module System.Mem