-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/

-- | Execution Framework for The Cloud Haskell Application Platform
--
-- The Execution Framework provides tools for load regulation, workload
-- shedding and remote hand-off. The current implementation provides
-- only a subset of the plumbing required, comprising tools for event
-- management, mailbox buffering and message routing.
@package distributed-process-execution
@version 0.1.1

-- | The concept of a message exchange is borrowed from the world of
-- messaging and enterprise integration. The exchange acts like a
-- kind of mailbox, accepting inputs from producers and forwarding
-- these messages to one or more consumers, depending on the
-- implementation's semantics.
--
-- This module provides some basic types of message exchange and exposes
-- an API for defining your own custom exchange types.
--
-- The broadcast exchange type, started via broadcastExchange,
-- forwards its inputs to all registered consumers (as the name
-- suggests). This exchange type is highly optimised for local
-- (intra-node) traffic and provides two different kinds of client
-- binding, one which causes messages to be delivered directly to the
-- client's mailbox (viz bindToBroadcaster), the other providing a
-- separate stream of messages that can be obtained using the
-- expect and receiveX family of messaging primitives
-- (and thus composed with other forms of input selection, such as typed
-- channels and selective reads on the process mailbox).
--
-- Important: When a ProcessId is registered via
-- bindToBroadcaster, only the payload of the Message
-- (i.e., the underlying Serializable datum) is forwarded to the
-- consumer, not the whole Message itself.
--
-- The router API provides a means to selectively route messages
-- to one or more clients, depending on the content of the
-- Message.
-- Two modes of binding (and client selection) are
-- provided out of the box, one of which matches the message key,
-- the second of which matches on a name and value from the
-- headers. Alternative mechanisms for content based routing can
-- be derived by modifying the BindingSelector expression passed
-- to router.
--
-- See messageKeyRouter and headerContentRouter for the
-- built-in routing exchanges, and router for the extensible
-- routing API.
--
-- Both the broadcast and router exchanges are implemented
-- as custom exchange types. The mechanism for defining custom
-- exchange behaviours such as these is very simple. Raw exchanges are
-- started by evaluating startExchange with a specific
-- ExchangeType record. This type is parameterised by the internal
-- state it holds, and defines two API callbacks in its
-- configureEx and routeEx fields. The former is evaluated
-- whenever a client process evaluates configureExchange, the
-- latter whenever a client evaluates post or postMessage.
-- The configureEx callback takes a raw Message (from
-- Control.Distributed.Process) and is responsible for decoding
-- the message and updating its own state (if required). It is via this
-- callback that custom exchange types can receive information about
-- clients and handle it in their own way. The routeEx callback is
-- evaluated with the exchange type's own internal state and the
-- Message originally sent to the exchange process (via
-- post) and is responsible for delivering the message to its
-- clients in whatever way makes sense for that exchange type.
module Control.Distributed.Process.Execution.Exchange

-- | Opaque handle to an exchange.
data Exchange

-- | Messages sent to an exchange can optionally provide a routing key and
-- a list of (key, value) headers in addition to the underlying payload.
data Message
Message :: !String -> ![(String, String)] -> !Message -> Message

-- | a routing key for the payload
key :: Message -> !String

-- | arbitrary key-value headers
headers :: Message -> ![(String, String)]

-- | the underlying Message payload
payload :: Message -> !Message

-- | Starts an exchange process with the given ExchangeType.
startExchange :: ExchangeType s -> Process Exchange

-- | Starts an exchange as part of a supervision tree.
--
-- Example:
--
-- > childSpec = toChildStart $ startSupervised exType
startSupervised :: ExchangeType s -> SupervisorPid -> Process Exchange

-- | Starts an exchange as part of a supervision tree.
--
-- Example:
--
-- > childSpec = toChildStart $ startSupervisedRef exType
startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()

-- | Posts an arbitrary Serializable datum to an exchange.
-- The raw datum is wrapped in the Message data type, with its
-- key set to "" and its headers to [].
post :: Serializable a => Exchange -> a -> Process ()

-- | Posts a Message to an exchange.
postMessage :: Exchange -> Message -> Process ()

-- | Sends an arbitrary Serializable datum to an exchange,
-- for use as a configuration change - see configureEx for
-- details.
configureExchange :: Serializable m => Exchange -> m -> Process ()

-- | Utility for creating a Message datum from its key,
-- headers and payload.
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message

-- | Start a new broadcast exchange and return a handle to the
-- exchange.
broadcastExchange :: Process Exchange

-- | The ExchangeType of a broadcast exchange. Can be combined with
-- the startSupervisedRef and startSupervised APIs.
broadcastExchangeT :: Process BroadcastExchange

-- | Create a binding to the given broadcast exchange for the
-- calling process and return an InputStream that can be used in
-- the expect and receiveWait family of messaging
-- primitives. This form of client interaction helps avoid cluttering the
-- caller's mailbox with Message data, since the
-- InputStream provides a separate input stream (in a similar
-- fashion to a typed channel). Example:
--
--   is <- broadcastClient ex
--   msg <- receiveWait [ matchInputStream is ]
--   handleMessage (payload msg)
--   
broadcastClient :: Exchange -> Process (InputStream Message)

-- | Bind the calling process to the given broadcast exchange. For
-- each Message the exchange receives, only the payload will be
-- sent to the calling process' mailbox.
--
-- Example:
--
-- (producer)
--
-- > post ex Hello
--
-- (consumer)
--
-- > bindToBroadcaster ex
-- > expect >>= liftIO . putStrLn
bindToBroadcaster :: Exchange -> Process ()
type BroadcastExchange = ExchangeType BroadcastEx
type HeaderName = String

-- | The binding key used by the built-in key and header based routers.
data Binding
BindKey :: !String -> Binding
bindingKey :: Binding -> !String
BindHeader :: !String -> !HeaderName -> Binding
bindingKey :: Binding -> !String
headerName :: Binding -> !HeaderName
BindNone :: Binding

-- | Things that can be used as binding keys in a router.
class (Hashable k, Eq k, Serializable k) => Bindable k

-- | Used to convert a Message into a Bindable routing key.
type BindingSelector k = Message -> Process k

-- | Given to a router to indicate whether clients should receive
-- Message payloads only, or the whole Message object
-- itself.
data RelayType
PayloadOnly :: RelayType
WholeMessage :: RelayType

-- | Defines a router exchange. The BindingSelector is used
-- to construct a binding (i.e., an instance of the Bindable type
-- k) for each incoming Message. Such bindings are
-- matched against bindings stored in the exchange. Clients of a
-- router exchange are identified by a binding, mapped to one or
-- more ProcessIds.
--
-- The format of the bindings, nature of their storage and mechanism for
-- submitting new bindings is implementation dependent (i.e., will vary
-- by exchange type). For example, the messageKeyRouter and
-- headerContentRouter implementations both use the Binding
-- data type, which can represent a Message key or a
-- HeaderName and content. As with all custom exchange types,
-- bindings should be submitted by evaluating configureExchange
-- with a suitable data type.
router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange

-- | Defines a router that can be used in a supervision tree.
supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange

-- | Send a Serializable message to the supplied Exchange.
-- The given datum will be converted to a Message, with the
-- key set to "" and the headers to [].
--
-- The routing behaviour will be dependent on the choice of
-- BindingSelector given when initialising the router.
route :: Serializable m => Exchange -> m -> Process ()

-- | Send a Message to the supplied Exchange. The routing
-- behaviour will be dependent on the choice of BindingSelector
-- given when initialising the router.
routeMessage :: Exchange -> Message -> Process ()

-- | A router that matches on a Message key. To bind a client
-- Process to such an exchange, use the bindKey function.
messageKeyRouter :: RelayType -> Process Exchange

-- | Add a binding (for the calling process) to a messageKeyRouter
-- exchange.
bindKey :: String -> Exchange -> Process ()

-- | A router that matches on a specific (named) header. To bind a client
-- Process to such an exchange, use the bindHeader
-- function.
headerContentRouter :: RelayType -> HeaderName -> Process Exchange

-- | Add a binding (for the calling process) to a
-- headerContentRouter exchange.
bindHeader :: HeaderName -> String -> Exchange -> Process ()

-- | Different exchange types are defined using record syntax. The
-- configureEx and routeEx API functions are called during
-- the exchange lifecycle when incoming traffic arrives. Configuration
-- messages are completely arbitrary types and the exchange type author
-- is entirely responsible for decoding them. Messages posted to the
-- exchange (see the Message data type) are passed to the
-- routeEx API function along with the exchange type's own
-- internal state. Both API functions return a new (potentially updated)
-- state and run in the Process monad.
data ExchangeType s
ExchangeType :: String -> s -> (s -> Message -> Process s) -> (s -> Message -> Process s) -> ExchangeType s
name :: ExchangeType s -> String
state :: ExchangeType s -> s
configureEx :: ExchangeType s -> s -> Message -> Process s
routeEx :: ExchangeType s -> s -> Message -> Process s

-- | Utility for custom exchange type authors - evaluates a set of
-- primitive message handlers from left to right, returning the first
-- which evaluates to Just a, or the initial value (the first
-- argument) if all the handlers yield Nothing.
applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a

-- | The EventManager is a parallel/concurrent event handling tool,
-- built on top of the Exchange API. Arbitrary events are
-- published to the event manager using notify, and are broadcast
-- simultaneously to a set of registered event handlers.
--
-- Event handlers are defined as Serializable m => s -> m ->
-- Process s, i.e., an expression taking an initial state, an
-- arbitrary Serializable event/message and performing an action
-- in the Process monad that evaluates to a new state.
--
-- See Control.Distributed.Process.Execution.Exchange.
module Control.Distributed.Process.Execution.EventManager

-- | Opaque handle to an Event Manager.
data EventManager

-- | Start a new Event Manager process and return an opaque handle
-- to it.
start :: Process EventManager
startSupervised :: SupervisorPid -> Process EventManager
startSupervisedRef :: SupervisorPid -> Process (ProcessId, Message)

-- | Broadcast an event to all registered handlers.
notify :: Serializable a => EventManager -> a -> Process ()

-- | Add a new event handler. The handler runs in its own process, which is
-- spawned locally on behalf of the caller.
addHandler :: Serializable a => EventManager -> (s -> a -> Process s) -> Process s -> Process ProcessId

-- | As addHandler, but operates over a raw
-- Control.Distributed.Process.Message.
addMessageHandler :: EventManager -> (s -> Message -> Process (Maybe s)) -> Process s -> Process ProcessId
instance Typeable EventManager
instance Generic EventManager
instance Datatype D1EventManager
instance Constructor C1_0EventManager
instance Selector S1_0_0EventManager
instance Resolvable EventManager
instance Binary EventManager

-- | Generic process that acts as an external mailbox and message buffer.
--
-- For use when rate limiting is not possible (or desired), this module
-- provides a buffer process that receives mail via its
-- post API, buffers the received messages and delivers them when
-- its owning process asks for them. A mailbox has to be started
-- with a maximum buffer size - the so-called limit - and will
-- discard messages once its internal storage reaches this user defined
-- threshold.
--
-- The usual behaviour of the buffer process is to accumulate
-- messages in its internal memory. When a client evaluates
-- notify, the buffer will send a NewMail message to the
-- (real) mailbox of its owning process as soon as it has any message(s)
-- ready to deliver. If the buffer already contains undelivered mail, the
-- NewMail message will be dispatched immediately.
--
-- When the owning process wishes to receive mail, evaluating
-- deliver (from any process) will cause the buffer to send its
-- owner a Delivery message containing the accumulated messages
-- and additional information about the number of messages it is
-- delivering, the number of messages dropped since the last delivery and
-- a handle for the mailbox (so that processes can have multiple
-- mailboxes if required, and distinguish between them).
--
-- A mailbox handles overflow - when the number of messages it is holding
-- reaches the limit - differently depending on the BufferType
-- selected when it starts. The Queue buffer will, once the
-- limit is reached, drop older messages first (i.e., the head of the
-- queue) to make space for newer ones.
-- The Ring buffer works
-- similarly, but blocks new messages so as to preserve existing ones
-- instead. Finally, the Stack buffer will drop the last (i.e.,
-- most recently received) message to make room for new mail.
--
-- Mailboxes can be resized by evaluating resize with a new
-- value for the limit. If the new limit is smaller than the
-- current/previous one, messages are dropped as though the mailbox had
-- previously seen a volume of mail equal to the difference (in size)
-- between the limits. In this situation, the Queue will drop as
-- many older messages as necessary to come within the limit, whilst the
-- other two buffer types will drop as many newer messages as needed.
--
-- When messages are delivered to the owner, they arrive as a list of raw
-- Message entries, given in descending age order (i.e., eldest
-- first). Whilst this approximates the FIFO ordering a process' mailbox
-- would usually offer, the Stack buffer will appear to offer no
-- ordering at all, since it always deletes the most recent message(s).
-- The Queue and Ring buffers will maintain a more
-- queue-like (i.e., FIFO) view of received messages, with the obvious
-- constraint that newer or older data might have been deleted.
--
-- For messages to be properly handled by the mailbox, they can either be
-- sent via the post API or directly to the Mailbox.
-- Messages sent directly to the mailbox will still be handled via the
-- internal buffers and subjected to the mailbox limits. The post
-- API is really just a means to ensure that the conversion from
-- Serializable a -> Message is done in the caller's process
-- and uses the safe wrapMessage variant.
--
-- This API is based on the work of Erlang programmers Fred Hebert and
-- Geoff Cant, its design closely mirroring that of the pobox
-- library application.
module Control.Distributed.Process.Execution.Mailbox

-- | Opaque handle to a mailbox.
data Mailbox

-- | Start a mailbox for the supplied ProcessId.
--
--   start = spawnLocal $ run
--   
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox

-- | As startMailbox, but suitable for use in supervisor child
-- specs. This variant is for use when you want access to the
-- underlying Mailbox handle in your supervised child refs. See
-- supervisor's ChildRef data type for more information.
--
-- Example:
--
-- > childSpec = toChildStart $ startSupervised pid bufferType mboxLimit
--
-- See Control.Distributed.Process.Supervisor
startSupervised :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process (ProcessId, Message)

-- | As startMailbox, but suitable for use in supervisor child
-- specs.
--
-- Example:
--
-- > childSpec = toChildStart $ startSupervisedMailbox pid bufferType mboxLimit
--
-- See Control.Distributed.Process.Supervisor
startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox

-- | Start a mailbox for the calling process.
--
--   create = getSelfPid >>= start
--   
createMailbox :: BufferType -> Limit -> Process Mailbox

-- | Alters the mailbox's limit - this might cause messages to be
-- dropped!
resize :: Mailbox -> Integer -> Process ()

-- | Obtain statistics (from/to anywhere) about a mailbox.
statistics :: Mailbox -> Process MailboxStats

-- | Monitor a mailbox.
monitor :: Mailbox -> Process MonitorRef

-- | Represents the maximum number of messages the internal buffer can
-- hold.
type Limit = Integer

-- | Describes the different types of buffer.
data BufferType

-- | FIFO buffer, limiter drops the eldest message (queue head)
Queue :: BufferType

-- | unordered buffer, limiter drops the newest (top) message
Stack :: BufferType

-- | FIFO buffer, limiter refuses (i.e., drops) new messages
Ring :: BufferType

-- | Bundle of statistics data, available on request via the
-- statistics API call.
data MailboxStats
MailboxStats :: Integer -> Integer -> Limit -> ProcessId -> MailboxStats
pendingMessages :: MailboxStats -> Integer
droppedMessages :: MailboxStats -> Integer
currentLimit :: MailboxStats -> Limit
owningProcess :: MailboxStats -> ProcessId

-- | Posts a message to someone's mailbox.
post :: Serializable a => Mailbox -> a -> Process ()

-- | Instructs the mailbox to send a NewMail signal as soon as any
-- mail is available for delivery. Once the signal is sent, it will not
-- be resent, even when further mail arrives, until notify is
-- called again.
--
-- NB: signals are only delivered to the mailbox's owning process.
notify :: Mailbox -> Process ()

-- | Instructs the mailbox to deliver all pending messages to the owner.
deliver :: Mailbox -> Process ()

-- | Instructs the mailbox to send a Delivery as soon as any mail is
-- available, or immediately (if the buffer already contains data).
--
-- NB: signals are only delivered to the mailbox's owning process.
active :: Mailbox -> Filter -> Process ()

-- | Marker message indicating to the owning process that mail has arrived.
data NewMail
NewMail :: !Mailbox -> !Integer -> NewMail

-- | Mail delivery.
data Delivery
Delivery :: Mailbox -> [Message] -> Integer -> Integer -> Delivery

-- | handle to the sending mailbox
box :: Delivery -> Mailbox

-- | list of raw messages
messages :: Delivery -> [Message]

-- | number of messages delivered
count :: Delivery -> Integer

-- | total dropped/skipped messages
totalDropped :: Delivery -> Integer
data FilterResult
Keep :: FilterResult
Skip :: FilterResult
Send :: FilterResult

-- | A do-nothing filter that accepts all messages (i.e., returns
-- Keep for any input).
acceptEverything :: Closure (Message -> Process FilterResult)

-- | A filter that takes a Closure (Message -> Process
-- FilterResult) holding the filter function and applies it remotely
-- (i.e., in the mailbox's own managed process).
acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)
__remoteTable :: RemoteTable -> RemoteTable
instance Typeable BufferType
instance Typeable MailboxStats
instance Typeable Post
instance Typeable StatsReq
instance Typeable FilterResult
instance Typeable Mode
instance Typeable ControlMessage
instance Typeable Mailbox
instance Typeable Delivery
instance Typeable NewMail
instance Eq BufferType
instance Show BufferType
instance Generic MailboxStats
instance Show MailboxStats
instance Generic Post
instance Generic StatsReq
instance Generic FilterResult
instance Generic Mode
instance Generic ControlMessage
instance Generic Mailbox
instance Eq Mailbox
instance Generic Delivery
instance Generic NewMail
instance Show NewMail
instance Datatype D1MailboxStats
instance Constructor C1_0MailboxStats
instance Selector S1_0_0MailboxStats
instance Selector S1_0_1MailboxStats
instance Selector S1_0_2MailboxStats
instance Selector S1_0_3MailboxStats
instance Datatype D1Post
instance Constructor C1_0Post
instance Datatype D1StatsReq
instance Constructor C1_0StatsReq
instance Datatype D1FilterResult
instance Constructor C1_0FilterResult
instance Constructor C1_1FilterResult
instance Constructor C1_2FilterResult
instance Datatype D1Mode
instance Constructor C1_0Mode
instance Constructor C1_1Mode
instance Constructor C1_2Mode
instance Datatype D1ControlMessage
instance Constructor C1_0ControlMessage
instance Constructor C1_1ControlMessage
instance Datatype D1Mailbox
instance Constructor C1_0Mailbox
instance Selector S1_0_0Mailbox
instance Selector S1_0_1Mailbox
instance Datatype D1Delivery
instance Constructor C1_0Delivery
instance Selector S1_0_0Delivery
instance Selector S1_0_1Delivery
instance Selector S1_0_2Delivery
instance Selector S1_0_3Delivery
instance Datatype D1NewMail
instance Constructor C1_0NewMail
instance Buffered State
instance Binary ControlMessage
instance Show Mode
instance Binary Mode
instance Binary FilterResult
instance Binary StatsReq
instance Binary Post
instance Binary MailboxStats
instance Binary Delivery
instance Binary NewMail
instance Routable Mailbox
instance Resolvable Mailbox
instance Linkable Mailbox
instance Show Mailbox
instance Binary Mailbox

-- | The Execution Framework provides tools for load regulation,
-- workload shedding and remote hand-off. The current implementation
-- provides only a subset of the plumbing required, comprising tools for
-- event management, mailbox buffering and message routing.
module Control.Distributed.Process.Execution
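
-- The overflow and resize rules documented for
-- Control.Distributed.Process.Execution.Mailbox can be modelled in plain
-- Haskell using only base. This is a sketch under stated assumptions, not
-- the library's implementation: the Buffer record, emptyBuffer, push and
-- shrink are hypothetical names, BufferType is a local stand-in that only
-- mirrors the documented constructor names, the buffer is a plain list held
-- oldest first, and a limit of zero or less is assumed to reject all mail.

```haskell
module Main where

import Data.List (foldl')

-- Local stand-in mirroring the documented constructor names; this is not
-- the library's own BufferType.
data BufferType = Queue | Stack | Ring
  deriving (Eq, Show)

-- Hypothetical model of a mailbox buffer: messages oldest first, plus a
-- running count of dropped messages.
data Buffer a = Buffer
  { contents :: [a]
  , dropped  :: Int
  } deriving (Eq, Show)

emptyBuffer :: Buffer a
emptyBuffer = Buffer [] 0

-- Add one message, applying the overflow rule once the limit is reached.
push :: BufferType -> Int -> Buffer a -> a -> Buffer a
push bt limit (Buffer xs d) x
  | limit <= 0        = Buffer xs (d + 1)         -- assumption: nothing fits
  | length xs < limit = Buffer (xs ++ [x]) d      -- room left, just append
  | otherwise = case bt of
      Queue -> Buffer (drop 1 xs ++ [x]) (d + 1)  -- drop the eldest message
      Stack -> Buffer (init xs ++ [x]) (d + 1)    -- drop the newest stored message
      Ring  -> Buffer xs (d + 1)                  -- refuse the incoming message

-- Shrink to a smaller limit: Queue sheds older mail, the other two shed
-- newer mail. A limit that is not smaller leaves the buffer unchanged.
shrink :: BufferType -> Int -> Buffer a -> Buffer a
shrink bt newLimit (Buffer xs d)
  | excess <= 0 = Buffer xs d
  | otherwise   = case bt of
      Queue -> Buffer (drop excess xs) (d + excess)
      _     -> Buffer (take keep xs) (d + excess)
  where
    keep   = max 0 newLimit
    excess = length xs - keep

main :: IO ()
main = do
  -- Post five messages into a buffer whose limit is three.
  let fill bt = foldl' (push bt 3) emptyBuffer [1 .. 5 :: Int]
  mapM_ (print . fill) [Queue, Stack, Ring]
  -- Shrinking a full Queue model to a limit of two sheds its eldest entry.
  print (shrink Queue 2 (fill Queue))
```

-- With a limit of 3 and messages 1 to 5, the Queue model ends holding
-- [3,4,5], the Stack model [1,2,5] and the Ring model [1,2,3], each having
-- dropped two messages. The real mailbox applies these rules inside its own
-- process and reports the dropped count through Delivery and MailboxStats.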