pipes-concurrency-1.2.0: Concurrency for the pipes ecosystem

Safe Haskell: Trustworthy




Asynchronous communication between proxies


Spawn mailboxes

spawn :: Buffer a -> IO (Input a, Output a)

Spawn a mailbox that has an Input and Output end, using the specified Buffer to store messages
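As a minimal sketch of the basic round trip, assuming this module is importable as Control.Proxy.Concurrent and that it re-exports atomically as noted at the end of this page: the helper name roundTrip is hypothetical and only serves to illustrate spawn, send and recv together.

```haskell
import Control.Proxy.Concurrent

-- Hypothetical helper: send one message into a fresh mailbox and read it back.
roundTrip :: IO (Bool, Maybe String)
roundTrip = do
    -- 'input' is the end that accepts messages, 'output' the end that yields them
    (input, output) <- spawn Unbounded
    sent <- atomically $ send input "hello"
    msg  <- atomically $ recv output
    return (sent, msg)

main :: IO ()
main = roundTrip >>= print
```

Both send and recv live in STM, so each call is wrapped in atomically here; they could equally be combined into a single transaction.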

data Buffer a

Buffer specifies how to store messages sent to the Input end until the Output receives them.



Unbounded

Store an Unbounded number of messages in a FIFO queue

Bounded Int

Store a Bounded number of messages, specified by the Int argument


Single

Store a Single message (like Bounded 1, but more efficient)

Latest a

Store the Latest message, beginning with an initial value

Latest is never empty nor full.
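The difference between Latest and the queue-like buffers may be easier to see in a small sketch. The helper name latestTwice is hypothetical, and the comments describe what the documented behaviour ("never empty nor full") implies rather than a guarantee about any particular implementation.

```haskell
import Control.Proxy.Concurrent

-- Hypothetical helper: overwrite a Latest buffer twice, then read it twice.
latestTwice :: IO (Maybe Int, Maybe Int)
latestTwice = do
    -- The buffer starts out holding the initial value 0
    (input, output) <- spawn (Latest 0)
    -- Latest is never full, so neither send should have to wait
    _ <- atomically $ send input 1
    _ <- atomically $ send input 2
    -- Latest is never empty, so reading does not consume the stored value
    a <- atomically $ recv output
    b <- atomically $ recv output
    return (a, b)

main :: IO ()
main = latestTwice >>= print
```

With Unbounded, Bounded n or Single in place of Latest, each recv would instead remove the message it returns.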

data Input a

Accepts messages for the mailbox


data Output a

Retrieves messages from the mailbox

Send and receive messages

send :: Input a -> a -> STM Bool

Send a message to the mailbox

  • Fails and returns False if the mailbox's Output has been garbage collected (even if the mailbox is not full), otherwise it:
  • Retries if the mailbox is full, or:
  • Succeeds if the mailbox is not full and returns True.
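Because a full mailbox makes send retry rather than fail, a non-blocking variant can be sketched with orElse from the stm package. trySend is a hypothetical helper, not part of this library, and orElse is imported explicitly on the assumption that this module re-exports only atomically and STM.

```haskell
import Control.Concurrent.STM (orElse)
import Control.Proxy.Concurrent

-- Hypothetical helper: Nothing means the mailbox was full, so the send
-- would have retried; Just b carries the Bool that send itself returned.
trySend :: Input a -> a -> IO (Maybe Bool)
trySend input a =
    atomically $ fmap Just (send input a) `orElse` return Nothing

main :: IO ()
main = do
    (input, output) <- spawn Single
    r1 <- trySend input 'x'  -- the mailbox is empty, so this send can proceed
    r2 <- trySend input 'y'  -- Single already holds 'x', so send would retry
    print (r1, r2)
    -- Reading afterwards also keeps the Output reachable during the sends
    m <- atomically $ recv output
    print m
```

The same composition works for timeouts or for choosing between several mailboxes, since send is an ordinary STM action.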

recv :: Output a -> STM (Maybe a)

Receive a message from the mailbox

  • Succeeds and returns a Just if the mailbox is not empty, otherwise it:
  • Retries if the mailbox's Input has not been garbage collected, or:
  • Fails if the mailbox's Input has been garbage collected and returns Nothing.
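The three cases above suggest a simple draining loop that stops on Nothing. The sketch below uses a hypothetical helper named drain and relies on performGC (re-exported by this module, per the notes at the end of this page) to collect the unused Input; whether and when that collection happens depends on the runtime, so treat the termination behaviour as an assumption rather than a guarantee.

```haskell
import Control.Proxy.Concurrent

-- Hypothetical helper: collect messages until recv reports Nothing.
drain :: Output a -> IO [a]
drain output = do
    m <- atomically $ recv output
    case m of
        Nothing -> return []                     -- mailbox empty and Input collected
        Just a  -> fmap (a :) (drain output)     -- keep the message and continue

main :: IO ()
main = do
    (input, output) <- spawn Unbounded
    mapM_ (atomically . send input) [1, 2, 3 :: Int]
    -- 'input' is not used past this point; ask the runtime to collect it
    -- so that the final recv can return Nothing instead of retrying
    performGC
    drain output >>= print
```

In an interpreter session the Input may stay reachable longer than expected, in which case the last recv would keep retrying.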

Proxy utilities

sendD :: Proxy p => Input a -> x -> p x a x a IO ()

Writes all messages flowing 'D'ownstream to the given Input

sendD terminates when the corresponding Output is garbage collected.

 sendD :: (Proxy p) => Input a -> () -> Pipe p a a IO ()

recvS :: Proxy p => Output a -> r -> p x' x y' a IO r

Convert an Output to a Producer

recvS terminates when the Buffer is empty and the corresponding Input is garbage collected.

 recvS :: (Proxy p) => Output a -> () -> Producer p a IO ()
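A sketch of how sendD and recvS might be wired together across two threads follows. It assumes the pipes 3.x Control.Proxy module, from which runProxy, (>->), fromListS and printD are taken, and it assumes forkIO and performGC are available through this module's re-exports.

```haskell
import Control.Proxy
import Control.Proxy.Concurrent

main :: IO ()
main = do
    (input, output) <- spawn Unbounded
    -- Producer thread: forward every element of the list into the mailbox
    _ <- forkIO $ do
        runProxy $ fromListS [1, 2, 3 :: Int] >-> sendD input
        -- Prompt collection of the now-unused Input so recvS can terminate
        performGC
    -- Consumer (main thread): print messages until the mailbox is empty
    -- and its Input has been garbage collected
    runProxy $ recvS output >-> printD
```

The explicit performGC only encourages prompt collection; without it the consumer may simply wait longer before recvS terminates.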


Control.Concurrent re-exports forkIO, although I recommend using the async library instead.

Control.Concurrent.STM re-exports atomically and STM.

System.Mem re-exports performGC.

module System.Mem