pipes-concurrency-2.0.3: Concurrency for the pipes ecosystem

Safe HaskellSafe
LanguageHaskell98

Pipes.Concurrent

Contents

Description

Asynchronous communication between pipes

Synopsis

Inputs and Outputs

newtype Input a Source

An exhaustible source of values

recv returns Nothing if the source is exhausted

Constructors

Input 

Fields

recv :: STM (Maybe a)
 

newtype Output a Source

An exhaustible sink of values

send returns False if the sink is exhausted

Constructors

Output 

Fields

send :: a -> STM Bool
 

Instances

Pipe utilities

fromInput :: MonadIO m => Input a -> Producer' a m () Source

Convert an Input to a Producer

fromInput terminates when the Input is exhausted.

toOutput :: MonadIO m => Output a -> Consumer' a m () Source

Convert an Output to a Consumer

toOutput terminates when the Output is exhausted.

Actors

spawn :: Buffer a -> IO (Output a, Input a) Source

Spawn a mailbox using the specified Buffer to store messages

Using send on the Output

  • fails and returns False if the mailbox is sealed, otherwise it:
    • retries if the mailbox is full, or:
      • adds a message to the mailbox and returns True.

    Using recv on the Input:

    • retrieves a message from the mailbox wrapped in Just if the mailbox is not empty, otherwise it:
      • retries if the mailbox is not sealed, or:
      • fails and returns Nothing.

    If either the Input or Output is garbage collected the mailbox will become sealed.

spawn' :: Buffer a -> IO (Output a, Input a, STM ()) Source

Like spawn, but also returns an action to manually seal the mailbox early:

(output, input, seal) <- spawn' buffer
...

Use the seal action to allow early cleanup of readers and writers to the mailbox without waiting for the next garbage collection cycle.

data Buffer a Source

Buffer specifies how to buffer messages stored within the mailbox

Constructors

Unbounded

Deprecated: Use unbounded instead

Bounded Int

Deprecated: Use bounded instead

Single

Deprecated: Use bounded 1 instead

Latest a

Deprecated: Use latest instead

Newest Int

Deprecated: Use newest instead

New

Deprecated: Use newest 1 instead

unbounded :: Buffer a Source

Store an unbounded number of messages in a FIFO queue

bounded :: Int -> Buffer a Source

Store a bounded number of messages, specified by the Int argument

latest :: a -> Buffer a Source

Only store the Latest message, beginning with an initial value

Latest is never empty nor full.

newest :: Int -> Buffer a Source

Like Bounded, but send never fails (the buffer is never full). Instead, old elements are discard to make room for new elements

Re-exports

Control.Concurrent re-exports forkIO, although I recommend using the async library instead.

Control.Concurrent.STM re-exports atomically and STM.

System.Mem re-exports performGC.

module System.Mem