| Copyright | (c) Tim Watson 2012 - 2014 | 
|---|---|
| License | BSD3 (see the file LICENSE) | 
| Maintainer | Tim Watson <watson.timothy@gmail.com> | 
| Stability | experimental | 
| Portability | non-portable (requires concurrency) | 
| Safe Haskell | None | 
| Language | Haskell98 | 
Control.Distributed.Process.Execution.Exchange
Description
- Message Exchanges
 
The concept of a message exchange is borrowed from the world of messaging and enterprise integration. The exchange acts like a kind of mailbox, accepting inputs from producers and forwarding these messages to one or more consumers, depending on the implementation's semantics.
This module provides some basic types of message exchange and exposes an API for defining your own custom exchange types.
- Broadcast Exchanges
 
The broadcast exchange type, started via broadcastExchange, forwards its
 inputs to all registered consumers (as the name suggests). This exchange type
 is highly optimised for local (intra-node) traffic and provides two different
 kinds of client binding: one causes messages to be delivered directly
 to the client's mailbox (see bindToBroadcaster); the other (see
 broadcastClient) provides a separate stream of messages that can be obtained
 using the expect and receiveX family of messaging primitives (and thus
 composed with other forms of input selection, such as typed channels and
 selective reads on the process mailbox).
Important: When a ProcessId is registered via bindToBroadcaster, only
 the payload of the Message (i.e., the underlying Serializable datum) is
 forwarded to the consumer, not the whole Message itself.
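The fan-out and payload-only behaviour described above can be pictured with a small model. This is a sketch that uses only base and does not touch the library: ToyMessage, Mailbox, broadcastToy and demoBroadcast are hypothetical names, consumers are modelled as IORef lists rather than processes, and the real exchange is likely implemented quite differently.

```haskell
import Data.IORef (IORef, newIORef, modifyIORef', readIORef)

-- Hypothetical stand-in for the library's Message: key, headers and payload.
data ToyMessage = ToyMessage
  { toyKey     :: String
  , toyHeaders :: [(String, String)]
  , toyPayload :: String
  } deriving (Eq, Show)

-- A consumer is modelled as a mailbox that accumulates what it receives.
type Mailbox = IORef [String]

-- Forward the payload (not the whole message) to every registered consumer.
broadcastToy :: [Mailbox] -> ToyMessage -> IO ()
broadcastToy consumers msg =
  mapM_ (\box -> modifyIORef' box (++ [toyPayload msg])) consumers

-- Two consumers, two messages: both mailboxes end up with both payloads.
demoBroadcast :: IO ([String], [String])
demoBroadcast = do
  a <- newIORef []
  b <- newIORef []
  mapM_ (broadcastToy [a, b])
        [ ToyMessage "" [] "hello", ToyMessage "" [] "world" ]
  (,) <$> readIORef a <*> readIORef b
```

In this model every consumer sees every payload in posting order, and the key and headers never reach the mailbox.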
- Router Exchanges
 
The router API provides a means to selectively route messages to one or
 more clients, depending on the content of the Message. Two modes of binding
 (and client selection) are provided out of the box: one matches on the
 message key, the other on a name and value from the headers. Alternative
 mechanisms for content-based routing can be derived by modifying the
 BindingSelector expression passed to router.
See messageKeyRouter and headerContentRouter for the built-in routing
 exchanges, and router for the extensible routing API.
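To make the two selection modes concrete, here is a minimal pure sketch of what a key selector and a header selector might compute. Everything in it is hypothetical (ToyMessage, ToyBinding, selectByKey, selectByHeader); the library's BindingSelector runs in the Process monad and its Binding type has its own field layout, so treat this only as an illustration of the idea.

```haskell
-- Hypothetical stand-in for the library's Message.
data ToyMessage = ToyMessage
  { toyKey     :: String
  , toyHeaders :: [(String, String)]
  , toyPayload :: String
  }

-- Hypothetical binding: a key, a (header name, header value) pair, or nothing.
data ToyBinding
  = ToyKey String
  | ToyHeader String String
  | ToyNone
  deriving (Eq, Show)

-- Key-based selection: the binding is simply the message key.
selectByKey :: ToyMessage -> ToyBinding
selectByKey = ToyKey . toyKey

-- Header-based selection: look up one named header and bind on its value.
selectByHeader :: String -> ToyMessage -> ToyBinding
selectByHeader name msg =
  maybe ToyNone (ToyHeader name) (lookup name (toyHeaders msg))
```

A different selector function is all that changes between the two modes here, which mirrors the description of swapping the BindingSelector passed to router.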
- Custom Exchange Types
 
Both the broadcast and router exchanges are implemented as custom
 exchange types. The mechanism for defining custom exchange behaviours
 such as these is very simple. Raw exchanges are started by evaluating
 startExchange with a specific ExchangeType record. This type is
 parameterised by the internal state it holds, and defines two API callbacks
 in its configureEx and routeEx fields. The former is evaluated whenever a
 client process evaluates configureExchange, the latter whenever a client
 evaluates post or postMessage. The configureEx callback takes a raw
 Message (from Control.Distributed.Process) and is responsible for
 decoding the message and updating its own state (if required). It is via
 this callback that custom exchange types can receive information about
 clients and handle it in their own way. The routeEx callback is evaluated
 with the exchange type's own internal state and the Message originally
 sent to the exchange process (via post) and is responsible for delivering
 the message to its clients in whatever way makes sense for that exchange
 type.
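The callback arrangement described above can be sketched as a state value plus two functions that each return the next state. The model below uses only base and runs in IO instead of Process; ToyExchangeType, its fields, ToyInput, stepToy, runToy, Counting and countingType are all hypothetical names chosen for illustration, not the library's record.

```haskell
import Control.Monad (foldM)

-- Hypothetical counterpart of an exchange type: state plus two callbacks.
data ToyExchangeType s = ToyExchangeType
  { toyState     :: s
  , toyConfigure :: s -> String -> IO s  -- handles configuration input
  , toyRoute     :: s -> String -> IO s  -- handles posted messages
  }

-- The two kinds of traffic an exchange receives in this model.
data ToyInput = Configure String | Post String

-- Dispatch one input to the matching callback and store the new state.
stepToy :: ToyExchangeType s -> ToyInput -> IO (ToyExchangeType s)
stepToy ex (Configure c) = do
  s' <- toyConfigure ex (toyState ex) c
  pure ex { toyState = s' }
stepToy ex (Post m) = do
  s' <- toyRoute ex (toyState ex) m
  pure ex { toyState = s' }

-- Run a sequence of inputs and return the final state.
runToy :: ToyExchangeType s -> [ToyInput] -> IO s
runToy ex inputs = toyState <$> foldM stepToy ex inputs

-- Example state: registered client names and a count of deliveries.
data Counting = Counting { clients :: [String], delivered :: Int }
  deriving (Eq, Show)

-- Configuration registers a client; routing counts one delivery per client.
countingType :: ToyExchangeType Counting
countingType = ToyExchangeType
  { toyState     = Counting [] 0
  , toyConfigure = \s c -> pure s { clients = c : clients s }
  , toyRoute     = \s _ -> pure s { delivered = delivered s + length (clients s) }
  }
```

Threading the state through both callbacks is what lets configuration input (new clients) influence later routing decisions.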
- data Exchange
 - data Message = Message {}
 - startExchange :: forall s. ExchangeType s -> Process Exchange
 - startSupervised :: forall s. ExchangeType s -> SupervisorPid -> Process Exchange
 - startSupervisedRef :: forall s. ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
 - runExchange :: forall s. ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
 - post :: Serializable a => Exchange -> a -> Process ()
 - postMessage :: Exchange -> Message -> Process ()
 - configureExchange :: Serializable m => Exchange -> m -> Process ()
 - createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
 - broadcastExchange :: Process Exchange
 - broadcastExchangeT :: Process BroadcastExchange
 - broadcastClient :: Exchange -> Process (InputStream Message)
 - bindToBroadcaster :: Exchange -> Process ()
 - type BroadcastExchange = ExchangeType BroadcastEx
 - type HeaderName = String
 - data Binding
   - = BindKey { bindingKey :: !String }
   - | BindHeader { bindingKey :: !String, headerName :: !HeaderName }
   - | BindNone
 - class (Hashable k, Eq k, Serializable k) => Bindable k
 - type BindingSelector k = Message -> Process k
 - data RelayType
 - router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
 - supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
 - route :: Serializable m => Exchange -> m -> Process ()
 - routeMessage :: Exchange -> Message -> Process ()
 - messageKeyRouter :: RelayType -> Process Exchange
 - bindKey :: String -> Exchange -> Process ()
 - headerContentRouter :: RelayType -> HeaderName -> Process Exchange
 - bindHeader :: HeaderName -> String -> Exchange -> Process ()
 - data ExchangeType s = ExchangeType {}
 - applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a
 
Fundamental API
data Exchange
Opaque handle to an exchange.
data Message
Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.
Constructors
| Message | |
Starting/Running an Exchange
startExchange :: forall s. ExchangeType s -> Process Exchange
Starts an exchange process with the given ExchangeType.
startSupervised :: forall s. ExchangeType s -> SupervisorPid -> Process Exchange
Starts an exchange as part of a supervision tree.
Example:
    childSpec = toChildStart $ startSupervised exType
startSupervisedRef :: forall s. ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
Starts an exchange as part of a supervision tree.
Example:
    childSpec = toChildStart $ startSupervisedRef exType
runExchange :: forall s. ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
Client Facing API
post :: Serializable a => Exchange -> a -> Process ()
Posts an arbitrary Serializable datum to an exchange. The raw datum is
 wrapped in the Message data type, with its key set to "" and its
 headers to [].
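As a rough picture of the wrapping step, the sketch below models a message as a record and shows the difference between supplying a key and headers explicitly and accepting the empty defaults. ToyMessage, mkToyMessage and wrapForPost are hypothetical names; the library's Message is built with createMessage and carries a Serializable payload rather than a type parameter.

```haskell
-- Hypothetical stand-in for the library's Message, generic in its payload.
data ToyMessage a = ToyMessage
  { toyKey     :: String
  , toyHeaders :: [(String, String)]
  , toyPayload :: a
  } deriving (Eq, Show)

-- Counterpart of building a message explicitly: the caller picks key and headers.
mkToyMessage :: String -> [(String, String)] -> a -> ToyMessage a
mkToyMessage = ToyMessage

-- Counterpart of the wrapping described for post: empty key, no headers.
wrapForPost :: a -> ToyMessage a
wrapForPost = mkToyMessage "" []
```

This suggests that when a routing key or headers matter, a message should be built explicitly and sent with postMessage instead of post.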
configureExchange :: Serializable m => Exchange -> m -> Process ()
Sends an arbitrary Serializable datum to an exchange, for use as a
 configuration change - see configureEx for details.
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
Broadcast Exchange
broadcastExchange :: Process Exchange
Start a new broadcast exchange and return a handle to the exchange.
broadcastExchangeT :: Process BroadcastExchange
The ExchangeType of a broadcast exchange. Can be combined with the
 startSupervisedRef and startSupervised APIs.
broadcastClient :: Exchange -> Process (InputStream Message)
Create a binding to the given broadcast exchange for the calling process
 and return an InputStream that can be used in the expect and
 receiveWait family of messaging primitives. This form of client interaction
 helps avoid cluttering the caller's mailbox with Message data, since the
 InputStream provides a separate input stream (in a similar fashion to
 a typed channel).
 Example:
    is <- broadcastClient ex
    msg <- receiveWait [ matchInputStream is ]
    handleMessage (payload msg)
bindToBroadcaster :: Exchange -> Process ()
type BroadcastExchange = ExchangeType BroadcastEx
Routing (Content Based)
type HeaderName = String
data Binding
The binding key used by the built-in key- and header-based routers.
Constructors
| BindKey | Fields: bindingKey :: !String |
| BindHeader | Fields: bindingKey :: !String, headerName :: !HeaderName |
| BindNone | |
class (Hashable k, Eq k, Serializable k) => Bindable k
Things that can be used as binding keys in a router.
Instances
| (Hashable k, Eq k, Serializable k) => Bindable k |
type BindingSelector k = Message -> Process k
data RelayType
Given to a router to indicate whether clients should
 receive Message payloads only, or the whole Message object
 itself.
Constructors
| PayloadOnly | |
| WholeMessage | 
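The effect of the two relay modes on what a client receives can be sketched as a pure function. ToyMessage, ToyRelay, Delivery and relay are hypothetical names used only for this illustration; in the library the choice is expressed with the PayloadOnly and WholeMessage constructors and delivery happens via message passing.

```haskell
-- Hypothetical stand-in for the library's Message.
data ToyMessage = ToyMessage
  { toyKey     :: String
  , toyHeaders :: [(String, String)]
  , toyPayload :: String
  } deriving (Eq, Show)

-- Hypothetical counterpart of the two relay modes.
data ToyRelay = ToyPayloadOnly | ToyWholeMessage

-- What a client ends up with under each mode.
data Delivery = Payload String | Whole ToyMessage
  deriving (Eq, Show)

-- Strip the envelope in payload-only mode; pass the message through otherwise.
relay :: ToyRelay -> ToyMessage -> Delivery
relay ToyPayloadOnly  = Payload . toyPayload
relay ToyWholeMessage = Whole
```

Clients that need the routing key or headers would want the whole-message mode, since payload-only delivery discards them in this model.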
Starting a Router
router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
Defines a router exchange. The BindingSelector is used to construct
 a binding (i.e., an instance of the Bindable type k) for each incoming
 Message. Such bindings are matched against bindings stored in the exchange.
 Clients of a router exchange are identified by a binding, mapped to
 one or more ProcessIds.
The format of the bindings, nature of their storage and mechanism for
 submitting new bindings is implementation dependent (i.e., will vary by
 exchange type). For example, the messageKeyRouter and headerContentRouter
 implementations both use the Binding data type, which can represent a
 Message key or a HeaderName and content. As with all custom exchange
 types, bindings should be submitted by evaluating configureExchange with
 a suitable data type.
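The matching step described above (derive a binding from each message, then look it up among the stored bindings) can be sketched with an association list. This is a simplified model: ClientId, Bindings, addBinding and clientsFor are hypothetical names, the selector is pure rather than a Process action, and a list lookup stands in for whatever storage the real router uses (the Hashable constraint on Bindable hints at a hash-based structure).

```haskell
import Data.Maybe (fromMaybe)

-- Clients are modelled as plain integers instead of ProcessIds.
type ClientId = Int

-- Stored bindings: each binding maps to one or more clients.
type Bindings k = [(k, [ClientId])]

-- Register a client under a binding, merging with any existing entry.
addBinding :: Eq k => k -> ClientId -> Bindings k -> Bindings k
addBinding k c bs = case lookup k bs of
  Nothing -> (k, [c]) : bs
  Just cs -> (k, c : cs) : filter ((/= k) . fst) bs

-- Apply the selector to the message and return the clients bound to the result.
clientsFor :: Eq k => (msg -> k) -> Bindings k -> msg -> [ClientId]
clientsFor selector bs msg = fromMaybe [] (lookup (selector msg) bs)
```

A message whose derived binding has no stored entry reaches no clients in this model; how the library treats that case is not stated in the text above.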
supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
Defines a router that can be used in a supervision tree.
Routing (Publishing) API
route :: Serializable m => Exchange -> m -> Process ()
Send a Serializable message to the supplied Exchange. The given datum
 will be converted to a Message, with the key set to "" and the
 headers to [].
The routing behaviour will be dependent on the choice of BindingSelector
 given when initialising the router.
routeMessage :: Exchange -> Message -> Process ()
Send a Message to the supplied Exchange.
 The routing behaviour will be dependent on the choice of BindingSelector
 given when initialising the router.
Routing via message/binding keys
messageKeyRouter :: RelayType -> Process Exchange
bindKey :: String -> Exchange -> Process ()
Add a binding (for the calling process) to a messageKeyRouter exchange.
Routing via message headers
headerContentRouter :: RelayType -> HeaderName -> Process Exchange
A router that matches on a specific (named) header. To bind a client
 Process to such an exchange, use the bindHeader function.
bindHeader :: HeaderName -> String -> Exchange -> Process ()
Add a binding (for the calling process) to a headerContentRouter exchange.
Defining Custom Exchange Types
data ExchangeType s
Different exchange types are defined using record syntax.
 The configureEx and routeEx API functions are called during the exchange
 lifecycle when incoming traffic arrives. Configuration messages are
 completely arbitrary types and the exchange type author is entirely
 responsible for decoding them. Messages posted to the exchange (see the
 Message data type) are passed to the routeEx API function along with the
 exchange type's own internal state. Both API functions return a new
 (potentially updated) state and run in the Process monad.
Constructors
| ExchangeType | |