-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Execution Framework for The Cloud Haskell Application Platform
--
-- The Execution Framework provides tools for load regulation, workload
-- shedding and remote hand-off. The current implementation provides
-- only a subset of the plumbing required, comprising tools for event
-- management, mailbox buffering and message routing.
@package distributed-process-execution
@version 0.1.0
-- |
--
-- The concept of a message exchange is borrowed from the world of
-- messaging and enterprise integration. The exchange acts like a
-- kind of mailbox, accepting inputs from producers and forwarding
-- these messages to one or more consumers, depending on the
-- implementation's semantics.
--
-- This module provides some basic types of message exchange and exposes
-- an API for defining your own custom exchange types.
--
--
-- - Broadcast Exchanges
--
--
-- The broadcast exchange type, started via broadcastExchange,
-- forwards its inputs to all registered consumers (as the name
-- suggests). This exchange type is highly optimised for local
-- (intra-node) traffic and provides two different kinds of client
-- binding, one which causes messages to be delivered directly to the
-- client's mailbox (viz bindToBroadcaster), the other providing a
-- separate stream of messages that can be obtained using the
-- expect and receiveX family of messaging primitives
-- (and thus composed with other forms of input selection, such as typed
-- channels and selective reads on the process mailbox).
--
-- Important: When a ProcessId is registered via
-- bindToBroadcaster, only the payload of the Message
-- (i.e., the underlying Serializable datum) is forwarded to the
-- consumer, not the whole Message itself.
--
--
--
-- The router API provides a means to selectively route messages
-- to one or more clients, depending on the content of the
-- Message. Two modes of binding (and client selection) are
-- provided out of the box, one of which matches the message key,
-- the second of which matches on a name and value from the
-- headers. Alternative mechanisms for content based routing can
-- be derived by modifying the BindingSelector expression passed
-- to router.
--
-- See messageKeyRouter and headerContentRouter for the
-- built-in routing exchanges, and router for the extensible
-- routing API.
--
--
-- - Custom Exchange Types
--
--
-- Both the broadcast and router exchanges are implemented
-- as custom exchange types. The mechanism for defining custom
-- exchange behaviours such as these is very simple. Raw exchanges are
-- started by evaluating startExchange with a specific
-- ExchangeType record. This type is parameterised by the internal
-- state it holds, and defines two API callbacks in its
-- configureEx and routeEx fields. The former is evaluated
-- whenever a client process evaluates configureExchange, the
-- latter whenever a client evaluates post or postMessage.
-- The configureEx callback takes a raw Message (from
-- Control.Distributed.Process) and is responsible for decoding
-- the message and updating its own state (if required). It is via this
-- callback that custom exchange types can receive information about
-- clients and handle it in their own way. The routeEx callback is
-- evaluated with the exchange type's own internal state and the
-- Message originally sent to the exchange process (via
-- post) and is responsible for delivering the message to its
-- clients in whatever way makes sense for that exchange type.
module Control.Distributed.Process.Execution.Exchange
-- | Opaque handle to an exchange.
data Exchange
-- | Messages sent to an exchange can optionally provide a routing key and
-- a list of (key, value) headers in addition to the underlying payload.
data Message
Message :: !String -> ![(String, String)] -> !Message -> Message
-- | a routing key for the payload
key :: Message -> !String
-- | arbitrary key-value headers
headers :: Message -> ![(String, String)]
-- | the underlying Message payload
payload :: Message -> !Message
-- | Starts an exchange process with the given ExchangeType.
startExchange :: ExchangeType s -> Process Exchange
-- | Starts an exchange as part of a supervision tree.
--
-- Example: > childSpec = toChildStart $ startSupervised exType
startSupervised :: ExchangeType s -> SupervisorPid -> Process Exchange
-- | Starts an exchange as part of a supervision tree.
--
-- Example: > childSpec = toChildStart $ startSupervisedRef exType
startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
-- | Posts an arbitrary Serializable datum to an exchange.
-- The raw datum is wrapped in the Message data type, with its
-- key set to "" and its headers to [].
post :: Serializable a => Exchange -> a -> Process ()
-- | Posts a Message to an exchange.
postMessage :: Exchange -> Message -> Process ()
-- | Sends an arbitrary Serializable datum to an exchange,
-- for use as a configuration change - see configureEx for
-- details.
configureExchange :: Serializable m => Exchange -> m -> Process ()
-- | Utility for creating a Message datum from its key,
-- headers and payload.
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
-- | Start a new broadcast exchange and return a handle to the
-- exchange.
broadcastExchange :: Process Exchange
-- | The ExchangeType of a broadcast exchange. Can be combined with
-- the startSupervisedRef and startSupervised APIs.
broadcastExchangeT :: Process BroadcastExchange
-- | Create a binding to the given broadcast exchange for the
-- calling process and return an InputStream that can be used in
-- the expect and receiveWait family of messaging
-- primitives. This form of client interaction helps avoid cluttering the
-- caller's mailbox with Message data, since the
-- InputStream provides a separate input stream (in a similar
-- fashion to a typed channel). Example:
--
--
-- is <- broadcastClient ex
-- msg <- receiveWait [ matchInputStream is ]
-- handleMessage (payload msg)
--
broadcastClient :: Exchange -> Process (InputStream Message)
-- | Bind the calling process to the given broadcast exchange. For
-- each Message the exchange receives, only the payload will be
-- sent to the calling process' mailbox.
--
-- Example:
--
-- (producer) > post ex Hello
--
-- (consumer) > bindToBroadcaster ex > expect >>= liftIO .
-- putStrLn
bindToBroadcaster :: Exchange -> Process ()
type BroadcastExchange = ExchangeType BroadcastEx
type HeaderName = String
-- | The binding key used by the built-in key and header based routers.
data Binding
BindKey :: !String -> Binding
bindingKey :: Binding -> !String
BindHeader :: !String -> !HeaderName -> Binding
bindingKey :: Binding -> !String
headerName :: Binding -> !HeaderName
BindNone :: Binding
-- | Things that can be used as binding keys in a router.
class (Hashable k, Eq k, Serializable k) => Bindable k
-- | Used to convert a Message into a Bindable routing key.
type BindingSelector k = Message -> Process k
-- | Given to a router to indicate whether clients should receive
-- Message payloads only, or the whole Message object
-- itself.
data RelayType
PayloadOnly :: RelayType
WholeMessage :: RelayType
-- | Defines a router exchange. The BindingSelector is used
-- to construct a binding (i.e., an instance of the Bindable type
-- k) for each incoming Message. Such bindings are
-- matched against bindings stored in the exchange. Clients of a
-- router exchange are identified by a binding, mapped to one or
-- more ProcessIds.
--
-- The format of the bindings, nature of their storage and mechanism for
-- submitting new bindings is implementation dependent (i.e., will vary
-- by exchange type). For example, the messageKeyRouter and
-- headerContentRouter implementations both use the Binding
-- data type, which can represent a Message key or a
-- HeaderName and content. As with all custom exchange types,
-- bindings should be submitted by evaluating configureExchange
-- with a suitable data type.
router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
-- | Defines a router that can be used in a supervision tree.
supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
-- | Send a Serializable message to the supplied Exchange.
-- The given datum will be converted to a Message, with the
-- key set to "" and the headers to [].
--
-- The routing behaviour will be dependent on the choice of
-- BindingSelector given when initialising the router.
route :: Serializable m => Exchange -> m -> Process ()
-- | Send a Message to the supplied Exchange. The routing
-- behaviour will be dependent on the choice of BindingSelector
-- given when initialising the router.
routeMessage :: Exchange -> Message -> Process ()
-- | A router that matches on a Message key. To bind a client
-- Process to such an exchange, use the bindKey function.
messageKeyRouter :: RelayType -> Process Exchange
-- | Add a binding (for the calling process) to a messageKeyRouter
-- exchange.
bindKey :: String -> Exchange -> Process ()
-- | A router that matches on a specific (named) header. To bind a client
-- Process to such an exchange, use the bindHeader
-- function.
headerContentRouter :: RelayType -> HeaderName -> Process Exchange
-- | Add a binding (for the calling process) to a
-- headerContentRouter exchange.
bindHeader :: HeaderName -> String -> Exchange -> Process ()
-- | Different exchange types are defined using record syntax. The
-- configureEx and routeEx API functions are called during
-- the exchange lifecycle when incoming traffic arrives. Configuration
-- messages are completely arbitrary types and the exchange type author
-- is entirely responsible for decoding them. Messages posted to the
-- exchange (see the Message data type) are passed to the
-- routeEx API function along with the exchange type's own
-- internal state. Both API functions return a new (potentially updated)
-- state and run in the Process monad.
data ExchangeType s
ExchangeType :: String -> s -> (s -> Message -> Process s) -> (s -> Message -> Process s) -> ExchangeType s
name :: ExchangeType s -> String
state :: ExchangeType s -> s
configureEx :: ExchangeType s -> s -> Message -> Process s
routeEx :: ExchangeType s -> s -> Message -> Process s
-- | Utility for custom exchange type authors - evaluates a set of
-- primitive message handlers from left to right, returning the first
-- which evaluates to Just a, or the initial e value if
-- all the handlers yield Nothing.
applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a
-- |
--
-- The EventManager is a parallel/concurrent event handling tool,
-- built on top of the Exchange API. Arbitrary events are
-- published to the event manager using notify, and are broadcast
-- simultaneously to a set of registered event handlers.
--
--
-- - Defining and Registering Event Handlers
--
--
-- Event handlers are defined as Serializable m => s -> m ->
-- Process s, i.e., an expression taking an initial state, an
-- arbitrary Serializable event/message and performing an action
-- in the Process monad that evaluates to a new state.
--
-- See Control.Distributed.Process.Execution.Exchange.
module Control.Distributed.Process.Execution.EventManager
-- | Opaque handle to an Event Manager.
data EventManager
-- | Start a new Event Manager process and return an opaque handle
-- to it.
start :: Process EventManager
startSupervised :: SupervisorPid -> Process EventManager
startSupervisedRef :: SupervisorPid -> Process (ProcessId, Message)
-- | Broadcast an event to all registered handlers.
notify :: Serializable a => EventManager -> a -> Process ()
-- | Add a new event handler. The handler runs in its own process, which is
-- spawned locally on behalf of the caller.
addHandler :: Serializable a => EventManager -> (s -> a -> Process s) -> Process s -> Process ProcessId
-- | As addHandler, but operates over a raw
-- Control.Distributed.Process.Message.
addMessageHandler :: EventManager -> (s -> Message -> Process (Maybe s)) -> Process s -> Process ProcessId
instance Typeable EventManager
instance Generic EventManager
instance Datatype D1EventManager
instance Constructor C1_0EventManager
instance Selector S1_0_0EventManager
instance Resolvable EventManager
instance Binary EventManager
-- | Generic process that acts as an external mailbox and message buffer.
--
--
--
-- For use when rate limiting is not possible (or desired), this module
-- provides a buffer process that receives mail via its
-- post API, buffers the received messages and delivers them when
-- its owning process asks for them. A mailbox has to be started
-- with a maximum buffer size - the so called limit - and will
-- discard messages once its internal storage reaches this user defined
-- threshold.
--
-- The usual behaviour of the buffer process is to accumulate
-- messages in its internal memory. When a client evaluates
-- notify, the buffer will send a NewMail message to the
-- (real) mailbox of its owning process as soon as it has any message(s)
-- ready to deliver. If the buffer already contains undelivered mail, the
-- NewMail message will be dispatched immediately.
--
-- When the owning process wishes to receive mail, evaluating
-- deliver (from any process) will cause the buffer to send its
-- owner a Delivery message containing the accumulated messages
-- and additional information about the number of messages it is
-- delivering, the number of messages dropped since the last delivery and
-- a handle for the mailbox (so that processes can have multiple
-- mailboxes if required, and distinguish between them).
--
--
--
-- A mailbox handles overflow - when the number of messages it is holding
-- reaches the limit - differently depending on the BufferType
-- selected when it starts. The Queue buffer will, once the
-- limit is reached, drop older messages first (i.e., the head of the
-- queue) to make space for newer ones. The Ring buffer works
-- similarly, but blocks new messages so as to preserve existing ones
-- instead. Finally, the Stack buffer will drop the last (i.e.,
-- most recently received) message to make room for new mail.
--
-- Mailboxes can be resized by evaluating resize with a new
-- value for the limit. If the new limit is smaller than the
-- current/previous one, messages are dropped as though the mailbox had
-- previously seen a volume of mail equal to the difference (in size)
-- between the limits. In this situation, the Queue will drop as
-- many older messages as necessary to come within the limit, whilst the
-- other two buffer types will drop as many newer messages as needed.
--
--
-- - Ordering Guarantees
--
--
-- When messages are delivered to the owner, they arrive as a list of raw
-- Message entries, given in descending age order (i.e., eldest
-- first). Whilst this approximates the FIFO ordering a process' mailbox
-- would usually offer, the Stack buffer will appear to offer no
-- ordering at all, since it always deletes the most recent message(s).
-- The Queue and Ring buffers will maintain a more
-- queue-like (i.e., FIFO) view of received messages, with the obvious
-- constraint that newer or older data might have been deleted.
--
--
-- - Post API and Relaying
--
--
-- For messages to be properly handled by the mailbox, they can either be
-- sent via the post API or directly to the Mailbox.
-- Messages sent directly to the mailbox will still be handled via the
-- internal buffers and subjected to the mailbox limits. The post
-- API is really just a means to ensure that the conversion from
-- Serializable a -> Message is done in the caller's process
-- and uses the safe wrapMessage variant.
--
--
--
-- This API is based on the work of Erlang programmers Fred Hebert and
-- Geoff Cant, its design closely mirroring that of the pobox
-- library application.
module Control.Distributed.Process.Execution.Mailbox
-- | Opaque handle to a mailbox.
data Mailbox
-- | Start a mailbox for the supplied ProcessId.
--
--
-- start = spawnLocal $ run
--
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
-- | As startMailbox, but suitable for use in supervisor child
-- specs. This variant is for use when you want access to the
-- underlying Mailbox handle in your supervised child refs. See
-- supervisor's ChildRef data type for more information.
--
-- Example: > childSpec = toChildStart $ startSupervised pid
-- bufferType mboxLimit
--
-- See Control.Distributed.Process.Supervisor
startSupervised :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process (ProcessId, Message)
-- | As startMailbox, but suitable for use in supervisor child
-- specs.
--
-- Example: > childSpec = toChildStart $ startSupervisedMailbox pid
-- bufferType mboxLimit
--
-- See Control.Distributed.Process.Supervisor
startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox
-- | Start a mailbox for the calling process.
--
--
-- create = getSelfPid >>= start
--
createMailbox :: BufferType -> Limit -> Process Mailbox
-- | Alters the mailbox's limit - this might cause messages to be
-- dropped!
resize :: Mailbox -> Integer -> Process ()
-- | Obtain statistics (from/to anywhere) about a mailbox.
statistics :: Mailbox -> Process MailboxStats
-- | Monitor a mailbox.
monitor :: Mailbox -> Process MonitorRef
-- | Represents the maximum number of messages the internal buffer can
-- hold.
type Limit = Integer
-- | Describes the different types of buffer.
data BufferType
-- | FIFO buffer, limiter drops the eldest message (queue head)
Queue :: BufferType
-- | unordered buffer, limiter drops the newest (top) message
Stack :: BufferType
-- | FIFO buffer, limiter refuses (i.e., drops) new messages
Ring :: BufferType
-- | Bundle of statistics data, available on request via the
-- statistics API call.
data MailboxStats
MailboxStats :: Integer -> Integer -> Limit -> ProcessId -> MailboxStats
pendingMessages :: MailboxStats -> Integer
droppedMessages :: MailboxStats -> Integer
currentLimit :: MailboxStats -> Limit
owningProcess :: MailboxStats -> ProcessId
-- | Posts a message to someone's mailbox.
post :: Serializable a => Mailbox -> a -> Process ()
-- | Instructs the mailbox to send a NewMail signal as soon as any
-- mail is available for delivery. Once the signal is sent, it will not
-- be resent, even when further mail arrives, until notify is
-- called again.
--
-- NB: signals are only delivered to the mailbox's owning process.
notify :: Mailbox -> Process ()
-- | Instructs the mailbox to deliver all pending messages to the owner.
deliver :: Mailbox -> Process ()
-- | Instructs the mailbox to send a Delivery as soon as any mail is
-- available, or immediately (if the buffer already contains data).
--
-- NB: signals are only delivered to the mailbox's owning process.
active :: Mailbox -> Filter -> Process ()
-- | Marker message indicating to the owning process that mail has arrived.
data NewMail
NewMail :: !Mailbox -> !Integer -> NewMail
-- | Mail delivery.
data Delivery
Delivery :: Mailbox -> [Message] -> Integer -> Integer -> Delivery
-- | handle to the sending mailbox
box :: Delivery -> Mailbox
-- | list of raw messages
messages :: Delivery -> [Message]
-- | number of messages delivered
count :: Delivery -> Integer
-- | total dropped/skipped messages
totalDropped :: Delivery -> Integer
data FilterResult
Keep :: FilterResult
Skip :: FilterResult
Send :: FilterResult
-- | A do-nothing filter that accepts all messages (i.e., returns
-- Keep for any input).
acceptEverything :: Closure (Message -> Process FilterResult)
-- | A filter that takes a Closure (Message -> Process
-- FilterResult) holding the filter function and applies it remotely
-- (i.e., in the mailbox's own managed process).
acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)
__remoteTable :: RemoteTable -> RemoteTable
instance Typeable BufferType
instance Typeable MailboxStats
instance Typeable Post
instance Typeable StatsReq
instance Typeable FilterResult
instance Typeable Mode
instance Typeable ControlMessage
instance Typeable Mailbox
instance Typeable Delivery
instance Typeable NewMail
instance Eq BufferType
instance Show BufferType
instance Generic MailboxStats
instance Show MailboxStats
instance Generic Post
instance Generic StatsReq
instance Generic FilterResult
instance Generic Mode
instance Generic ControlMessage
instance Generic Mailbox
instance Eq Mailbox
instance Generic Delivery
instance Generic NewMail
instance Show NewMail
instance Datatype D1MailboxStats
instance Constructor C1_0MailboxStats
instance Selector S1_0_0MailboxStats
instance Selector S1_0_1MailboxStats
instance Selector S1_0_2MailboxStats
instance Selector S1_0_3MailboxStats
instance Datatype D1Post
instance Constructor C1_0Post
instance Datatype D1StatsReq
instance Constructor C1_0StatsReq
instance Datatype D1FilterResult
instance Constructor C1_0FilterResult
instance Constructor C1_1FilterResult
instance Constructor C1_2FilterResult
instance Datatype D1Mode
instance Constructor C1_0Mode
instance Constructor C1_1Mode
instance Constructor C1_2Mode
instance Datatype D1ControlMessage
instance Constructor C1_0ControlMessage
instance Constructor C1_1ControlMessage
instance Datatype D1Mailbox
instance Constructor C1_0Mailbox
instance Selector S1_0_0Mailbox
instance Selector S1_0_1Mailbox
instance Datatype D1Delivery
instance Constructor C1_0Delivery
instance Selector S1_0_0Delivery
instance Selector S1_0_1Delivery
instance Selector S1_0_2Delivery
instance Selector S1_0_3Delivery
instance Datatype D1NewMail
instance Constructor C1_0NewMail
instance Buffered State
instance Binary ControlMessage
instance Show Mode
instance Binary Mode
instance Binary FilterResult
instance Binary StatsReq
instance Binary Post
instance Binary MailboxStats
instance Binary Delivery
instance Binary NewMail
instance Routable Mailbox
instance Resolvable Mailbox
instance Linkable Mailbox
instance Show Mailbox
instance Binary Mailbox
-- |
-- - Inter-Process Traffic Management
--
--
-- The Execution Framework provides tools for load regulation,
-- workload shedding and remote hand-off. The current implementation
-- provides only a subset of the plumbing required, comprising tools for
-- event management, mailbox buffering and message routing.
module Control.Distributed.Process.Execution