-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Execution Framework for The Cloud Haskell Application Platform -- -- The Execution Framework provides tools for load regulation, workload -- shedding and remote hand-off. The currently implementation provides -- only a subset of the plumbing required, comprising tools for event -- management, mailbox buffering and message routing. @package distributed-process-execution @version 0.1.3.0 -- | -- -- The concept of a message exchange is borrowed from the world of -- messaging and enterprise integration. The exchange acts like a -- kind of mailbox, accepting inputs from producers and forwarding -- these messages to one or more consumers, depending on the -- implementation's semantics. -- -- This module provides some basic types of message exchange and exposes -- an API for defining your own custom exchange types. -- -- -- -- The broadcast exchange type, started via broadcastExchange, -- forward their inputs to all registered consumers (as the name -- suggests). This exchange type is highly optimised for local -- (intra-node) traffic and provides two different kinds of client -- binding, one which causes messages to be delivered directly to the -- client's mailbox (viz bindToBroadcaster), the other providing a -- separate stream of messages that can be obtained using the -- expect and receiveX family of messaging primitives -- (and thus composed with other forms of input selection, such as typed -- channels and selective reads on the process mailbox). -- -- Important: When a ProcessId is registered via -- bindToBroadcaster, only the payload of the Message -- (i.e., the underlying Serializable datum) is forwarded to the -- consumer, not the whole Message itself. -- -- -- -- The router API provides a means to selectively route messages -- to one or more clients, depending on the content of the -- Message. Two modes of binding (and client selection) are -- provided out of the box, one of which matches the message key, -- the second of which matches on a name and value from the -- headers. Alternative mechanisms for content based routing can -- be derived by modifying the BindingSelector expression passed -- to router -- -- See messageKeyRouter and headerContentRouter for the -- built-in routing exchanges, and router for the extensible -- routing API. -- -- -- -- Both the broadcast and router exchanges are implemented -- as custom exchange types. The mechanism for defining custom -- exchange behaviours such as these is very simple. Raw exchanges are -- started by evaluating startExchange with a specific -- ExchangeType record. This type is parameterised by the internal -- state it holds, and defines two API callbacks in its -- configureEx and routeEx fields. The former is evaluated -- whenever a client process evaluates configureExchange, the -- latter whenever a client evaluates post or postMessage. -- The configureEx callback takes a raw Message (from -- Control.Distributed.Process) and is responsible for decoding -- the message and updating its own state (if required). It is via this -- callback that custom exchange types can receive information about -- clients and handle it in their own way. The routeEx callback is -- evaluated with the exchange type's own internal state and the -- Message originally sent to the exchange process (via -- post) and is responsible for delivering the message to its -- clients in whatever way makes sense for that exchange type. module Control.Distributed.Process.Execution.Exchange -- | Opaque handle to an exchange. data Exchange -- | Messages sent to an exchange can optionally provide a routing key and -- a list of (key, value) headers in addition to the underlying payload. data Message Message :: !String -> ![(String, String)] -> !Message -> Message -- | a routing key for the payload [key] :: Message -> !String -- | arbitrary key-value headers [headers] :: Message -> ![(String, String)] -- | the underlying Message payload [payload] :: Message -> !Message -- | Starts an exchange process with the given ExchangeType. startExchange :: forall s. ExchangeType s -> Process Exchange -- | Starts an exchange as part of a supervision tree. -- -- Example: > childSpec = toChildStart $ startSupervised exType startSupervised :: forall s. ExchangeType s -> SupervisorPid -> Process Exchange -- | Starts an exchange as part of a supervision tree. -- -- Example: > childSpec = toChildStart $ startSupervisedRef exType startSupervisedRef :: forall s. ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) runExchange :: forall s. ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () -- | Posts an arbitrary Serializable datum to an exchange. -- The raw datum is wrapped in the Message data type, with its -- key set to "" and its headers to []. post :: Serializable a => Exchange -> a -> Process () -- | Posts a Message to an exchange. postMessage :: Exchange -> Message -> Process () -- | Sends an arbitrary Serializable datum to an exchange, -- for use as a configuration change - see configureEx for -- details. configureExchange :: Serializable m => Exchange -> m -> Process () -- | Utility for creating a Message datum from its key, -- headers and payload. createMessage :: Serializable m => String -> [(String, String)] -> m -> Message -- | Start a new broadcast exchange and return a handle to the -- exchange. broadcastExchange :: Process Exchange -- | The ExchangeType of a broadcast exchange. Can be combined with -- the startSupervisedRef and startSupervised APIs. broadcastExchangeT :: Process BroadcastExchange -- | Create a binding to the given broadcast exchange for the -- calling process and return an InputStream that can be used in -- the expect and receiveWait family of messaging -- primitives. This form of client interaction helps avoid cluttering the -- caller's mailbox with Message data, since the -- InputChannel provides a separate input stream (in a similar -- fashion to a typed channel). Example: -- --
--   is <- broadcastClient ex
--   msg <- receiveWait [ matchInputStream is ]
--   handleMessage (payload msg)
--   
broadcastClient :: Exchange -> Process (InputStream Message) -- | Bind the calling process to the given broadcast exchange. For -- each Message the exchange receives, only the payload will be -- sent to the calling process' mailbox. -- -- Example: -- -- (producer) > post ex Hello -- -- (consumer) > bindToBroadcaster ex > expect >>= liftIO . -- putStrLn bindToBroadcaster :: Exchange -> Process () type BroadcastExchange = ExchangeType BroadcastEx type HeaderName = String -- | The binding key used by the built-in key and header based routers. data Binding BindKey :: !String -> Binding [bindingKey] :: Binding -> !String BindHeader :: !String -> !HeaderName -> Binding [bindingKey] :: Binding -> !String [headerName] :: Binding -> !HeaderName BindNone :: Binding -- | Things that can be used as binding keys in a router. class (Hashable k, Eq k, Serializable k) => Bindable k -- | Used to convert a Message into a Bindable routing key. type BindingSelector k = (Message -> Process k) -- | Given to a router to indicate whether clients should receive -- Message payloads only, or the whole Message object -- itself. data RelayType PayloadOnly :: RelayType WholeMessage :: RelayType -- | Defines a router exchange. The BindingSelector is used -- to construct a binding (i.e., an instance of the Bindable type -- k) for each incoming Message. Such bindings are -- matched against bindings stored in the exchange. Clients of a -- router exchange are identified by a binding, mapped to one or -- more ProcessIds. -- -- The format of the bindings, nature of their storage and mechanism for -- submitting new bindings is implementation dependent (i.e., will vary -- by exchange type). For example, the messageKeyRouter and -- headerContentRouter implementations both use the Binding -- data type, which can represent a Message key or a -- HeaderName and content. As with all custom exchange types, -- bindings should be submitted by evaluating configureExchange -- with a suitable data type. router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange -- | Defines a router that can be used in a supervision tree. supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange -- | Send a Serializable message to the supplied Exchange. -- The given datum will be converted to a Message, with the -- key set to "" and the headers to []. -- -- The routing behaviour will be dependent on the choice of -- BindingSelector given when initialising the router. route :: Serializable m => Exchange -> m -> Process () -- | Send a Message to the supplied Exchange. The routing -- behaviour will be dependent on the choice of BindingSelector -- given when initialising the router. routeMessage :: Exchange -> Message -> Process () -- | A router that matches on a Message key. To bind a client -- Process to such an exchange, use the bindKey function. messageKeyRouter :: RelayType -> Process Exchange -- | Add a binding (for the calling process) to a messageKeyRouter -- exchange. bindKey :: String -> Exchange -> Process () -- | A router that matches on a specific (named) header. To bind a client -- Process to such an exchange, use the bindHeader -- function. headerContentRouter :: RelayType -> HeaderName -> Process Exchange -- | Add a binding (for the calling process) to a -- headerContentRouter exchange. bindHeader :: HeaderName -> String -> Exchange -> Process () -- | Different exchange types are defined using record syntax. The -- configureEx and routeEx API functions are called during -- the exchange lifecycle when incoming traffic arrives. Configuration -- messages are completely arbitrary types and the exchange type author -- is entirely responsible for decoding them. Messages posted to the -- exchange (see the Message data type) are passed to the -- routeEx API function along with the exchange type's own -- internal state. Both API functions return a new (potentially updated) -- state and run in the Process monad. data ExchangeType s ExchangeType :: String -> s -> (s -> Message -> Process s) -> (s -> Message -> Process s) -> ExchangeType s [name] :: ExchangeType s -> String [state] :: ExchangeType s -> s [configureEx] :: ExchangeType s -> s -> Message -> Process s [routeEx] :: ExchangeType s -> s -> Message -> Process s -- | Utility for custom exchange type authors - evaluates a set of -- primitive message handlers from left to right, returning the first -- which evaluates to Just a, or the initial e value if -- all the handlers yield Nothing. applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a -- | -- -- The EventManager is a parallel/concurrent event handling tool, -- built on top of the Exchange API. Arbitrary events are -- published to the event manager using notify, and are broadcast -- simulataneously to a set of registered event handlers. -- -- -- -- Event handlers are defined as Serializable m => s -> m -> -- Process s, i.e., an expression taking an initial state, an -- arbitrary Serializable event/message and performing an action -- in the Process monad that evaluates to a new state. -- -- See Control.Distributed.Process.Execution.Exchange. module Control.Distributed.Process.Execution.EventManager -- | Opaque handle to an Event Manager. data EventManager -- | Start a new Event Manager process and return an opaque handle -- to it. start :: Process EventManager startSupervised :: SupervisorPid -> Process EventManager startSupervisedRef :: SupervisorPid -> Process (ProcessId, Message) -- | Broadcast an event to all registered handlers. notify :: Serializable a => EventManager -> a -> Process () -- | Add a new event handler. The handler runs in its own process, which is -- spawned locally on behalf of the caller. addHandler :: forall s a. Serializable a => EventManager -> (s -> a -> Process s) -> Process s -> Process ProcessId -- | As addHandler, but operates over a raw -- Control.Distributed.Process.Message. addMessageHandler :: forall s. EventManager -> (s -> Message -> Process (Maybe s)) -> Process s -> Process ProcessId instance GHC.Generics.Generic Control.Distributed.Process.Execution.EventManager.EventManager instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.EventManager.EventManager instance Control.Distributed.Process.Extras.Internal.Types.Resolvable Control.Distributed.Process.Execution.EventManager.EventManager -- | Generic process that acts as an external mailbox and message buffer. -- -- -- -- For use when rate limiting is not possible (or desired), this module -- provides a buffer process that receives mail via its -- post API, buffers the received messages and delivers them when -- its owning process asks for them. A mailbox has to be started -- with a maximum buffer size - the so called limit - and will -- discard messages once its internal storage reaches this user defined -- threshold. -- -- The usual behaviour of the buffer process is to accumulate -- messages in its internal memory. When a client evaluates -- notify, the buffer will send a NewMail message to the -- (real) mailbox of its owning process as soon as it has any message(s) -- ready to deliver. If the buffer already contains undelivered mail, the -- NewMail message will be dispatched immediately. -- -- When the owning process wishes to receive mail, evaluating -- deliver (from any process) will cause the buffer to send its -- owner a Delivery message containing the accumulated messages -- and additional information about the number of messages it is -- delivering, the number of messages dropped since the last delivery and -- a handle for the mailbox (so that processes can have multiple -- mailboxes if required, and distinguish between them). -- -- -- -- A mailbox handles overflow - when the number of messages it is holding -- reaches the limit - differently depending on the BufferType -- selected when it starts. The Queue buffer will, once the -- limit is reached, drop older messages first (i.e., the head of the -- queue) to make space for newer ones. The Ring buffer works -- similarly, but blocks new messages so as to preserve existing ones -- instead. Finally, the Stack buffer will drop the last (i.e., -- most recently received) message to make room for new mail. -- -- Mailboxes can be resized by evaluating resize with a new -- value for the limit. If the new limit is older that the -- current/previous one, messages are dropped as though the mailbox had -- previously seen a volume of mail equal to the difference (in size) -- between the limits. In this situation, the Queue will drop as -- many older messages as neccessary to come within the limit, whilst the -- other two buffer types will drop as many newer messages as needed. -- -- -- -- When messages are delivered to the owner, they arrive as a list of raw -- Message entries, given in descending age order (i.e., eldest -- first). Whilst this approximates the FIFO ordering a process' mailbox -- would usually offer, the Stack buffer will appear to offer no -- ordering at all, since it always deletes the most recent message(s). -- The Queue and Ring buffers will maintain a more -- queue-like (i.e., FIFO) view of received messages, with the obvious -- constraint the newer or older data might have been deleted. -- -- -- -- For messages to be properly handled by the mailbox, they can either be -- sent via the post API or directly to the Mailbox. -- Messages sent directly to the mailbox will still be handled via the -- internal buffers and subjected to the mailbox limits. The post -- API is really just a means to ensure that the conversion from -- Serializable a -> Message is done in the caller's process -- and uses the safe wrapMessage variant. -- -- -- -- This API is based on the work of Erlang programmers Fred Hebert and -- Geoff Cant, its design closely mirroring that of the the pobox -- library application. module Control.Distributed.Process.Execution.Mailbox -- | Opaque handle to a mailbox. data Mailbox -- | Start a mailbox for the supplied ProcessId. -- --
--   start = spawnLocal $ run
--   
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox -- | As startMailbox, but suitable for use in supervisor child -- specs. This variant is for use when you want to access to the -- underlying Mailbox handle in your supervised child refs. See -- supervisor's ChildRef data type for more information. -- -- Example: > childSpec = toChildStart $ startSupervised pid -- bufferType mboxLimit -- -- See Control.Distributed.Process.Supervisor startSupervised :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process (ProcessId, Message) -- | As startMailbox, but suitable for use in supervisor child -- specs. -- -- See Control.Distributed.Process.Supervisor startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox -- | Start a mailbox for the calling process. -- --
--   create = getSelfPid >>= start
--   
createMailbox :: BufferType -> Limit -> Process Mailbox -- | Alters the mailbox's limit - this might cause messages to be -- dropped! resize :: Mailbox -> Integer -> Process () -- | Obtain statistics (from/to anywhere) about a mailbox. statistics :: Mailbox -> Process MailboxStats -- | Monitor a mailbox. monitor :: Mailbox -> Process MonitorRef -- | Represents the maximum number of messages the internal buffer can -- hold. type Limit = Integer -- | Describes the different types of buffer. data BufferType -- | FIFO buffer, limiter drops the eldest message (queue head) Queue :: BufferType -- | unordered buffer, limiter drops the newest (top) message Stack :: BufferType -- | FIFO buffer, limiter refuses (i.e., drops) new messages Ring :: BufferType -- | Bundle of statistics data, available on request via the -- mailboxStats API call. data MailboxStats MailboxStats :: Integer -> Integer -> Limit -> ProcessId -> MailboxStats [pendingMessages] :: MailboxStats -> Integer [droppedMessages] :: MailboxStats -> Integer [currentLimit] :: MailboxStats -> Limit [owningProcess] :: MailboxStats -> ProcessId -- | Posts a message to someone's mailbox. post :: Serializable a => Mailbox -> a -> Process () -- | Instructs the mailbox to send a NewMail signal as soon as any -- mail is available for delivery. Once the signal is sent, it will not -- be resent, even when further mail arrives, until notify is -- called again. -- -- NB: signals are only delivered to the mailbox's owning process. notify :: Mailbox -> Process () -- | Instructs the mailbox to deliver all pending messages to the owner. deliver :: Mailbox -> Process () -- | Instructs the mailbox to send a Delivery as soon as any mail is -- available, or immediately (if the buffer already contains data). -- -- NB: signals are only delivered to the mailbox's owning process. active :: Mailbox -> Filter -> Process () -- | Marker message indicating to the owning process that mail has arrived. data NewMail NewMail :: !Mailbox -> !Integer -> NewMail -- | Mail delivery. data Delivery Delivery :: Mailbox -> [Message] -> Integer -> Integer -> Delivery -- | handle to the sending mailbox [box] :: Delivery -> Mailbox -- | list of raw messages [messages] :: Delivery -> [Message] -- | number of messages delivered [count] :: Delivery -> Integer -- | total dropped/skipped messages [totalDropped] :: Delivery -> Integer data FilterResult Keep :: FilterResult Skip :: FilterResult Send :: FilterResult -- | A do-nothing filter that accepts all messages (i.e., returns -- Keep for any input). acceptEverything :: Closure (Message -> Process FilterResult) -- | A filter that takes a Closure (Message -> Process -- FilterResult) holding the filter function and applies it remotely -- (i.e., in the mailbox's own managed process). acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult) __remoteTable :: RemoteTable -> RemoteTable instance GHC.Show.Show Control.Distributed.Process.Execution.Mailbox.BufferType instance GHC.Classes.Eq Control.Distributed.Process.Execution.Mailbox.BufferType instance GHC.Show.Show Control.Distributed.Process.Execution.Mailbox.MailboxStats instance GHC.Generics.Generic Control.Distributed.Process.Execution.Mailbox.MailboxStats instance GHC.Generics.Generic Control.Distributed.Process.Execution.Mailbox.Post instance GHC.Generics.Generic Control.Distributed.Process.Execution.Mailbox.StatsReq instance GHC.Generics.Generic Control.Distributed.Process.Execution.Mailbox.FilterResult instance GHC.Generics.Generic Control.Distributed.Process.Execution.Mailbox.Mode instance GHC.Generics.Generic Control.Distributed.Process.Execution.Mailbox.ControlMessage instance GHC.Classes.Eq Control.Distributed.Process.Execution.Mailbox.Mailbox instance GHC.Generics.Generic Control.Distributed.Process.Execution.Mailbox.Mailbox instance GHC.Generics.Generic Control.Distributed.Process.Execution.Mailbox.Delivery instance GHC.Show.Show Control.Distributed.Process.Execution.Mailbox.NewMail instance GHC.Generics.Generic Control.Distributed.Process.Execution.Mailbox.NewMail instance Control.Distributed.Process.Execution.Mailbox.Buffered Control.Distributed.Process.Execution.Mailbox.State instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.Mailbox.NewMail instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.Mailbox.Delivery instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.Mailbox.Mailbox instance GHC.Show.Show Control.Distributed.Process.Execution.Mailbox.Mailbox instance Control.Distributed.Process.Extras.Internal.Types.Linkable Control.Distributed.Process.Execution.Mailbox.Mailbox instance Control.Distributed.Process.Extras.Internal.Types.Resolvable Control.Distributed.Process.Execution.Mailbox.Mailbox instance Control.Distributed.Process.Extras.Internal.Types.Routable Control.Distributed.Process.Execution.Mailbox.Mailbox instance Control.Distributed.Process.Extras.Internal.Types.Addressable Control.Distributed.Process.Execution.Mailbox.Mailbox instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.Mailbox.ControlMessage instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.Mailbox.Mode instance GHC.Show.Show Control.Distributed.Process.Execution.Mailbox.Mode instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.Mailbox.FilterResult instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.Mailbox.StatsReq instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.Mailbox.Post instance Data.Binary.Class.Binary Control.Distributed.Process.Execution.Mailbox.MailboxStats -- | -- -- The Execution Framework provides tools for load regulation, -- workload shedding and remote hand-off. The currently implementation -- provides only a subset of the plumbing required, comprising tools for -- event management, mailbox buffering and message routing. module Control.Distributed.Process.Execution -- | Represents the maximum number of messages the internal buffer can -- hold. type Limit = Integer -- | Opaque handle to a mailbox. data Mailbox -- | Describes the different types of buffer. data BufferType -- | FIFO buffer, limiter drops the eldest message (queue head) Queue :: BufferType -- | unordered buffer, limiter drops the newest (top) message Stack :: BufferType -- | FIFO buffer, limiter refuses (i.e., drops) new messages Ring :: BufferType -- | Bundle of statistics data, available on request via the -- mailboxStats API call. data MailboxStats MailboxStats :: Integer -> Integer -> Limit -> ProcessId -> MailboxStats [pendingMessages] :: MailboxStats -> Integer [droppedMessages] :: MailboxStats -> Integer [currentLimit] :: MailboxStats -> Limit [owningProcess] :: MailboxStats -> ProcessId -- | Marker message indicating to the owning process that mail has arrived. data NewMail NewMail :: !Mailbox -> !Integer -> NewMail -- | Mail delivery. data Delivery Delivery :: Mailbox -> [Message] -> Integer -> Integer -> Delivery -- | handle to the sending mailbox [box] :: Delivery -> Mailbox -- | list of raw messages [messages] :: Delivery -> [Message] -- | number of messages delivered [count] :: Delivery -> Integer -- | total dropped/skipped messages [totalDropped] :: Delivery -> Integer data FilterResult Keep :: FilterResult Skip :: FilterResult Send :: FilterResult -- | Monitor a mailbox. monitor :: Mailbox -> Process MonitorRef -- | Instructs the mailbox to deliver all pending messages to the owner. deliver :: Mailbox -> Process () __remoteTable :: RemoteTable -> RemoteTable -- | Obtain statistics (from/to anywhere) about a mailbox. statistics :: Mailbox -> Process MailboxStats -- | Instructs the mailbox to send a NewMail signal as soon as any -- mail is available for delivery. Once the signal is sent, it will not -- be resent, even when further mail arrives, until notify is -- called again. -- -- NB: signals are only delivered to the mailbox's owning process. notify :: Mailbox -> Process () -- | Start a mailbox for the supplied ProcessId. -- --
--   start = spawnLocal $ run
--   
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox -- | As startMailbox, but suitable for use in supervisor child -- specs. -- -- See Control.Distributed.Process.Supervisor startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox -- | Start a mailbox for the calling process. -- --
--   create = getSelfPid >>= start
--   
createMailbox :: BufferType -> Limit -> Process Mailbox -- | Alters the mailbox's limit - this might cause messages to be -- dropped! resize :: Mailbox -> Integer -> Process () -- | Instructs the mailbox to send a Delivery as soon as any mail is -- available, or immediately (if the buffer already contains data). -- -- NB: signals are only delivered to the mailbox's owning process. active :: Mailbox -> Filter -> Process () -- | A do-nothing filter that accepts all messages (i.e., returns -- Keep for any input). acceptEverything :: Closure (Message -> Process FilterResult) -- | A filter that takes a Closure (Message -> Process -- FilterResult) holding the filter function and applies it remotely -- (i.e., in the mailbox's own managed process). acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult) -- | Messages sent to an exchange can optionally provide a routing key and -- a list of (key, value) headers in addition to the underlying payload. data Message Message :: !String -> ![(String, String)] -> !Message -> Message -- | a routing key for the payload [key] :: Message -> !String -- | arbitrary key-value headers [headers] :: Message -> ![(String, String)] -- | the underlying Message payload [payload] :: Message -> !Message -- | Opaque handle to an exchange. data Exchange type BroadcastExchange = ExchangeType BroadcastEx type HeaderName = String -- | The binding key used by the built-in key and header based routers. data Binding BindKey :: !String -> Binding [bindingKey] :: Binding -> !String BindHeader :: !String -> !HeaderName -> Binding [bindingKey] :: Binding -> !String [headerName] :: Binding -> !HeaderName BindNone :: Binding -- | Things that can be used as binding keys in a router. class (Hashable k, Eq k, Serializable k) => Bindable k -- | Used to convert a Message into a Bindable routing key. type BindingSelector k = (Message -> Process k) -- | Given to a router to indicate whether clients should receive -- Message payloads only, or the whole Message object -- itself. data RelayType PayloadOnly :: RelayType WholeMessage :: RelayType -- | Different exchange types are defined using record syntax. The -- configureEx and routeEx API functions are called during -- the exchange lifecycle when incoming traffic arrives. Configuration -- messages are completely arbitrary types and the exchange type author -- is entirely responsible for decoding them. Messages posted to the -- exchange (see the Message data type) are passed to the -- routeEx API function along with the exchange type's own -- internal state. Both API functions return a new (potentially updated) -- state and run in the Process monad. data ExchangeType s ExchangeType :: String -> s -> (s -> Message -> Process s) -> (s -> Message -> Process s) -> ExchangeType s [name] :: ExchangeType s -> String [state] :: ExchangeType s -> s [configureEx] :: ExchangeType s -> s -> Message -> Process s [routeEx] :: ExchangeType s -> s -> Message -> Process s -- | Utility for creating a Message datum from its key, -- headers and payload. createMessage :: Serializable m => String -> [(String, String)] -> m -> Message -- | Posts an arbitrary Serializable datum to an exchange. -- The raw datum is wrapped in the Message data type, with its -- key set to "" and its headers to []. post :: Serializable a => Exchange -> a -> Process () -- | Starts an exchange as part of a supervision tree. -- -- Example: > childSpec = toChildStart $ startSupervisedRef exType startSupervisedRef :: forall s. ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) -- | Start a new broadcast exchange and return a handle to the -- exchange. broadcastExchange :: Process Exchange -- | The ExchangeType of a broadcast exchange. Can be combined with -- the startSupervisedRef and startSupervised APIs. broadcastExchangeT :: Process BroadcastExchange -- | Create a binding to the given broadcast exchange for the -- calling process and return an InputStream that can be used in -- the expect and receiveWait family of messaging -- primitives. This form of client interaction helps avoid cluttering the -- caller's mailbox with Message data, since the -- InputChannel provides a separate input stream (in a similar -- fashion to a typed channel). Example: -- --
--   is <- broadcastClient ex
--   msg <- receiveWait [ matchInputStream is ]
--   handleMessage (payload msg)
--   
broadcastClient :: Exchange -> Process (InputStream Message) -- | Starts an exchange process with the given ExchangeType. startExchange :: forall s. ExchangeType s -> Process Exchange runExchange :: forall s. ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () -- | Posts a Message to an exchange. postMessage :: Exchange -> Message -> Process () -- | Sends an arbitrary Serializable datum to an exchange, -- for use as a configuration change - see configureEx for -- details. configureExchange :: Serializable m => Exchange -> m -> Process () -- | Bind the calling process to the given broadcast exchange. For -- each Message the exchange receives, only the payload will be -- sent to the calling process' mailbox. -- -- Example: -- -- (producer) > post ex Hello -- -- (consumer) > bindToBroadcaster ex > expect >>= liftIO . -- putStrLn bindToBroadcaster :: Exchange -> Process () -- | Defines a router exchange. The BindingSelector is used -- to construct a binding (i.e., an instance of the Bindable type -- k) for each incoming Message. Such bindings are -- matched against bindings stored in the exchange. Clients of a -- router exchange are identified by a binding, mapped to one or -- more ProcessIds. -- -- The format of the bindings, nature of their storage and mechanism for -- submitting new bindings is implementation dependent (i.e., will vary -- by exchange type). For example, the messageKeyRouter and -- headerContentRouter implementations both use the Binding -- data type, which can represent a Message key or a -- HeaderName and content. As with all custom exchange types, -- bindings should be submitted by evaluating configureExchange -- with a suitable data type. router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange -- | Defines a router that can be used in a supervision tree. supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange -- | Send a Serializable message to the supplied Exchange. -- The given datum will be converted to a Message, with the -- key set to "" and the headers to []. -- -- The routing behaviour will be dependent on the choice of -- BindingSelector given when initialising the router. route :: Serializable m => Exchange -> m -> Process () -- | Send a Message to the supplied Exchange. The routing -- behaviour will be dependent on the choice of BindingSelector -- given when initialising the router. routeMessage :: Exchange -> Message -> Process () -- | A router that matches on a Message key. To bind a client -- Process to such an exchange, use the bindKey function. messageKeyRouter :: RelayType -> Process Exchange -- | Add a binding (for the calling process) to a messageKeyRouter -- exchange. bindKey :: String -> Exchange -> Process () -- | A router that matches on a specific (named) header. To bind a client -- Process to such an exchange, use the bindHeader -- function. headerContentRouter :: RelayType -> HeaderName -> Process Exchange -- | Add a binding (for the calling process) to a -- headerContentRouter exchange. bindHeader :: HeaderName -> String -> Exchange -> Process () -- | Utility for custom exchange type authors - evaluates a set of -- primitive message handlers from left to right, returning the first -- which evaluates to Just a, or the initial e value if -- all the handlers yield Nothing. applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a