Copyright | (c) Well-Typed / Tim Watson |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <watson.timothy@gmail.com> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | None |
Language | Haskell98 |
- Management Extensions API
This module presents an API for creating Management Agents: special processes that are capable of receiving and responding to a node's internal system events. These system events are delivered by the management event bus: an internal subsystem maintained for each running node, to which all agents are automatically subscribed.
Agents are defined in terms of event sinks, each taking a particular Serializable type and evaluating to an action in the MxAgent monad in response. Each MxSink evaluates to an MxAction that specifies whether the agent should continue processing its inputs or stop. If the type of a message cannot be matched to any of the agent's sinks, it will be discarded.
A sink can also deliberately skip processing a message, deferring to the remaining handlers. This is the only way that more than one event sink can handle the same data type, since otherwise the first type match will win every time a message arrives. See mxSkip for details.
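The matching rules above can be illustrated with a small, self-contained model. This is only a sketch built on Data.Dynamic from base: the names Action, Sink, sink and dispatch are hypothetical stand-ins for MxAction, MxSink, mxSink and the agent loop, and the code does not reflect how the library is actually implemented.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Typeable (Typeable)

-- What a sink asks the agent loop to do next (hypothetical, simplified).
data Action = Ready | Skip | Stop
  deriving (Eq, Show)

-- A sink inspects an untyped message; Nothing means "not my type".
type Sink = Dynamic -> Maybe Action

-- Build a sink that only fires for messages of type m.
sink :: Typeable m => (m -> Action) -> Sink
sink handler msg = handler <$> fromDynamic msg

-- Try the sinks in order. The first type match wins unless it answers
-- Skip, in which case the remaining sinks are consulted. A result of
-- Nothing models a discarded message.
dispatch :: [Sink] -> Dynamic -> Maybe Action
dispatch [] _ = Nothing
dispatch (s : rest) msg =
  case s msg of
    Nothing   -> dispatch rest msg
    Just Skip -> dispatch rest msg
    Just act  -> Just act

-- Two sinks share the Int type: the first handles negatives only and
-- defers everything else to the second.
onNegative :: Int -> Action
onNegative n = if n < 0 then Stop else Skip

onAnyInt :: Int -> Action
onAnyInt _ = Ready

onString :: String -> Action
onString _ = Ready

exampleSinks :: [Sink]
exampleSinks = [sink onNegative, sink onAnyInt, sink onString]
```

In this model, dispatching `toDyn (5 :: Int)` reaches the second sink because the first one skips, while a value of a type with no sink (such as a Bool) falls through every handler and is dropped.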
Various events are published to the management event bus automatically, the full list of which can be found in the definition of the MxEvent data type. Additionally, clients of the Management API can publish arbitrary Serializable data to the event bus using mxNotify. All running agents receive all events (from the primary event bus to which they're subscribed).
Agent processes are automatically registered on the local node, and can receive messages via their mailbox just like ordinary processes. Unlike ordinary Process code, however, it is unnecessary (though possible) for agents to use the base expect and receiveX primitives to do this, since the management infrastructure will continuously read from both the primary event bus and the process' own mailbox. Messages are transparently passed to the agent's event sinks from both sources, so an agent need only concern itself with how to respond to its inputs.
Some agents may wish to prioritise messages from their mailbox over traffic on the management event bus, or vice versa. The mxReceive and mxReceiveChan API calls do this for the mailbox and event bus, respectively. The prioritisation these APIs offer is simply that the chosen data stream will be checked first. No blocking will occur if the chosen (prioritised) source is devoid of input messages; instead, the agent handling code will revert to switching between the alternatives in round-robin fashion as usual. If messages exist in one or more channels, they will be consumed as soon as they're available; priority is effectively a hint about which channel to consume from, should messages be available in both.
Prioritisation, then, is a hint about the preference of data source from which the next input should be chosen. No guarantee can be made that the chosen source will in fact be selected at runtime.
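The "check the preferred source first, never block on it" behaviour can be modelled in a few lines of pure Haskell. The sketch below is an illustration only: Source, Inputs, next and drain are invented names, the two inputs are plain lists rather than a real mailbox and broadcast channel, and the library's actual selection logic may differ.

```haskell
-- Which input an agent would like to read from next (hypothetical).
data Source = Mailbox | EventBus
  deriving (Eq, Show)

-- Pending inputs, modelled as two plain lists.
data Inputs a = Inputs { mailbox :: [a], eventBus :: [a] }

-- Take the next message, checking the preferred source first and falling
-- back to the other one rather than waiting. Returns the message, the
-- source it came from, and the remaining inputs.
next :: Source -> Inputs a -> Maybe (a, Source, Inputs a)
next preferred inputs =
  case (preferred, mailbox inputs, eventBus inputs) of
    (Mailbox,  m : ms, _)      -> Just (m, Mailbox,  inputs { mailbox = ms })
    (Mailbox,  [],     e : es) -> Just (e, EventBus, inputs { eventBus = es })
    (EventBus, _,      e : es) -> Just (e, EventBus, inputs { eventBus = es })
    (EventBus, m : ms, [])     -> Just (m, Mailbox,  inputs { mailbox = ms })
    (_,        [],     [])     -> Nothing

other :: Source -> Source
other Mailbox  = EventBus
other EventBus = Mailbox

-- Drain all inputs, flipping the preference after every read. This is the
-- round-robin default; a priority hint only changes the starting point.
drain :: Source -> Inputs a -> [(a, Source)]
drain preferred inputs =
  case next preferred inputs of
    Nothing            -> []
    Just (x, src, ins) -> (x, src) : drain (other src) ins
```

Note that in this model a preference for an empty source silently falls through to the other one, which mirrors the statement that the hint carries no guarantee.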
- Management API Semantics
The management API makes no guarantees about any of the following:
- The ordering of messages delivered to the event bus.
- The order in which agents will be executed.
- Whether messages will be taken from the mailbox first, or the event bus.
- Management Data API
Both management agents and clients of the API have access to a variety of data storage capabilities, to facilitate publishing and consuming useful system information. Agents maintain their own internal state privately (via a state transformer - see mxGetLocal et al). However, it is possible for agents to share additional data with each other (and the outside world) using data tables.
Each agent is assigned its own data table, which acts as a shared map, where the keys are Strings and the values are Serializable data of whatever type the agent or its clients store.
Because an agent's data table stores its values in raw Message format, it works effectively as an un-typed dictionary, into which data of varying types can be fed and later retrieved. The upside of this is that different keys can be mapped to various types without any additional work on the part of the developer. The downside is that the code reading these values must know in advance what type(s) to expect, and the API provides no additional support for handling that.
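As a rough sketch of what this means for callers, the snippet below stores two differently typed values under one agent's table and reads one back, using only the mxSet and mxGet signatures listed in this module. The agent id and key names are made up for illustration.

```haskell
import Control.Distributed.Process (Process, say)
import Control.Distributed.Process.Management (MxAgentId (..), mxGet, mxSet)

-- Hypothetical agent id, chosen only for this sketch.
statsTable :: MxAgentId
statsTable = MxAgentId "stats-agent"

storeAndRead :: Process ()
storeAndRead = do
  -- Different keys may hold values of different types.
  mxSet statsTable "uptime-seconds" (42 :: Int)
  mxSet statsTable "node-label" ("primary" :: String)

  -- The annotation states which type the reader expects under the key.
  uptime <- mxGet statsTable "uptime-seconds" :: Process (Maybe Int)
  -- Per the mxGet documentation, a type mismatch yields Nothing.
  wrong <- mxGet statsTable "uptime-seconds" :: Process (Maybe String)
  say ("uptime=" ++ show uptime ++ ", wrong=" ++ show wrong)
```

The reader has to carry the expected type itself, which is the cost of the un-typed design described above.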
Publishing is accomplished using the mxPublish and mxSet APIs, whilst querying is handled by mxGet and deletion by mxClear, mxPurgeTable and mxDropTable.
When a management agent terminates, its table is left in memory, such that an agent may resume its role (by restarting) or have its MxAgentId taken over by a subsequent agent, leaving the data originally captured in place.
- Defining Agents
New agents are defined with mxAgent and require a unique MxAgentId, an initial state - MxAgent runs in a state transformer - and a list of the agent's event sinks. Each MxSink is defined in terms of a specific Serializable type, via the mxSink function, binding the event handler expression to inputs of only that type.
Apart from modifying its own local state, an agent can execute arbitrary Process a code via lifting (see liftMX) and even publish its own messages back to the primary event bus (see mxBroadcast).
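A minimal sketch of these three capabilities together (local state, lifted Process code, and broadcasting) might look as follows. The agent id "spawn-counter" and the tuple published to the bus are assumptions made for the example; only functions listed in this module and say from Control.Distributed.Process are used.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Distributed.Process (Process, ProcessId, say)
import Control.Distributed.Process.Management

-- Counts locally spawned processes and re-publishes a running total.
spawnCounter :: Process ProcessId
spawnCounter =
  mxAgent (MxAgentId "spawn-counter") (0 :: Int)
    [ mxSink $ \(ev :: MxEvent) ->
        case ev of
          MxSpawned _ -> do
            -- modify the agent's private state
            mxUpdateLocal (+ 1)
            total <- mxGetLocal
            -- run ordinary Process code from inside the agent
            liftMX $ say ("spawned so far: " ++ show total)
            -- publish a derived message back onto the event bus
            mxBroadcast ("spawn-total", total)
            mxReady
          _ -> mxReady
    ]
```

The broadcast value is deliberately not an MxEvent, so this agent's own sink does not match it and the agent does not react to its own output.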
Since messages are delivered to agents from both the management event bus and the agent process's mailbox, agents (i.e., event sinks) will generally have no idea as to their origin. An agent can, however, choose to prioritise the choice of input (source) each time one of its event sinks runs. The standard way for an event sink to indicate that the agent is ready for its next input is to evaluate mxReady. When this happens, the management infrastructure will obtain data from the event bus and the process' mailbox in a round-robin fashion, i.e., one after the other, changing each time.
- Example Code
What follows is a grossly over-simplified example of a management agent that provides a basic name monitoring facility. Whenever a process name is registered or unregistered, clients are informed of the fact.
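
    {-# LANGUAGE DeriveGeneric, ScopedTypeVariables #-}
    import Control.Distributed.Process
    import Control.Distributed.Process.Management
    import Data.Binary (Binary)
    import qualified Data.Foldable as Foldable
    import qualified Data.Set as Set
    import GHC.Generics (Generic)

    -- simple notification data type
    data Registration = Reg { added  :: Bool
                            , procId :: ProcessId
                            , name   :: String
                            } deriving (Generic)

    -- needed so that Registration can be sent to clients
    instance Binary Registration

    -- start a /name monitoring agent/
    nameMonitorAgent :: Process ProcessId
    nameMonitorAgent =
      mxAgent (MxAgentId "name-monitor") Set.empty
        [ -- clients subscribe by sending their pid
          mxSink $ \(pid :: ProcessId) -> do
            mxUpdateLocal $ Set.insert pid
            mxReady
          -- forward (un)registration events to every subscriber
        , mxSink $ \(ev :: MxEvent) -> do
            case ev of
              MxRegistered   p n -> notify True  p n
              MxUnRegistered p n -> notify False p n
              _                  -> return ()
            mxReady
        ]
      where
        -- the original left delivery unspecified; plain send is assumed here
        notify a p n =
          Foldable.mapM_ (\client -> liftMX (send client (Reg a p n))) =<< mxGetLocal
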
The client interface (for sending their pid) can take one of two forms:

    monitorNames = getSelfPid >>= nsend "name-monitor"

    monitorNames2 = getSelfPid >>= mxNotify

For some real-world examples, see the distributed-process-platform package.
- Performance, Stability and Scalability
Management Agents offer numerous advantages over regular processes: broadcast communication with them can have a lower latency, they offer simplified message (i.e., input type) handling, and they have access to internal system information that would be otherwise unobtainable.
Do not be tempted to implement everything (e.g., the kitchen sink) using the management API though. There are overheads associated with management agents, which is why they're presented as tools for consuming low level system information rather than as application level development tools.
Agents that rely heavily on a busy mailbox can cause the management event bus to backlog un-GC'ed data, leading to increased heap usage. Producers that do not take care to avoid passing unevaluated thunks to the API can crash all the agents in the system. Agents are not monitored or managed in any way, and those that crash will not be restarted.
The management event bus can receive a great deal of traffic. Every time a message is sent and/or received, an event is passed to the agent controller and broadcast to all agents (plus the trace controller, if tracing is enabled for the node). This is already a significant overhead - though profiling and benchmarks have demonstrated that it does not adversely affect performance if few agents are installed. Agents will typically use more cycles than plain processes, since they perform additional work: selecting input data from both the event bus and their own mailboxes, plus searching through the set of event sinks (for each agent) to determine the right handler for the event.
Each management agent requires not only its own Process (in which the agent code is run), but also a peer process that provides its data table. These data tables also have to be coordinated and managed on each agent's behalf.
- Architecture Overview
The architecture of the management event bus is internal and subject to change without prior notice. The description that follows is provided for informational purposes only.
When a node initially starts, two special, internal system processes are started to support the management infrastructure. The first, known as the trace controller, is responsible for consuming MxEvents and forwarding them to the configured tracer - see Control.Distributed.Process.Debug for further details. The second is the management agent controller, and is the primary worker process underpinning the management infrastructure. All published management events are routed to this process, which places them onto a system wide event bus and additionally passes them directly to the trace controller.
There are several reasons for segregating the tracing and management control planes in this fashion. Tracing can be enabled or disabled by clients, whilst the management event bus cannot, since in addition to providing runtime instrumentation, its intended use-cases include node monitoring, peer discovery (via topology providing backends) and other essential system services that require knowledge of otherwise hidden system internals. Tracing is also subject to trace flags that limit the specific MxEvents delivered to trace clients - an overhead/complexity not shared by management agents. Finally, tracing and management agents are implemented using completely different signalling techniques - more on this later - which would introduce considerable complexity if they shared the same event loop.
The management control plane is driven by a shared broadcast channel, which is written to by the agent controller and subscribed to by all agent processes. Agents are spawned as regular processes, whose primary implementation (i.e., server loop) is responsible for consuming messages from both the broadcast channel and their own mailbox. Once consumed, messages are applied to the agent's event sinks until one matches the input, at which point it is applied and the loop continues. The implementation chooses from the event bus and the mailbox in a round-robin fashion, until a message is received. This polling activity would lead to management agents consuming considerable system resources if left unchecked. The implementation therefore polls for a limited number of retries, after which it performs a blocking read on the event bus.
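The "poll a bounded number of times, then block on the bus" strategy can be sketched with STM queues. This is a simplified model and not the library's code: nextInput is a hypothetical name, both inputs are represented as TQueue values from the stm package (a real mailbox is not a TQueue), and the retry budget is passed in as a plain Int.

```haskell
import Control.Concurrent.STM

-- Alternate non-blocking reads between the two sources. Once the retry
-- budget is exhausted, stop spinning and block on the event bus.
nextInput :: Int -> TQueue a -> TQueue a -> IO a
nextInput maxPolls bus mailbox = go maxPolls bus mailbox
  where
    go polls current other
      | polls <= 0 = atomically (readTQueue bus) -- blocking read
      | otherwise = do
          found <- atomically (tryReadTQueue current)
          case found of
            Just msg -> return msg
            -- nothing there: swap the sources and try again
            Nothing  -> go (polls - 1) other current
```

One consequence of this design, visible in the model, is that once the agent is blocked on the bus a message arriving only in the mailbox does not wake it until some bus traffic arrives.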
- data MxEvent
- = MxSpawned ProcessId
- | MxRegistered ProcessId String
- | MxUnRegistered ProcessId String
- | MxProcessDied ProcessId DiedReason
- | MxNodeDied NodeId DiedReason
- | MxSent ProcessId ProcessId Message
- | MxReceived ProcessId Message
- | MxConnected ConnectionId EndPointAddress
- | MxDisconnected ConnectionId EndPointAddress
- | MxUser Message
- | MxLog String
- | MxTraceTakeover ProcessId
- | MxTraceDisable
- mxNotify :: Serializable a => a -> Process ()
- data MxAction
- newtype MxAgentId = MxAgentId {}
- data MxAgent s a
- mxAgent :: MxAgentId -> s -> [MxSink s] -> Process ProcessId
- mxAgentWithFinalize :: MxAgentId -> s -> [MxSink s] -> MxAgent s () -> Process ProcessId
- type MxSink s = Message -> MxAgent s (Maybe MxAction)
- mxSink :: forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
- mxGetId :: MxAgent s MxAgentId
- mxDeactivate :: forall s. String -> MxAgent s MxAction
- mxReady :: forall s. MxAgent s MxAction
- mxSkip :: forall s. MxAgent s MxAction
- mxReceive :: forall s. MxAgent s MxAction
- mxReceiveChan :: forall s. MxAgent s MxAction
- mxBroadcast :: Serializable m => m -> MxAgent s ()
- mxSetLocal :: s -> MxAgent s ()
- mxGetLocal :: MxAgent s s
- mxUpdateLocal :: (s -> s) -> MxAgent s ()
- liftMX :: Process a -> MxAgent s a
- mxPublish :: MxAgentId -> String -> Message -> Process ()
- mxSet :: Serializable a => MxAgentId -> String -> a -> Process ()
- mxGet :: Serializable a => MxAgentId -> String -> Process (Maybe a)
- mxClear :: MxAgentId -> String -> Process ()
- mxPurgeTable :: MxAgentId -> Process ()
- mxDropTable :: MxAgentId -> Process ()
Documentation
This is the default management event, fired for various internal events around the network-transport (NT) connection and Process lifecycle. All published events that conform to this type are eligible for tracing - i.e., they will be delivered to the trace controller.
MxSpawned ProcessId | fired whenever a local process is spawned |
MxRegistered ProcessId String | fired whenever a process/name is registered (locally) |
MxUnRegistered ProcessId String | fired whenever a process/name is unregistered (locally) |
MxProcessDied ProcessId DiedReason | fired whenever a process dies |
MxNodeDied NodeId DiedReason | fired whenever a node dies (i.e., the connection is broken/disconnected) |
MxSent ProcessId ProcessId Message | fired whenever a message is sent from a local process |
MxReceived ProcessId Message | fired whenever a message is received by a local process |
MxConnected ConnectionId EndPointAddress | fired when a network-transport connection is first established |
MxDisconnected ConnectionId EndPointAddress | fired when a network-transport connection is broken/disconnected |
MxUser Message | a user defined trace event |
MxLog String | a logging event - used for debugging purposes only |
MxTraceTakeover ProcessId | notifies a trace listener that all subsequent traces will be sent to pid |
MxTraceDisable | notifies a trace listener that it has been disabled/removed |
Firing Arbitrary Mx Events
mxNotify :: Serializable a => a -> Process ()
Publishes an arbitrary Serializable message to the management event bus. Note that no attempt is made to force the argument, therefore it is very important that you do not pass unevaluated thunks that might crash the receiving process via this API, since all registered agents will gain access to the data structure once it is broadcast by the agent controller.
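One way a producer can guard against this is to fully evaluate the value before publishing it. The sketch below uses `$!!` from the deepseq package for that purpose; the CacheStats type and publishStats function are hypothetical, and using deepseq is a suggestion made here rather than something the library requires.

```haskell
{-# LANGUAGE DeriveGeneric #-}
import Control.DeepSeq (NFData, ($!!))
import Control.Distributed.Process (Process)
import Control.Distributed.Process.Management (mxNotify)
import Data.Binary (Binary)
import GHC.Generics (Generic)

-- A hypothetical user-defined event.
data CacheStats = CacheStats { hits :: Int, misses :: Int }
  deriving (Generic, Show)

instance Binary CacheStats
instance NFData CacheStats

-- Evaluate the event to normal form before handing it to the bus, so any
-- exception hidden in a thunk is raised here, in the publishing process,
-- rather than inside whichever agent first inspects the value.
publishStats :: CacheStats -> Process ()
publishStats stats = mxNotify $!! stats
```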
Constructing Mx Agents
Represents the actions a management agent can take when evaluating an event sink.
A newtype wrapper for an agent id (which is a string).
Monad for management agents.
Monad (MxAgent s) | |
Functor (MxAgent s) | |
Applicative (MxAgent s) | |
MonadIO (MxAgent s) | |
MonadState (MxAgentState s) (MxAgent s) | |
Typeable (* -> * -> *) MxAgent |
mxAgentWithFinalize :: MxAgentId -> s -> [MxSink s] -> MxAgent s () -> Process ProcessId
Activates a new agent. This variant takes a finalizer expression that is run once the agent shuts down (even in case of failure/exceptions). The finalizer expression runs in the mx monad - MxAgent s () - such that the agent's internal state remains accessible to the shutdown/cleanup code.
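A minimal sketch of an agent with a finalizer, assuming an illustrative agent id ("log-collector") and an ad hoc convention that the String "stop" asks the agent to shut down:

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Distributed.Process (Process, ProcessId, say)
import Control.Distributed.Process.Management

-- Collects MxLog lines until told to stop, then reports how many it saw.
logCollector :: Process ProcessId
logCollector =
  mxAgentWithFinalize (MxAgentId "log-collector") ([] :: [String]) sinks finalizer
  where
    sinks =
      [ mxSink $ \(ev :: MxEvent) ->
          case ev of
            MxLog line -> mxUpdateLocal (line :) >> mxReady
            _          -> mxReady
        -- a plain String "stop" (a convention invented here) ends the agent
      , mxSink $ \(cmd :: String) ->
          if cmd == "stop" then mxDeactivate "stop requested" else mxReady
      ]
    -- Runs in MxAgent, so the final local state is still readable.
    finalizer = do
      collected <- mxGetLocal
      liftMX $ say ("collected " ++ show (length collected) ++ " log lines")
```

Under these assumptions a client could request shutdown with `nsend "log-collector" "stop"`, since agents are registered on the local node under their id.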
type MxSink s = Message -> MxAgent s (Maybe MxAction)
Type of a management agent's event sink.
mxDeactivate :: forall s. String -> MxAgent s MxAction
Gracefully terminate an agent.
mxReady :: forall s. MxAgent s MxAction
Continue executing (i.e., receiving and processing messages).
mxSkip :: forall s. MxAgent s MxAction
Causes the currently executing event sink to be skipped. The remaining declared event sinks will be evaluated to find a matching handler. Can be used to allow multiple event sinks to process data of the same type.
mxReceive :: forall s. MxAgent s MxAction
Continue executing, prioritising inputs from the process' own mailbox ahead of data from the management event bus.
mxReceiveChan :: forall s. MxAgent s MxAction
Continue executing, prioritising inputs from the management event bus over the process' own mailbox.
mxBroadcast :: Serializable m => m -> MxAgent s ()
mxSetLocal :: s -> MxAgent s ()
Set the agent's local state.
mxGetLocal :: MxAgent s s
Fetch the agent's local state.
mxUpdateLocal :: (s -> s) -> MxAgent s ()
Update the agent's local state.
Mx Data API
mxPublish :: MxAgentId -> String -> Message -> Process ()
Publish an arbitrary Message as a property in the management database. For publishing Serializable data, use mxSet instead.
mxSet :: Serializable a => MxAgentId -> String -> a -> Process ()
Sets an arbitrary Serializable datum against a key in the management database. Note that no attempt is made to force the argument, therefore it is very important that you do not pass unevaluated thunks that might crash some other, arbitrary process (or management agent!) that obtains and attempts to force the value later on.
mxGet :: Serializable a => MxAgentId -> String -> Process (Maybe a)
Fetches a property from the management database for the given key. If the property is not set, or does not match the expected type when typechecked (at runtime), returns Nothing.
mxClear :: MxAgentId -> String -> Process ()
Clears a property from the management database using the given key. If the key does not exist in the database, this is a no-op.
mxPurgeTable :: MxAgentId -> Process ()
Purges a table in the management database of all its stored properties.
mxDropTable :: MxAgentId -> Process ()
Deletes a table from the management database.