Copyright | (c) Well-Typed / Tim Watson |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <watson.timothy@gmail.com> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
- Management Extensions API
This module presents an API for creating Management Agents: special processes that are capable of receiving and responding to a node's internal system events. These system events are delivered by the management event bus: An internal subsystem maintained for each running node, to which all agents are automatically subscribed.
Agents are defined in terms of event sinks, taking a particular
Serializable
type and evaluating to an action in the MxAgent
monad in
response. Each MxSink
evaluates to an MxAction
that specifies whether
the agent should continue processing it's inputs or stop. If the type of a
message cannot be matched to any of the agent's sinks, it will be discarded.
A sink can also deliberately skip processing a message, deferring to the
remaining handlers. This is the only way that more than one event sink
can handle the same data type, since otherwise the first type match will
win every time a message arrives. See mxSkip
for details.
Various events are published to the management event bus automatically,
the full list of which can be found in the definition of the MxEvent
data
type. Additionally, clients of the Management API can publish arbitrary
Serializable
data to the event bus using mxNotify
. All running agents
receive all events (from the primary event bus to which they're subscribed).
Agent processes are automatically registered on the local node, and can
receive messages via their mailbox just like ordinary processes. Unlike
ordinary Process
code however, it is unnecessary (though possible) for
agents to use the base expect
and receiveX
primitives to do this, since
the management infrastructure will continuously read from both the primary
event bus and the process' own mailbox. Messages are transparently passed
to the agent's event sinks from both sources, so an agent need only concern
itself with how to respond to its inputs.
Some agents may wish to prioritise messages from their mailbox over traffic
on the management event bus, or vice versa. The mxReceive
and
mxReceiveChan
API calls do this for the mailbox and event bus,
respectively. The prioritisation these APIs offer is simply that the chosen
data stream will be checked first. No blocking will occur if the chosen
(prioritised) source is devoid of input messages, instead the agent handling
code will revert to switching between the alternatives in round-robin as
usual. If messages exist in one or more channels, they will be consumed as
soon as they're available, priority is effectively a hint about which
channel to consume from, should messages be available in both.
Prioritisation then, is a hint about the preference of data source from which the next input should be chosen. No guarantee can be made that the chosen source will in fact be selected at runtime.
- Management API Semantics
The management API provides no guarantees whatsoever, viz:
- The ordering of messages delivered to the event bus.
- The order in which agents will be executed.
- Whether messages will be taken from the mailbox first, or the event bus.
Since the event bus uses STM broadcast channels to communicate with agents, no message written to the bus successfully can be lost.
Agents can also receive messages via their mailboxes - these are subject to the same guarantees as all inter-process message sending.
Messages dispatched on an STM broadcast channel (i.e., management event bus) are guaranteed to be delivered with the same FIFO ordering guarantees that exist between two communicating processes, such that communication from the node controller's threads (i.e., MxEvent's) will never be re-ordered, but messages dispatched to the event bus by other processes (including, but not limited to agents) are only guaranteed to be ordered between one sender and one receiver.
No guarantee exists for the ordering in which messages sent to an agent's mailbox will be delivered, vs messages dispatched via the event bus.
Because of the above, there are no ordering guarantees for messages sent between agents, or for processes to agents, except for those that apply to messages sent between regular processes, since agents are implemented as such.
The event bus is serial and single threaded. Anything that is published by the node controller will be seen in FIFO order. There are no ordering guarantees pertaining to entries published to the event bus by other processes or agents.
It should not be possible to see, for example, an MxReceived
before the
corresponding MxSent
event, since the places where we issue the MxSent
write directly to the event bus (using STM) in the calling (green) thread,
before dispatching instructions to the node controller to perform the
necessary routing to deliver the message to a process (or registered name,
or typed channel) locally or remotely.
- Management Data API
Both management agents and clients of the API have access to a variety of
data storage capabilities, to facilitate publishing and consuming useful
system information. Agents maintain their own internal state privately (via a
state transformer - see mxGetLocal
et al), however it is possible for
agents to share additional data with each other (and the outside world)
using whatever mechanism the user wishes, e.g., acidstate, or shared memory
primitives.
- Defining Agents
New agents are defined with mxAgent
and require a unique MxAgentId
, an
initial state - MxAgent
runs in a state transformer - and a list of the
agent's event sinks. Each MxSink
is defined in terms of a specific
Serializable
type, via the mxSink
function, binding the event handler
expression to inputs of only that type.
Apart from modifying its own local state, an agent can execute arbitrary
Process a
code via lifting (see liftMX
) and even publish its own messages
back to the primary event bus (see mxBroadcast
).
Since messages are delivered to agents from both the management event bus and
the agent processes mailbox, agents (i.e., event sinks) will generally have
no idea as to their origin. An agent can, however, choose to prioritise the
choice of input (source) each time one of its event sinks runs. The standard
way for an event sink to indicate that the agent is ready for its next input
is to evaluate mxReady
. When this happens, the management infrastructure
will obtain data from the event bus and process' mailbox in a round robbin
fashion, i.e., one after the other, changing each time.
- Example Code
What follows is a grossly over-simplified example of a management agent that provides a basic name monitoring facility. Whenever a process name is registered or unregistered, clients are informed of the fact.
-- simple notification data type data Registration = Reg { added :: Bool , procId :: ProcessId , name :: String } -- start a /name monitoring agent/ nameMonitorAgent = do mxAgent (MxAgentId "name-monitor") Set.empty [ (mxSink $ \(pid :: ProcessId) -> do mxUpdateState $ Set.insert pid mxReady) , (mxSink $ let act = case ev of (MxRegistered p n) -> notify True n p (MxUnRegistered p n) -> notify False n p _ -> return () act >> mxReady) ] where notify a n p = do Foldable.mapM_ (liftMX . deliver (Reg a n p)) =<< mxGetLocal
The client interface (for sending their pid) can take one of two forms:
monitorNames = getSelfPid >>= nsend "name-monitor" monitorNames2 = getSelfPid >>= mxNotify
- Performance, Stablity and Scalability
Management Agents offer numerous advantages over regular processes: broadcast communication with them can have a lower latency, they offer simplified messgage (i.e., input type) handling and they have access to internal system information that would be otherwise unobtainable.
Do not be tempted to implement everything (e.g., the kitchen sink) using the management API though. There are overheads associated with management agents which is why they're presented as tools for consuming low level system information, instead of as application level development tools.
Agents that rely heavily on a busy mailbox can cause the management event bus to backlog un-GC'ed data, leading to increased heap space. Producers that do not take care to avoid passing unevaluated thunks to the API can crash all the agents in the system. Agents are not monitored or managed in any way, and those that crash will not be restarted.
The management event bus can receive a great deal of traffic. Every time a message is sent and/or received, an event is passed to the agent controller and broadcast to all agents (plus the trace controller, if tracing is enabled for the node). This is already a significant overhead - though profiling and benchmarks have demonstrated that it does not adversely affect performance if few agents are installed. Agents will typically use more cycles than plain processes, since they perform additional work: selecting input data from both the event bus and their own mailboxes, plus searching through the set of event sinks (for each agent) to determine the right handler for the event.
- Architecture Overview
The architecture of the management event bus is internal and subject to change without prior notice. The description that follows is provided for informational purposes only.
When a node initially starts, two special, internal system processes are
started to support the management infrastructure. The first, known as the
trace controller, is responsible for consuming MxEvent
s and forwarding
them to the configured tracer - see Control.Distributed.Process.Debug for
further details. The second is the management agent controller, and is the
primary worker process underpinning the management infrastructure. All
published management events are routed to this process, which places them
onto a system wide event bus and additionally passes them directly to the
trace controller.
There are several reasons for segregating the tracing and management control
planes in this fashion. Tracing can be enabled or disabled by clients, whilst
the management event bus cannot, since in addition to providing
runtime instrumentation, its intended use-cases include node monitoring, peer
discovery (via topology providing backends) and other essential system
services that require knowledge of otherwise hidden system internals. Tracing
is also subject to trace flags that limit the specific MxEvent
s delivered
to trace clients - an overhead/complexity not shared by management agents.
Finally, tracing and management agents are implemented using completely
different signalling techniques - more on this later - which would introduce
considerable complexity if the shared the same event loop.
The management control plane is driven by a shared broadcast channel, which is written to by the agent controller and subscribed to by all agent processes. Agents are spawned as regular processes, whose primary implementation (i.e., server loop) is responsible for consuming messages from both the broadcast channel and their own mailbox. Once consumed, messages are applied to the agent's event sinks until one matches the input, at which point it is applied and the loop continues. The implementation chooses from the event bus and the mailbox in a round-robin fashion, until a message is received. This polling activity would lead to management agents consuming considerable system resources if left unchecked, therefore the implementation will poll for a limitted number of retries, after which it will perform a blocking read on the event bus.
Synopsis
- data MxEvent
- = MxSpawned ProcessId
- | MxRegistered ProcessId String
- | MxUnRegistered ProcessId String
- | MxProcessDied ProcessId DiedReason
- | MxNodeDied NodeId DiedReason
- | MxSent ProcessId ProcessId Message
- | MxSentToName String ProcessId Message
- | MxSentToPort ProcessId SendPortId Message
- | MxReceived ProcessId Message
- | MxReceivedPort SendPortId Message
- | MxConnected ConnectionId EndPointAddress
- | MxDisconnected ConnectionId EndPointAddress
- | MxUser Message
- | MxLog String
- | MxTraceTakeover ProcessId
- | MxTraceDisable
- mxNotify :: Serializable a => a -> Process ()
- data MxAction
- newtype MxAgentId = MxAgentId {}
- data MxAgent s a
- mxAgent :: MxAgentId -> s -> [MxSink s] -> Process ProcessId
- mxAgentWithFinalize :: MxAgentId -> s -> [MxSink s] -> MxAgent s () -> Process ProcessId
- type MxSink s = Message -> MxAgent s (Maybe MxAction)
- mxSink :: forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
- mxGetId :: MxAgent s MxAgentId
- mxDeactivate :: forall s. String -> MxAgent s MxAction
- mxReady :: forall s. MxAgent s MxAction
- mxSkip :: forall s. MxAgent s MxAction
- mxReceive :: forall s. MxAgent s MxAction
- mxReceiveChan :: forall s. MxAgent s MxAction
- mxBroadcast :: Serializable m => m -> MxAgent s ()
- mxSetLocal :: s -> MxAgent s ()
- mxGetLocal :: MxAgent s s
- mxUpdateLocal :: (s -> s) -> MxAgent s ()
- liftMX :: Process a -> MxAgent s a
Documentation
This is the default management event, fired for various internal events around the NT connection and Process lifecycle. All published events that conform to this type, are eligible for tracing - i.e., they will be delivered to the trace controller.
MxSpawned ProcessId | fired whenever a local process is spawned |
MxRegistered ProcessId String | fired whenever a process/name is registered (locally) |
MxUnRegistered ProcessId String | fired whenever a process/name is unregistered (locally) |
MxProcessDied ProcessId DiedReason | fired whenever a process dies |
MxNodeDied NodeId DiedReason | fired whenever a node dies (i.e., the connection is broken/disconnected) |
MxSent ProcessId ProcessId Message | fired whenever a message is sent from a local process |
MxSentToName String ProcessId Message | fired whenever a named send occurs |
MxSentToPort ProcessId SendPortId Message | fired whenever a sendChan occurs |
MxReceived ProcessId Message | fired whenever a message is received by a local process |
MxReceivedPort SendPortId Message | fired whenever a message is received via a typed channel |
MxConnected ConnectionId EndPointAddress | fired when a network-transport connection is first established |
MxDisconnected ConnectionId EndPointAddress | fired when a network-transport connection is broken/disconnected |
MxUser Message | a user defined trace event |
MxLog String | a logging event - used for debugging purposes only |
MxTraceTakeover ProcessId | notifies a trace listener that all subsequent traces will be sent to pid |
MxTraceDisable | notifies a trace listener that it has been disabled/removed |
Instances
Firing Arbitrary Mx Events
mxNotify :: Serializable a => a -> Process () Source #
Publishes an arbitrary Serializable
message to the management event bus.
Note that no attempt is made to force the argument, therefore it is very
important that you do not pass unevaluated thunks that might crash the
receiving process via this API, since all registered agents will gain
access to the data structure once it is broadcast by the agent controller.
Constructing Mx Agents
Represents the actions a management agent can take when evaluating an event sink.
A newtype wrapper for an agent id (which is a string).
Monad for management agents.
Instances
MonadFix (MxAgent s) Source # | |
MonadIO (MxAgent s) Source # | |
Applicative (MxAgent s) Source # | |
Functor (MxAgent s) Source # | |
Monad (MxAgent s) Source # | |
MonadState (MxAgentState s) (MxAgent s) Source # | |
Defined in Control.Distributed.Process.Management.Internal.Types get :: MxAgent s (MxAgentState s) # put :: MxAgentState s -> MxAgent s () # state :: (MxAgentState s -> (a, MxAgentState s)) -> MxAgent s a # |
mxAgentWithFinalize :: MxAgentId -> s -> [MxSink s] -> MxAgent s () -> Process ProcessId Source #
Activates a new agent. This variant takes a finalizer expression,
that is run once the agent shuts down (even in case of failure/exceptions).
The finalizer expression runs in the mx monad - MxAgent s ()
- such
that the agent's internal state remains accessible to the shutdown/cleanup
code.
type MxSink s = Message -> MxAgent s (Maybe MxAction) Source #
Type of a management agent's event sink.
mxReady :: forall s. MxAgent s MxAction Source #
Continue executing (i.e., receiving and processing messages).
mxSkip :: forall s. MxAgent s MxAction Source #
Causes the currently executing event sink to be skipped. The remaining declared event sinks will be evaluated to find a matching handler. Can be used to allow multiple event sinks to process data of the same type.
mxReceive :: forall s. MxAgent s MxAction Source #
Continue exeucting, prioritising inputs from the process' own mailbox ahead of data from the management event bus.
mxReceiveChan :: forall s. MxAgent s MxAction Source #
Continue exeucting, prioritising inputs from the management event bus over the process' own mailbox.
mxBroadcast :: Serializable m => m -> MxAgent s () Source #
mxSetLocal :: s -> MxAgent s () Source #
Set the agent's local state.
mxGetLocal :: MxAgent s s Source #
Fetch the agent's local state.
mxUpdateLocal :: (s -> s) -> MxAgent s () Source #
Update the agent's local state.