A collection of common processes that are useful when plumbing together a process network. All the processes here rethrow poison when they encounter it, as this gives the user maximum flexibility (they can let it propagate, or ignore it).

The names here overlap with standard Prelude names. This is deliberate, as the processes act in a similar manner to the corresponding Prelude versions. It is expected that you will do something like:

import qualified Control.Concurrent.CHP.Common as Common

or:

import qualified Control.Concurrent.CHP.Common as CHP

to circumvent this problem.
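
As a rough illustration of how the qualified names read in practice, here is a minimal sketch of a three-stage pipeline. It assumes the core `Control.Concurrent.CHP` module exports `runCHP`, `oneToOneChannel`, `reader`, `writer`, `readChannel`, `writeChannel`, `poison`, `onPoisonTrap`, `runParallel_` and `(<||>)`; the names `pipeline`, `source`, `stages` and `sink` are made up for this example. Because the common processes rethrow poison, the sketch traps it around the stages so that the result of the sink survives.

```haskell
import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as Common
import Control.Monad (replicateM)

-- Hypothetical example: send 1..5 through a filter and a map,
-- then collect the two values that survive the filter.
pipeline :: IO (Maybe [Int])
pipeline = runCHP $ do
  c <- oneToOneChannel
  d <- oneToOneChannel
  e <- oneToOneChannel
  let -- Writes the inputs, then poisons the channel to shut the pipeline down.
      source = mapM_ (writeChannel (writer c)) [1 .. 5] >> poison (writer c)
      -- The qualified names avoid clashing with Prelude's filter and map.
      stages = runParallel_
                 [ Common.filter even (reader c) (writer d)
                 , Common.map (* 10) (reader d) (writer e)
                 ]
      -- Only 2 and 4 pass the filter, so exactly two values are read.
      sink = replicateM 2 (readChannel (reader e))
  -- The stages end by rethrowing poison, so it is trapped here.
  (_, (_, xs)) <- source <||> ((stages `onPoisonTrap` return ()) <||> sink)
  return xs

main :: IO ()
main = pipeline >>= print
```

The sink reads a fixed number of values to keep the sketch short; a real network would more likely let the sink run until it sees poison.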

- id :: (ReadableChannel r, Poisonable (r a), WriteableChannel w, Poisonable (w a)) => r a -> w a -> CHP ()
- extId :: Chanin a -> Chanout a -> CHP ()
- tap :: Chanin a -> [Chanout a] -> CHP ()
- prefix :: a -> Chanin a -> Chanout a -> CHP ()
- tail :: Chanin a -> Chanout a -> CHP ()
- succ :: Enum a => Chanin a -> Chanout a -> CHP ()
- parDelta :: Chanin a -> [Chanout a] -> CHP ()
- map :: (a -> b) -> Chanin a -> Chanout b -> CHP ()
- map' :: NFData b => (a -> b) -> Chanin a -> Chanout b -> CHP ()
- filter :: (a -> Bool) -> Chanin a -> Chanout a -> CHP ()
- stream :: Traversable t => Chanin (t a) -> Chanout a -> CHP ()
- merger :: [Chanin a] -> Chanout a -> CHP ()
- replicate :: Int -> a -> Chanout a -> CHP ()
- repeat :: a -> Chanout a -> CHP ()
- consume :: Chanin a -> CHP ()
- consumeAlongside :: Chanin a -> CHP b -> CHP b
- join :: (a -> b -> c) -> Chanin a -> Chanin b -> Chanout c -> CHP ()
- joinList :: [Chanin a] -> Chanout [a] -> CHP ()
- split :: Chanin (a, b) -> Chanout a -> Chanout b -> CHP ()
- sorter :: Ord a => Chanin (Maybe a) -> Chanout (Maybe a) -> CHP ()
- sorter' :: forall a. (a -> a -> Bool) -> Chanin (Maybe a) -> Chanout (Maybe a) -> CHP ()
- valueStore :: (ReadableChannel r, Poisonable (r a), WriteableChannel w, Poisonable (w a)) => a -> r a -> w a -> CHP ()
- valueStore' :: (ReadableChannel r, Poisonable (r a), WriteableChannel w, Poisonable (w a)) => r a -> w a -> CHP ()
- advanceTime :: (Waitable c, Ord t) => (t -> t) -> Enrolled c t -> CHP ()

# Documentation

id :: (ReadableChannel r, Poisonable (r a), WriteableChannel w, Poisonable (w a)) => r a -> w a -> CHP ()

Forever forwards the value onwards, unchanged. Adding this to your process network effectively adds a single-place buffer.

extId :: Chanin a -> Chanout a -> CHP ()

Forever forwards the value onwards. This is like `id` but does not add any buffering to your network, and its presence is undetectable to the processes on either side.

extId is a unit of the associative operator `Control.Concurrent.CHP.Utils.|->|`.

The behaviour of this process was corrected in version 1.1.0 to work properly when the reader of its output channel was offering choice.

tap :: Chanin a -> [Chanout a] -> CHP ()

A process that waits for an input, then sends it out on *all* its output
channels (in order) during an extended rendezvous. This is often used to send the
output on to both the normal recipient (without introducing buffering) and
also to a listener process that wants to examine the value. If the listener
process is first in the list, and does not take the input immediately, the
value will not be sent to the other recipients until it does. The name
of the process derives from the notion of a wire-tap, since the listener
is hidden from the other processes (it does not visibly change the semantics
for them -- except when the readers of the channels are offering a choice).

prefix :: a -> Chanin a -> Chanout a -> CHP ()

Sends out a single value first (the prefix) then behaves like id.

tail :: Chanin a -> Chanout a -> CHP ()

Discards the first value it receives, then acts like id.

Added in version 1.5.0.

succ :: Enum a => Chanin a -> Chanout a -> CHP ()

Forever reads in a value, and then sends out its successor (using `succ`).

parDelta :: Chanin a -> [Chanout a] -> CHP ()

Reads in a value, and sends it out in parallel on all the given output channels.
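
One way `prefix`, `succ` and `parDelta` can be combined is a feedback loop that generates the natural numbers. The following is a sketch under assumptions, not code from the library: `numbers` and `firstFive` are hypothetical names, and the core functions used (`runCHP`, `oneToOneChannel`, `reader`, `writer`, `readChannel`, `poison`, `onPoisonTrap`, `runParallel_`, `(<||>)`) are assumed to be exported by `Control.Concurrent.CHP`.

```haskell
import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as Common
import Control.Monad (replicateM)

-- Hypothetical process: emits 0, 1, 2, ... on the given channel until poisoned.
numbers :: Chanout Int -> CHP ()
numbers out = do
  a <- oneToOneChannel
  b <- oneToOneChannel
  c <- oneToOneChannel
  runParallel_
    [ Common.prefix 0 (reader c) (writer a)       -- seeds the loop with 0
    , Common.parDelta (reader a) [writer b, out]  -- copies each value to the loop and to the outside
    , Common.succ (reader b) (writer c)           -- increments the value on its way back round
    ]

-- Reads five numbers, then poisons the channel so that the loop shuts down.
firstFive :: IO (Maybe [Int])
firstFive = runCHP $ do
  ch <- oneToOneChannel
  (_, xs) <- (numbers (writer ch) `onPoisonTrap` return ())
         <||> (do xs <- replicateM 5 (readChannel (reader ch))
                  poison (reader ch)
                  return xs)
  return xs

main :: IO ()
main = firstFive >>= print
```

Poisoning the output channel is what stops the loop here: `parDelta` sees the poison and, like the other processes in this module, passes it on to its remaining channels, so `prefix` and `succ` terminate as well.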

map :: (a -> b) -> Chanin a -> Chanout b -> CHP ()

Forever reads in a value, transforms it using the given function, and sends it out again. Note that the transformation is not applied strictly, so don't assume that this process will actually perform the computation. If you require a strict transformation, use `map'`.

map' :: NFData b => (a -> b) -> Chanin a -> Chanout b -> CHP ()

Like `map`, but applies the transformation strictly before sending on the value.

Added in version 1.1.0.

filter :: (a -> Bool) -> Chanin a -> Chanout a -> CHP ()

Forever reads in a value, and then based on applying the given function either discards it (if the function returns False) or sends it on (if the function returns True).

stream :: Traversable t => Chanin (t a) -> Chanout a -> CHP ()

Streams all items in a `Data.Traversable.Traversable` container out on the output channel (one at a time), in the order given by `Data.Traversable.mapM`. Lists, `Maybe`, and `Data.Sequence.Seq` are all instances of `Data.Traversable.Traversable`, so this can be used for all of those.
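
To make the ordering concrete, the sketch below models `stream` as a pure function from the list of containers read to the list of items written. This is only a model, not the library's implementation: `streamModel` is a hypothetical name, and it assumes that `toList` visits elements in the same order as `mapM`, which holds for the standard instances.

```haskell
import Data.Foldable (toList)

-- Hypothetical pure model of stream: each container read from the input
-- contributes its items, in traversal order, to the output.
streamModel :: Traversable t => [t a] -> [a]
streamModel = concatMap toList

main :: IO ()
main = do
  print (streamModel [[1, 2], [], [3 :: Int]])       -- prints [1,2,3]
  print (streamModel [Just 'a', Nothing, Just 'b'])  -- prints "ab"
```

An empty container (such as `[]` or `Nothing`) contributes no output at all in this model.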

merger :: [Chanin a] -> Chanout a -> CHP ()

Forever waits for input from one of its many channels and sends it out again on the output channel.

replicate :: Int -> a -> Chanout a -> CHP ()

Sends out the specified value on the given channel the specified number of times, then finishes.

repeat :: a -> Chanout a -> CHP ()

Forever sends out the same value on the given channel, until poisoned. Similar to the white-hole processes in some other frameworks.

consume :: Chanin a -> CHP ()

Forever reads values from the channel and discards them, until poisoned. Similar to the black-hole processes in some other frameworks.

consumeAlongside :: Chanin a -> CHP b -> CHP b

For the duration of the given process, acts as a consume process, but stops when the given process stops. Note that there could be a timing issue where extra inputs are consumed at the end of the lifetime of the process. Note also that while poison from the given process will be propagated on the consumption channel, there is no mechanism to propagate poison from the consumption channel into the given process.

Added in version 1.2.0.

join :: (a -> b -> c) -> Chanin a -> Chanin b -> Chanout c -> CHP ()

Forever reads a value from both its input channels in parallel, then joins the two values using the given function and sends the result out again. For example, `join (,) c d` will pair the values read from `c` and `d` and send out the pair on the output channel, whereas `join (&&)` will send out the conjunction of two boolean values, `join (==)` will read two values and output whether they are equal or not, etc.
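
Viewed as a function on streams, `join` behaves much like `zipWith`. The sketch below is a pure model of that view rather than library code; `joinModel` is a hypothetical name, and the lists stand in for the sequences of values carried by the channels.

```haskell
-- Hypothetical pure model of join: the n-th output is the function applied
-- to the n-th value from each input.
joinModel :: (a -> b -> c) -> [a] -> [b] -> [c]
joinModel = zipWith

main :: IO ()
main = do
  print (joinModel (,) [1, 2, 3 :: Int] "abc")         -- prints [(1,'a'),(2,'b'),(3,'c')]
  print (joinModel (&&) [True, True] [True, False])    -- prints [True,False]
  print (joinModel (==) [1, 2 :: Int] [1, 3])          -- prints [True,False]
```

The model ignores timing: the real process waits for both inputs before producing each output.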

joinList :: [Chanin a] -> Chanout [a] -> CHP ()

Forever reads a value from all its input channels in parallel, then joins the values into a list in the same order as the channels, and sends them out again.

split :: Chanin (a, b) -> Chanout a -> Chanout b -> CHP ()

Forever reads a pair from its input channel, then in parallel sends out the first and second parts of the pair on its output channels.

Added in version 1.0.2.

sorter :: Ord a => Chanin (Maybe a) -> Chanout (Maybe a) -> CHP ()

A sorter process. When it receives its first `Just x` data item, it keeps it. When it receives a second, it keeps the lower of the two, and sends out the other one. When it receives Nothing, it sends out its data value, then sends Nothing too. The overall effect when chaining these things together is a sorting pump. You inject all the values with Just, then send in a single Nothing to get the results out (in reverse order).
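
The pump behaviour can be traced with a pure model in which each sorter is a function from its input stream to its output stream. This is a sketch based only on the description above, not the library's source: `sorterStage` and `sorterChain` are hypothetical names, and the handling of a `Nothing` that arrives while no value is held (it is simply passed on) is an assumption.

```haskell
-- Hypothetical model of one sorter, given a less-than function.
sorterStage :: (a -> a -> Bool) -> [Maybe a] -> [Maybe a]
sorterStage lt = go Nothing
  where
    go _ [] = []
    -- First item: keep it and send nothing.
    go Nothing (Just x : rest) = go (Just x) rest
    -- Later items: keep the lower of the two and send out the other.
    go (Just kept) (Just x : rest)
      | x `lt` kept = Just kept : go (Just x) rest
      | otherwise   = Just x : go (Just kept) rest
    -- Nothing: send out the held value (if any), then Nothing, and start afresh.
    go held (Nothing : rest) =
      maybe id (\k -> (Just k :)) held (Nothing : go Nothing rest)

-- A chain of n sorters, each feeding the next.
sorterChain :: Int -> (a -> a -> Bool) -> [Maybe a] -> [Maybe a]
sorterChain n lt = foldr (.) id (replicate n (sorterStage lt))

main :: IO ()
main = print (sorterChain 3 (<) [Just 3, Just 1, Just 2, Nothing :: Maybe Int])
-- prints [Just 3,Just 2,Just 1,Nothing]
```

In this model, a chain with as many sorters as there are values emits them largest first, which is the reverse order mentioned above.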

sorter' :: forall a. (a -> a -> Bool) -> Chanin (Maybe a) -> Chanout (Maybe a) -> CHP ()

Like sorter, but with a custom comparison method. You should pass in the equivalent of less-than: (<).

valueStore :: (ReadableChannel r, Poisonable (r a), WriteableChannel w, Poisonable (w a)) => a -> r a -> w a -> CHP ()

A shared variable process. Given an initial value and two channels, it continually offers to output its current value or read in a new one.

Added in version 1.1.1.

Note that prior to version 1.2.0 (i.e. in version 1.1.1) there was a bug where poison would not be propagated between the input and output.
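
A minimal sketch of using the store as a shared variable follows. It assumes the core `Control.Concurrent.CHP` module exports `runCHP`, `oneToOneChannel`, `reader`, `writer`, `readChannel`, `writeChannel`, `poison`, `onPoisonTrap` and `(<||>)`; the names `storeDemo`, `setC` and `getC` are made up for this example. The client reads the initial value, overwrites it, reads it back, and then poisons the channels to stop the store.

```haskell
import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as Common

-- Hypothetical example: returns the value seen before and after an update.
storeDemo :: IO (Maybe (Int, Int))
storeDemo = runCHP $ do
  setC <- oneToOneChannel  -- carries new values into the store
  getC <- oneToOneChannel  -- carries the current value out of the store
  (_, r) <- (Common.valueStore 0 (reader setC) (writer getC) `onPoisonTrap` return ())
        <||> (do before <- readChannel (reader getC)  -- the initial value
                 writeChannel (writer setC) 42         -- replace it
                 after <- readChannel (reader getC)   -- the updated value
                 poison (writer setC)                  -- shut the store down
                 poison (reader getC)
                 return (before, after))
  return r

main :: IO ()
main = storeDemo >>= print
```

The store rethrows the poison it receives, so the sketch traps it to keep the client's result.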

valueStore' :: (ReadableChannel r, Poisonable (r a), WriteableChannel w, Poisonable (w a)) => r a -> w a -> CHP ()

A shared variable process. The same as valueStore, but initially waits to read its starting value before then offering to either output its current value or read in a new one.

Added in version 1.1.1.

Note that prior to version 1.2.0 (i.e. in version 1.1.1) there was a bug where poison would not be propagated between the input and output.