pipes-concurrency-2.0.8: Concurrency for the pipes ecosystem

Safe Haskell: Safe




Asynchronous communication between pipes


Inputs and Outputs

newtype Input a

An exhaustible source of values

recv returns Nothing if the source is exhausted





Monad Input


(>>=) :: Input a -> (a -> Input b) -> Input b

(>>) :: Input a -> Input b -> Input b

return :: a -> Input a

fail :: String -> Input a

Functor Input


fmap :: (a -> b) -> Input a -> Input b

(<$) :: a -> Input b -> Input a

Applicative Input


pure :: a -> Input a

(<*>) :: Input (a -> b) -> Input a -> Input b

(*>) :: Input a -> Input b -> Input b

(<*) :: Input a -> Input b -> Input a

Alternative Input


empty :: Input a

(<|>) :: Input a -> Input a -> Input a

some :: Input a -> Input [a]

many :: Input a -> Input [a]

MonadPlus Input


mzero :: Input a

mplus :: Input a -> Input a -> Input a

Monoid (Input a)


mempty :: Input a

mappend :: Input a -> Input a -> Input a

mconcat :: [Input a] -> Input a
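
The Functor and Monoid instances above can be combined to merge several sources into one. The following is a minimal sketch, not part of the library: the name mergedDemo is hypothetical, and it assumes that the merged Input yields a value from whichever underlying source currently has one.

```haskell
import Pipes.Concurrent

-- Hypothetical demo: merge two mailboxes into a single Input.
mergedDemo :: IO [Maybe (Either Int String)]
mergedDemo = do
    (numbersOut, numbersIn) <- spawn unbounded
    (wordsOut,   wordsIn)   <- spawn unbounded
    -- fmap tags each source; mconcat merges them through the Monoid instance.
    let merged = mconcat [fmap Left numbersIn, fmap Right wordsIn]
    _ <- atomically (send numbersOut 1)
    _ <- atomically (send wordsOut "two")
    first  <- atomically (recv merged)
    second <- atomically (recv merged)
    return [first, second]

main :: IO ()
main = mergedDemo >>= print
```

With one message waiting in each mailbox, the two reads should return the tagged number followed by the tagged string.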

newtype Output a

An exhaustible sink of values

send returns False if the sink is exhausted





Divisible Output


divide :: (a -> (b, c)) -> Output b -> Output c -> Output a

conquer :: Output a

Decidable Output


lose :: (a -> Void) -> Output a

choose :: (a -> Either b c) -> Output b -> Output c -> Output a
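
As an illustration of the Decidable instance, choose can route each message to one of two sinks. This is a sketch under assumptions: routeDemo and parity are hypothetical names, and choose is imported from Data.Functor.Contravariant.Divisible in the contravariant package.

```haskell
import Data.Functor.Contravariant.Divisible (choose)
import Pipes.Concurrent

-- Hypothetical demo: send even numbers to one mailbox, odd numbers to another.
routeDemo :: IO (Maybe Int, Maybe Int)
routeDemo = do
    (evensOut, evensIn) <- spawn unbounded
    (oddsOut,  oddsIn)  <- spawn unbounded
    let parity n = if even n then Left n else Right n
        router   = choose parity evensOut oddsOut
    mapM_ (atomically . send router) [1, 2 :: Int]
    e <- atomically (recv evensIn)
    o <- atomically (recv oddsIn)
    return (e, o)

main :: IO ()
main = routeDemo >>= print
```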

Contravariant Output

This instance is useful for creating a new tagged address, similar to Elm's Signal.forwardTo. In fact, Elm's forwardTo is just flip contramap.


contramap :: (a -> b) -> Output b -> Output a

(>$) :: b -> Output b -> Output a
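
The tagged-address idea can be sketched as follows. The Event type and the name taggedDemo are hypothetical, introduced only for illustration; contramap comes from Data.Functor.Contravariant.

```haskell
import Data.Functor.Contravariant (contramap)
import Pipes.Concurrent

-- Hypothetical event type shared by two tagged addresses.
data Event = Clicked Int | Typed String
    deriving (Eq, Show)

taggedDemo :: IO (Maybe Event, Maybe Event)
taggedDemo = do
    (events, input) <- spawn unbounded
    -- Each derived Output wraps its payload before forwarding it to events.
    let clicks = contramap Clicked events
        keys   = contramap Typed events
    _ <- atomically (send clicks 3)
    _ <- atomically (send keys "hi")
    a <- atomically (recv input)
    b <- atomically (recv input)
    return (a, b)

main :: IO ()
main = taggedDemo >>= print
```

Both derived outputs feed the same mailbox, so the reader sees one stream of Event values in the order they were sent.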

Monoid (Output a)


mempty :: Output a

mappend :: Output a -> Output a -> Output a

mconcat :: [Output a] -> Output a
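
A possible use of the Monoid instance is broadcasting one message to several mailboxes. The sketch below assumes that a combined Output forwards each message to every component and reports True if at least one of them accepted it; broadcastDemo is a hypothetical name.

```haskell
import Pipes.Concurrent

-- Hypothetical demo: one send delivered to two mailboxes.
broadcastDemo :: IO (Bool, Maybe Char, Maybe Char)
broadcastDemo = do
    (out1, in1) <- spawn unbounded
    (out2, in2) <- spawn unbounded
    let both = mconcat [out1, out2]
    delivered <- atomically (send both 'x')
    a <- atomically (recv in1)
    b <- atomically (recv in2)
    return (delivered, a, b)

main :: IO ()
main = broadcastDemo >>= print
```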

Pipe utilities

fromInput :: MonadIO m => Input a -> Producer' a m ()

Convert an Input to a Producer

fromInput terminates when the Input is exhausted.

toOutput :: MonadIO m => Output a -> Consumer' a m ()

Convert an Output to a Consumer

toOutput terminates when the Output is exhausted.
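
The two conversions can be sketched together as a round trip through a mailbox. The name roundTrip is hypothetical; each, runEffect and (>->) come from Pipes, and toListM from Pipes.Prelude.

```haskell
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Hypothetical demo: write a list into a mailbox, then read it back.
roundTrip :: IO [Int]
roundTrip = do
    (output, input, seal) <- spawn' unbounded
    -- The writer pipeline stops once the list is exhausted.
    runEffect $ each [1, 2, 3] >-> toOutput output
    -- Sealing makes fromInput terminate after the queued messages are drained.
    atomically seal
    P.toListM (fromInput input)

main :: IO ()
main = roundTrip >>= print
```

Without the seal step, the reader would wait for more messages once the queue is empty, so this sketch seals explicitly before draining.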


spawn :: Buffer a -> IO (Output a, Input a)

Spawn a mailbox using the specified Buffer to store messages

Using send on the Output:

  • fails and returns False if the mailbox is sealed, otherwise it:
  • retries if the mailbox is full, or:
  • adds a message to the mailbox and returns True.

Using recv on the Input:

  • retrieves a message from the mailbox wrapped in Just if the mailbox is not empty, otherwise it:
  • retries if the mailbox is not sealed, or:
  • fails and returns Nothing.

If either the Input or Output is garbage collected, the mailbox will become sealed.
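
The send and recv behaviour described above can be sketched with a single message. The name mailboxDemo is hypothetical; atomically is the STM runner re-exported by this module.

```haskell
import Pipes.Concurrent

-- Hypothetical demo: one message through an unbounded mailbox.
mailboxDemo :: IO (Bool, Maybe String)
mailboxDemo = do
    (output, input) <- spawn unbounded
    -- send reports whether the mailbox accepted the message.
    accepted <- atomically (send output "hello")
    -- recv wraps the message in Just while the mailbox is not empty.
    message  <- atomically (recv input)
    return (accepted, message)

main :: IO ()
main = mailboxDemo >>= print
```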

spawn' :: Buffer a -> IO (Output a, Input a, STM ())

Like spawn, but also returns an action to manually seal the mailbox early:

(output, input, seal) <- spawn' buffer

Use the seal action to allow early cleanup of readers and writers to the mailbox without waiting for the next garbage collection cycle.
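
A sketch of manual sealing, assuming that messages queued before the seal remain readable afterwards; the name sealDemo is hypothetical.

```haskell
import Pipes.Concurrent

-- Hypothetical demo: observe a mailbox before and after sealing.
sealDemo :: IO (Maybe Int, Maybe Int, Bool)
sealDemo = do
    (output, input, seal) <- spawn' unbounded
    _ <- atomically (send output 1)
    atomically seal
    pending <- atomically (recv input)    -- message queued before the seal
    drained <- atomically (recv input)    -- mailbox is now empty and sealed
    late    <- atomically (send output 2) -- sends after the seal are rejected
    return (pending, drained, late)

main :: IO ()
main = sealDemo >>= print
```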

withSpawn :: Buffer a -> ((Output a, Input a) -> IO r) -> IO r

withSpawn passes its enclosed action an Output and Input like you'd get from spawn, but automatically seals them after the action completes. This can be used when you need the sealing behavior available from spawn', but want to work at a bit higher level:

withSpawn buffer $ \(output, input) -> ...

withSpawn is exception-safe, since it uses bracket internally.
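
A small sketch of a complete withSpawn call; the name withSpawnDemo is hypothetical.

```haskell
import Pipes.Concurrent

-- Hypothetical demo: the mailbox is sealed automatically when the action ends.
withSpawnDemo :: IO (Maybe Char)
withSpawnDemo =
    withSpawn unbounded $ \(output, input) -> do
        _ <- atomically (send output 'x')
        atomically (recv input)

main :: IO ()
main = withSpawnDemo >>= print
```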

withBuffer :: Buffer a -> (Output a -> IO l) -> (Input a -> IO r) -> IO (l, r)

A more restrictive alternative to withSpawn that prevents deadlocks
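
A sketch of withBuffer with one writer and one reader. It assumes that the two actions run concurrently and that the mailbox is sealed once the writer finishes, which lets the reader terminate; sumDemo is a hypothetical name.

```haskell
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Hypothetical demo: stream ten numbers through a small buffer and sum them.
sumDemo :: IO Int
sumDemo = do
    (_, total) <- withBuffer (bounded 2)
        (\output -> runEffect $ each [1 .. 10] >-> toOutput output)
        (\input  -> P.sum (fromInput input))
    return total

main :: IO ()
main = sumDemo >>= print
```

The writer and the reader each receive only their own end of the mailbox, which is the restriction relative to withSpawn.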

data Buffer a

Buffer specifies how to buffer messages stored within the mailbox



Unbounded

Deprecated: Use unbounded instead

Bounded Int

Deprecated: Use bounded instead

Single

Deprecated: Use bounded 1 instead

Latest a

Deprecated: Use latest instead

Newest Int

Deprecated: Use newest instead

New

Deprecated: Use newest 1 instead

unbounded :: Buffer a

Store an unbounded number of messages in a FIFO queue

bounded :: Int -> Buffer a

Store a bounded number of messages, specified by the Int argument
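
Because send retries while a bounded mailbox is full, a writer can wrap it in a fallback to observe that state without blocking. In this sketch, trySend and boundedDemo are hypothetical helpers, and the fallback relies on the Alternative instance of STM.

```haskell
import Control.Applicative ((<|>))
import Pipes.Concurrent

-- Hypothetical helper: Nothing means the send would have had to retry.
trySend :: Output a -> a -> STM (Maybe Bool)
trySend output a = fmap Just (send output a) <|> return Nothing

boundedDemo :: IO ([Maybe Bool], Maybe Char)
boundedDemo = do
    (output, input) <- spawn (bounded 2)
    -- The third send finds the two-slot mailbox full.
    results <- mapM (atomically . trySend output) "abc"
    -- Reading afterwards also keeps the Input alive during the sends.
    oldest  <- atomically (recv input)
    return (results, oldest)

main :: IO ()
main = boundedDemo >>= print
```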

latest :: a -> Buffer a

Only store the Latest message, beginning with an initial value

A latest buffer is never empty and never full.
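
A sketch of how a latest buffer behaves, assuming that reading does not remove the stored value; latestDemo is a hypothetical name.

```haskell
import Pipes.Concurrent

-- Hypothetical demo: read the initial value, overwrite it, then read twice.
latestDemo :: IO [Maybe Int]
latestDemo = do
    (output, input) <- spawn (latest 0)
    initial <- atomically (recv input)
    _       <- atomically (send output 5)
    updated <- atomically (recv input)
    again   <- atomically (recv input)
    return [initial, updated, again]

main :: IO ()
main = latestDemo >>= print
```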

newest :: Int -> Buffer a

Like Bounded, but send never retries (the buffer is never full). Instead, old elements are discarded to make room for new elements
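
The discard behaviour can be sketched by overfilling a two-slot buffer. The name newestDemo is hypothetical, and the sketch assumes the oldest element is the one dropped.

```haskell
import Control.Monad (replicateM)
import Pipes.Concurrent

-- Hypothetical demo: three sends into a buffer that keeps only two messages.
newestDemo :: IO ([Bool], [Maybe Int])
newestDemo = do
    (output, input) <- spawn (newest 2)
    sent <- mapM (atomically . send output) [1, 2, 3]
    kept <- replicateM 2 (atomically (recv input))
    return (sent, kept)

main :: IO ()
main = newestDemo >>= print
```

All three sends should be accepted, and the reader should then see only the two most recent messages.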


Control.Concurrent re-exports forkIO, although I recommend using the async library instead.

Control.Concurrent.STM re-exports atomically and STM.

System.Mem re-exports performGC.

module System.Mem