distributed-process-execution-0.1.1: Execution Framework for The Cloud Haskell Application Platform

Copyright: (c) Tim Watson 2012 - 2013
License: BSD3 (see the file LICENSE)
Maintainer: Tim Watson <watson.timothy@gmail.com>
Portability: non-portable (requires concurrency)
Safe Haskell: None




Generic process that acts as an external mailbox and message buffer.


For use when rate limiting is not possible (or desired), this module provides a buffer process that receives mail via its post API, buffers the received messages and delivers them when its owning process asks for them. A mailbox has to be started with a maximum buffer size - the so-called limit - and will discard messages once its internal storage reaches this user-defined threshold.

The usual behaviour of the buffer process is to accumulate messages in its internal memory. When a client evaluates notify, the buffer will send a NewMail message to the (real) mailbox of its owning process as soon as it has any message(s) ready to deliver. If the buffer already contains undelivered mail, the NewMail message will be dispatched immediately.

When the owning process wishes to receive mail, evaluating deliver (from any process) will cause the buffer to send its owner a Delivery message containing the accumulated messages and additional information about the number of messages it is delivering, the number of messages dropped since the last delivery and a handle for the mailbox (so that processes can have multiple mailboxes if required, and distinguish between them).
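The notify/deliver exchange described above might look like the following from the owner's side. This is a minimal sketch rather than an excerpt from the library: it assumes the module is exposed as Control.Distributed.Process.Execution.Mailbox, that the FIFO buffer constructor is named Queue, and that the code runs inside a Process on an already started local node. The name mailboxDemo is hypothetical.

```haskell
import Control.Distributed.Process (Process, expect, say)
-- Assumed module name; adjust if the package exposes it differently.
import Control.Distributed.Process.Execution.Mailbox
  ( BufferType(Queue), Delivery(..), NewMail
  , createMailbox, post, notify, deliver )

-- | Hypothetical owner process: buffer three messages, then collect them.
mailboxDemo :: Process ()
mailboxDemo = do
  mbox <- createMailbox Queue 10        -- mailbox owned by the calling process
  mapM_ (post mbox) ["a", "b", "c"]     -- buffered, not yet delivered
  notify mbox                           -- ask for a NewMail signal
  _ <- expect :: Process NewMail        -- signal arrives in our real mailbox
  deliver mbox                          -- ask for the buffered messages
  d <- expect :: Process Delivery       -- Delivery carries messages and counters
  say $ "delivered " ++ show (count d)
     ++ ", dropped " ++ show (totalDropped d)
```

Waiting for NewMail before calling deliver keeps the owner's real mailbox free of bulk traffic until it is ready to process a batch, which appears to be the point of buffering externally.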

Overflow Handling

A mailbox handles overflow - when the number of messages it is holding reaches the limit - differently depending on the BufferType selected when it starts. The Queue buffer will, once the limit is reached, drop older messages first (i.e., the head of the queue) to make space for newer ones. The Ring buffer works similarly, but blocks new messages so as to preserve existing ones instead. Finally, the Stack buffer will drop the last (i.e., most recently received) message to make room for new mail.
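The three overflow policies can be compared with a small pure model. This is only a sketch of the behaviour as described above, not the library's implementation: the Buffer record and the functions emptyBuffer, push and pushAll are hypothetical names, and the list holds messages eldest first.

```haskell
-- Pure model of the overflow policies; not the library's own code.
data BufferType = Queue | Stack | Ring deriving (Show, Eq)

-- | Buffered messages (eldest first) plus a running count of drops.
data Buffer a = Buffer
  { bufType    :: BufferType
  , bufLimit   :: Int
  , bufItems   :: [a]
  , bufDropped :: Int
  } deriving (Show, Eq)

emptyBuffer :: BufferType -> Int -> Buffer a
emptyBuffer t l = Buffer t l [] 0

-- | Add one message, applying the overflow policy once the limit is reached.
push :: a -> Buffer a -> Buffer a
push x b
  | bufLimit b <= 0                  = dropped b   -- nothing can be stored
  | length (bufItems b) < bufLimit b = b { bufItems = bufItems b ++ [x] }
  | otherwise = case bufType b of
      Queue -> dropped b { bufItems = drop 1 (bufItems b) ++ [x] } -- evict eldest
      Stack -> dropped b { bufItems = init (bufItems b) ++ [x] }   -- evict newest
      Ring  -> dropped b                                           -- refuse new mail
  where
    dropped s = s { bufDropped = bufDropped s + 1 }

-- | Feed a list of messages into a buffer, in arrival order.
pushAll :: [a] -> Buffer a -> Buffer a
pushAll xs b = foldl (flip push) b xs
```

Pushing the messages 1 to 5 into a buffer with limit 3 leaves [3,4,5] for Queue, [1,2,5] for Stack and [1,2,3] for Ring in this model, with two messages counted as dropped in each case.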

Mailboxes can be resized by evaluating resize with a new value for the limit. If the new limit is smaller than the current/previous one, messages are dropped as though the mailbox had previously seen a volume of mail equal to the difference (in size) between the limits. In this situation, the Queue will drop as many older messages as necessary to come within the limit, whilst the other two buffer types will drop as many newer messages as needed.
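The effect of shrinking the limit can be sketched as a pure function over the buffered messages. This models the rule stated above and is not the library's code; resizeModel is a hypothetical name, and the input list is assumed to be ordered eldest first.

```haskell
-- Pure model of shrinking a mailbox; not the library's own code.
data BufferType = Queue | Stack | Ring deriving (Show, Eq)

-- | Apply a new limit to messages held eldest first.
--   Returns the retained messages and the number dropped.
resizeModel :: BufferType -> Int -> [a] -> ([a], Int)
resizeModel t newLimit msgs
  | excess <= 0 = (msgs, 0)                  -- growing never drops anything
  | otherwise   = case t of
      Queue -> (drop excess msgs, excess)    -- discard the eldest messages
      _     -> (take keep msgs, excess)      -- Stack and Ring discard the newest
  where
    keep   = max 0 newLimit
    excess = length msgs - keep
```

For example, resizeModel Queue 2 "abcde" yields ("de", 3), whereas the Ring and Stack variants yield ("ab", 3).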

Ordering Guarantees

When messages are delivered to the owner, they arrive as a list of raw Message entries, given in descending age order (i.e., eldest first). Whilst this approximates the FIFO ordering a process' mailbox would usually offer, the Stack buffer will appear to offer no ordering at all, since it always deletes the most recent message(s). The Queue and Ring buffers will maintain a more queue-like (i.e., FIFO) view of received messages, with the obvious constraint that newer or older data might have been deleted.

Post API and Relaying

For messages to be properly handled by the mailbox, they can either be sent via the post API or directly to the Mailbox. Messages sent directly to the mailbox will still be handled via the internal buffers and subjected to the mailbox limits. The post API is really just a means to ensure that the conversion from Serializable a -> Message is done in the caller's process and uses the safe wrapMessage variant.


This API is based on the work of Erlang programmers Fred Hebert and Geoff Cant, its design closely mirroring that of the pobox library application.


Creating, Starting, Configuring and Running a Mailbox

startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox

Start a mailbox for the supplied ProcessId.

start = spawnLocal $ run

startSupervised :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process (ProcessId, Message)

As startMailbox, but suitable for use in supervisor child specs. This variant is for use when you want access to the underlying Mailbox handle in your supervised child refs. See supervisor's ChildRef data type for more information.

Example:

childSpec = toChildStart $ startSupervised pid bufferType mboxLimit

See Control.Distributed.Process.Supervisor

startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox

As startMailbox, but suitable for use in supervisor child specs.

Example:

childSpec = toChildStart $ startSupervisedMailbox pid bufferType mboxLimit

See Control.Distributed.Process.Supervisor

createMailbox :: BufferType -> Limit -> Process Mailbox

Start a mailbox for the calling process.

create = getSelfPid >>= start

resize :: Mailbox -> Integer -> Process ()

Alters the mailbox's limit - this might cause messages to be dropped!

statistics :: Mailbox -> Process MailboxStats

Obtain statistics (from/to anywhere) about a mailbox.

monitor :: Mailbox -> Process MonitorRef

Monitor a mailbox.

type Limit = Integer

Represents the maximum number of messages the internal buffer can hold.

data BufferType

Describes the different types of buffer.



Queue

FIFO buffer, limiter drops the eldest message (queue head)

Stack

unordered buffer, limiter drops the newest (top) message

Ring

FIFO buffer, limiter refuses (i.e., drops) new messages

data MailboxStats

Bundle of statistics data, available on request via the statistics API call.

Posting Mail

post :: Serializable a => Mailbox -> a -> Process ()

Posts a message to someone's mailbox.

Obtaining Mail and Notifications

notify :: Mailbox -> Process ()

Instructs the mailbox to send a NewMail signal as soon as any mail is available for delivery. Once the signal is sent, it will not be resent, even when further mail arrives, until notify is called again.

NB: signals are only delivered to the mailbox's owning process.
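The one-shot nature of notify can be illustrated with a tiny state machine. This is a pure model of the behaviour described above, not the library's implementation; the types Event and State and the functions step and signals are hypothetical, and delivery (which would empty the buffer) is deliberately left out.

```haskell
-- Events the model reacts to (illustrative names, not library types).
data Event = NotifyCalled | MailArrived deriving (Show, Eq)

-- | Whether a notification request is outstanding, and how much mail waits.
data State = State { armed :: Bool, pending :: Int } deriving (Show, Eq)

initial :: State
initial = State False 0

-- | Process one event; the Bool says whether a NewMail signal is sent.
step :: State -> Event -> (State, Bool)
step s NotifyCalled
  | pending s > 0 = (s { armed = False }, True)  -- mail already waiting: signal now
  | otherwise     = (s { armed = True }, False)  -- signal on the next arrival
step s MailArrived
  | armed s   = (s' { armed = False }, True)     -- first arrival after notify
  | otherwise = (s', False)                      -- no outstanding request
  where
    s' = s { pending = pending s + 1 }

-- | Run a sequence of events, reporting at each step whether a signal was sent.
signals :: [Event] -> [Bool]
signals = go initial
  where
    go _ []       = []
    go s (e : es) = let (s', out) = step s e in out : go s' es
```

In this model, signals [NotifyCalled, MailArrived, MailArrived, NotifyCalled] evaluates to [False, True, False, True]: the second arrival is silent because the request was consumed by the first.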

deliver :: Mailbox -> Process ()

Instructs the mailbox to deliver all pending messages to the owner.

active :: Mailbox -> Filter -> Process ()

Instructs the mailbox to send a Delivery as soon as any mail is available, or immediately (if the buffer already contains data).

NB: signals are only delivered to the mailbox's owning process.

data NewMail

Marker message indicating to the owning process that mail has arrived.


NewMail !Mailbox !Integer 

data Delivery

Mail delivery.




box :: Mailbox

handle to the sending mailbox

messages :: [Message]

list of raw messages

count :: Integer

number of messages delivered

totalDropped :: Integer

total dropped/skipped messages

acceptEverything :: Closure (Message -> Process FilterResult)

A do-nothing filter that accepts all messages (i.e., returns Keep for any input).

acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)

A filter that takes a Closure (Message -> Process FilterResult) holding the filter function and applies it remotely (i.e., in the mailbox's own managed process).

Remote Table