split-channel- Control.Concurrent.Chan split into sending and receiving halves.

Safe Haskell: Safe-Inferred



This package provides an unbounded, imperative queue that is intended to be thread-safe and safe in the presence of asynchronous exceptions. It is essentially the same communication mechanism as Chan, except that each channel is split into separate sending and receiving ends, called SendPort and ReceivePort respectively. This has at least two advantages:

  1. Program behavior can be more finely constrained via the type system.
  2. Channels can have zero ReceivePorts associated with them. Messages written to such a channel disappear into the aether and can be garbage collected. Note that ReceivePorts can be subsequently attached to such a channel via listen, and can be detached via garbage collection.

By contrast, Chan couples a SendPort and a ReceivePort together in a pair. Thus keeping a reference to a SendPort implies that there is at least one reference to a ReceivePort, which means that messages cannot be garbage collected from an active channel if nobody is listening.

A channel can have multiple ReceivePorts. This results in a publish-subscribe pattern of communication: every message will be delivered to every port. Alternatively, multiple threads can read from a single port. This results in a push-pull pattern of communication, similar to ZeroMQ: every message will be delivered to exactly one thread. Both can be used together to form hybrid patterns of communication.
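The two patterns can be sketched as follows. This is a minimal illustration, not part of the package: it assumes the API is exported from a module named Control.Concurrent.Chan.Split (the module name is not stated in this text), and pubSubDemo and pushPullDemo are hypothetical names.

```haskell
-- Assumption: the package exposes its API from Control.Concurrent.Chan.Split.
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan.Split (duplicate, new, receive, send)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.List (sort)

-- Publish-subscribe: two ReceivePorts on one channel each see every message.
pubSubDemo :: IO (String, String)
pubSubDemo = do
  (sp, rpA) <- new
  rpB <- duplicate rpA
  send sp "hello"
  a <- receive rpA
  b <- receive rpB
  pure (a, b)

-- Push-pull: two threads share one ReceivePort, so each message goes to
-- exactly one of them. Which worker gets which message is not determined,
-- hence the sort before returning.
pushPullDemo :: IO [Int]
pushPullDemo = do
  (sp, rp) <- new
  out <- newEmptyMVar
  _ <- forkIO (receive rp >>= putMVar out)
  _ <- forkIO (receive rp >>= putMVar out)
  send sp 1
  send sp 2
  x <- takeMVar out
  y <- takeMVar out
  pure (sort [x, y])

main :: IO ()
main = do
  pubSubDemo >>= print
  pushPullDemo >>= print
```

In the first demo both ports yield the same message; in the second, the two workers between them consume each message once.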

A channel can only have one SendPort. However, multiple threads can safely write to a single port, allowing effects similar to multiple SendPorts.

Some of the tradeoffs of split-channel compared to Cloud Haskell's Remote.Channel are:

  1. split-channel is restricted to in-process communications only.
  2. split-channel has no restriction on the type of values that may be communicated.
  3. There is a quasi-duality between the two approaches: Cloud Haskell's ReceivePorts are special whereas split-channel's SendPorts are special.
  4. ReceivePorts can be duplicated, which allows for considerably more efficient publish-subscribe communications than supported by Cloud Haskell at the present time.



data SendPort a

SendPorts represent one end of the channel. There is only one SendPort per channel, though it can be used from multiple threads. Messages can be sent to the channel using send.


data ReceivePort a

ReceivePorts represent the other end of a channel. A channel can have many ReceivePorts, which all receive the same messages in a publish/subscribe-like manner. A single ReceivePort can be used from multiple threads, in which case every message will be delivered to a single thread in a push/pull-like manner. Use receive to fetch messages from the channel.

new :: IO (SendPort a, ReceivePort a)

Creates a new channel and a (SendPort, ReceivePort) pair representing the two sides of the channel.

newSendPort :: IO (SendPort a)

Produces a new channel that initially has zero ReceivePorts. Any elements written to this channel before a reader is listening will be eligible for garbage collection. Note that one can implement newSendPort in terms of new by throwing away the ReceivePort and letting it be garbage collected, and that one can implement new in terms of newSendPort and listen.
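The two equivalences described above might be written like this. The sketch assumes the module name Control.Concurrent.Chan.Split, which is not stated in this text; newSendPortViaNew, newViaListen and lateListenerDemo are hypothetical names, and these are not claimed to be the package's actual definitions.

```haskell
-- Assumption: the package exposes its API from Control.Concurrent.Chan.Split.
import Control.Concurrent.Chan.Split
  (ReceivePort, SendPort, listen, new, newSendPort, receive, send)

-- newSendPort via new: drop the ReceivePort and let it be collected.
newSendPortViaNew :: IO (SendPort a)
newSendPortViaNew = fst <$> new

-- new via newSendPort and listen.
newViaListen :: IO (SendPort a, ReceivePort a)
newViaListen = do
  sp <- newSendPort
  rp <- listen sp
  pure (sp, rp)

-- A message sent before anyone listens is never seen by a later listener.
lateListenerDemo :: IO String
lateListenerDemo = do
  sp <- newSendPortViaNew
  send sp "sent before listening"
  rp <- listen sp
  send sp "sent after listening"
  receive rp

main :: IO ()
main = lateListenerDemo >>= putStrLn
```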

send :: SendPort a -> a -> IO ()

Send an element to a channel. This is asynchronous and does not block.

receive :: ReceivePort a -> IO a

Fetch an element from a channel. If no element is available, it blocks until one is. Can be used in conjunction with System.Timeout.
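As an illustration of combining receive with System.Timeout, a bounded wait could look like the sketch below. receiveWithin and timeoutDemo are hypothetical helpers, not part of the package, and the module name Control.Concurrent.Chan.Split is an assumption.

```haskell
-- Assumption: the package exposes its API from Control.Concurrent.Chan.Split.
import Control.Concurrent.Chan.Split (ReceivePort, new, receive, send)
import System.Timeout (timeout)

-- Wait at most the given number of microseconds for a message.
-- Returns Nothing if the wait timed out.
receiveWithin :: Int -> ReceivePort a -> IO (Maybe a)
receiveWithin micros rp = timeout micros (receive rp)

timeoutDemo :: IO (Maybe Int, Maybe Int)
timeoutDemo = do
  (sp, rp) <- new
  first <- receiveWithin 50000 rp   -- nothing has been sent yet
  send sp 7
  second <- receiveWithin 50000 rp  -- a message is already waiting
  pure (first, second)

main :: IO ()
main = timeoutDemo >>= print
```

timeout works by throwing an asynchronous exception into the blocked thread, so this usage relies on the asynchronous-exception safety that the package aims for.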

sendMany :: SendPort a -> [a] -> IO ()

Atomically send many messages at once. Note that this function forces the spine of the list beforehand to minimize the critical section, which also helps prevent exceptions at inopportune times. Trying to send an infinite list will never send anything, though it will allocate and retain a lot of memory trying to do so.
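A small usage sketch for sendMany follows. It assumes the module name Control.Concurrent.Chan.Split and that messages are received in the order they appear in the list, as one would expect from a queue; batchDemo is a hypothetical name.

```haskell
-- Assumption: the package exposes its API from Control.Concurrent.Chan.Split.
import Control.Concurrent.Chan.Split (new, receive, sendMany)
import Control.Monad (replicateM)

batchDemo :: IO [Int]
batchDemo = do
  (sp, rp) <- new
  -- The list must be finite because sendMany forces its spine first;
  -- an infinite source has to be bounded, here with take.
  sendMany sp (take 3 [1 ..])
  replicateM 3 (receive rp)

main :: IO ()
main = batchDemo >>= print
```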

listen :: SendPort a -> IO (ReceivePort a)

Create a new ReceivePort attached to the same channel as a given SendPort. This ReceivePort starts out empty, and remains so until more elements are written to the SendPort.

duplicate :: ReceivePort a -> IO (ReceivePort a)

Create a new ReceivePort attached to the same channel as another ReceivePort. These two ports will receive the same messages. Any messages in the channel that have not been consumed by the existing port will also appear in the new port.
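The difference between duplicate and listen with respect to pending messages can be sketched as follows. The module name Control.Concurrent.Chan.Split is an assumption and backlogDemo is a hypothetical name.

```haskell
-- Assumption: the package exposes its API from Control.Concurrent.Chan.Split.
import Control.Concurrent.Chan.Split (duplicate, listen, new, receive, send)

backlogDemo :: IO (Int, Int)
backlogDemo = do
  (sp, rp) <- new
  send sp 1
  send sp 2
  _ <- receive rp            -- the original port consumes message 1
  viaDup <- duplicate rp     -- inherits the unconsumed message 2
  viaListen <- listen sp     -- starts out empty
  send sp 3
  a <- receive viaDup        -- first message on the duplicated port
  b <- receive viaListen     -- first message on the listening port
  pure (a, b)

main :: IO ()
main = backlogDemo >>= print
```

The duplicated port starts with the backlog of the port it was copied from, whereas the port obtained via listen only sees messages sent after it was created.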

split :: SendPort a -> IO (ReceivePort a, SendPort a)

This function associates a brand new channel with an existing send port. It returns a new receive port associated with the existing send port, and a new send port associated with the receive ports that previously belonged to the existing send port.
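Read literally, the description above implies the routing shown in this sketch. It assumes the module name Control.Concurrent.Chan.Split; splitDemo is a hypothetical name, and the behavior shown is inferred from the description rather than taken from the package's source.

```haskell
-- Assumption: the package exposes its API from Control.Concurrent.Chan.Split.
import Control.Concurrent.Chan.Split (new, receive, send, split)

splitDemo :: IO (String, String)
splitDemo = do
  (sp, rpOld) <- new
  -- After split, sp feeds the new channel, and spOld feeds the
  -- receive ports that sp used to feed.
  (rpNew, spOld) <- split sp
  send sp "via existing send port"
  send spOld "via new send port"
  a <- receive rpNew
  b <- receive rpOld
  pure (a, b)

main :: IO ()
main = splitDemo >>= print
```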

A possible use case is to transparently replace the backend of a service without affecting the clients of that service. For example, split might be used along the following lines:

 data Service = Service { sp :: SendPort Request, .. }

 swapService :: Service -> IO ()
 swapService s = do
     (rp', sp') <- split (sp s)
     send sp' ShutdownRequest
     forkNewService rp'

This is not a good solution in all cases. For example, the service might consist of multiple threads, and maybe some of those send internal messages on the same channel as the clients. It would probably be a bug to change the destination of those internal messages.

Wrapping the SendPort in an MVar introduces an extra layer of indirection, but also allows you to be selective about which senders observe the effect. The clients would use an MVar (SendPort RequestOrInternalMessage) whereas the internal threads would use the SendPort RequestOrInternalMessage directly, without going through the MVar. So instead we have something that looks like:

 data Service = Service { spRef :: MVar (SendPort Request), .. }

 swapService :: Service -> IO ()
 swapService s = do
     (sp', rp') <- new
     sp <- swapMVar (spRef s) sp'
     send sp ShutdownRequest
     forkNewService rp'

Note that this alternative does not use split at all.

fold :: (a -> b -> b) -> ReceivePort a -> IO b

A right fold over a receiver, a generalization of getChanContents, where getChanContents = fold (:). Note that the type of fold implies that the folding function needs to be sufficiently non-strict, otherwise the result cannot be productive.
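For example, the getChanContents analogue mentioned above can be sketched like this. contents and foldDemo are hypothetical names, and the module name Control.Concurrent.Chan.Split is an assumption. The sketch relies on fold producing its result lazily, as the remark about productivity suggests.

```haskell
-- Assumption: the package exposes its API from Control.Concurrent.Chan.Split.
import Control.Concurrent.Chan.Split (ReceivePort, fold, new, sendMany)

-- Lazy list of all messages arriving on a port. (:) is non-strict in its
-- second argument, so the list can be consumed incrementally.
contents :: ReceivePort a -> IO [a]
contents = fold (:)

foldDemo :: IO [Int]
foldDemo = do
  (sp, rp) <- new
  sendMany sp [10, 20, 30]
  xs <- contents rp
  -- Only demand as many elements as were sent; demanding more would
  -- block until further messages arrive.
  pure (take 3 xs)

main :: IO ()
main = foldDemo >>= print
```

A strict folding function, such as (+) on Int, would need every future message before producing anything, which is why the result would never become available.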

unsafeFold :: (a -> b -> b) -> ReceivePort a -> IO b

unsafeFold should usually be called only on readers that are not subsequently used in other channel operations. Otherwise it may be possible that the (non-)evaluation of pure values can cause race conditions inside IO computations. The safer fold uses duplicate to avoid this issue.