remote-0.1.1: Cloud Haskell

Remote.Process

Contents

Description

This module is the core of Cloud Haskell. It provides processes, messages, monitoring, and configuration.

Synopsis

The Process monad

data ProcessM a Source

The monad ProcessM is the core of the process layer. Functions in the ProcessM monad may participate in messaging and create additional concurrent processes. You can create a ProcessM context from an IO context with the remoteInit function.

data NodeId Source

Identifies a node somewhere on the network. These can be queried from getPeers. See also getSelfNode

data ProcessId Source

Identifies a process somewhere on the network. These are produced by the spawn family of functions and consumed by send. When a process ends, its process ID ceases to be valid. See also getSelfPid

type PeerInfo = Map String [NodeId]Source

Created by Remote.Peer.getPeers, this maps each role to a list of nodes that have that role. It can be examined directly or queried with findPeerByRole.

getSelfPid :: ProcessM ProcessIdSource

Returns the process ID of the current process.

getSelfNode :: ProcessM NodeIdSource

Returns the node ID of the node that the current process is running on.

isPidLocal :: ProcessId -> ProcessM BoolSource

Returns true if the given process ID is associated with the current node. Does not examine if the process is currently running.

Message receiving

expect :: Serializable a => ProcessM aSource

A simple way to receive messages. This will return the first message received of the specified type; if no such message is available, the function will block. Unlike the receive family of functions, this function does not allow the notion of choice in message extraction.

data MatchM q a Source

This monad provides the state and structure for matching received messages from the incoming message queue. It's the interface between the receive family of functions, and the match family, which together can express which messages can be accepted.

Instances

receive :: [MatchM q ()] -> ProcessM (Maybe q)Source

Examines the message queue of the current process, matching each message against each of the provided message pattern clauses (typically provided by a function from the match family). If a message matches, the corresponding handler is invoked and its result is returned. If no message matches, Nothing is returned.

receiveWait :: [MatchM q ()] -> ProcessM qSource

Examines the message queue of the current process, matching each message against each of the provided message pattern clauses (typically provided by a function from the match family). If a message matches, the corresponding handler is invoked and its result is returned. If no message matches, the function blocks until a matching message is received.

receiveTimeout :: Int -> [MatchM q ()] -> ProcessM (Maybe q)Source

Examines the message queue of the current process, matching each message against each of the provided message pattern clauses (typically provided by a function from the match family). If a message matches, the corresponding handler is invoked and its result is returned. If no message matches, the function blocks until a matching message is received, or until the specified time in microseconds has elapsed, at which point it will return Nothing. If the specified time is 0, this function is equivalent to receive.

match :: Serializable a => (a -> ProcessM q) -> MatchM q ()Source

Used to specify a message pattern in receiveWait and related functions. Only messages containing data of type a, where a is the argument to the user-provided function in the first parameter of match, will be removed from the queue, at which point the user-provided function will be invoked.

matchIf :: Serializable a => (a -> Bool) -> (a -> ProcessM q) -> MatchM q ()Source

Similar to match, but allows for additional criteria to be checked prior to message acceptance. Here, the first user-provided function operates as a filter, and the message will be accepted only if it returns True. Once it's been accepted, the second user-defined function is invoked, as in match

matchUnknown :: ProcessM q -> MatchM q ()Source

A catch-all variant of match that invokes user-provided code and will extact any message from the queue. This is useful for matching against messages that are not recognized. Since message matching patterns are evaluated in order, this function, if used, should be the last element in the list of matchers given to receiveWait and similar functions.

matchUnknownThrow :: MatchM q ()Source

A variant of matchUnknown that throws a UnknownMessageException if the process receives a message that isn't extracted by another message matcher. Equivalent to:

 matchUnknown (throw (UnknownMessageException "..."))

matchProcessDown :: ProcessId -> ProcessM q -> MatchM q ()Source

A specialized version of match (for use with receive, receiveWait and friends) for catching process down messages. This way processes can avoid waiting forever for a response from another process that has crashed. Intended to be used within a withMonitor block, e.g.:

 withMonitor apid $
   do send apid QueryMsg
      receiveWait 
      [
        match (\AnswerMsg -> return "ok"),
        matchProcessDown apid (return "aborted")   
      ]

Message sending

send :: Serializable a => ProcessId -> a -> ProcessM ()Source

Sends a message to the given process. If the process isn't running or can't be accessed, this function will throw a TransmitException. The message must implement the Serializable interface.

sendQuiet :: Serializable a => ProcessId -> a -> ProcessM TransmitStatusSource

Like send, but in case of error returns a value rather than throw an exception.

Logging functions

logS :: LogSphere -> LogLevel -> String -> ProcessM ()Source

Generates a log entry, using the process's current logging configuration.

  • LogSphere indicates the subsystem generating this message. SYS in the case of componentes of the framework.
  • LogLevel indicates the importance of the message.
  • The third parameter is the log message.

Both of the first two parameters may be used to filter log output.

say :: String -> ProcessM ()Source

Uses the logging facility to produce non-filterable, programmatic output. Shouldn't be used for informational logging, but rather for application-level output.

type LogSphere = StringSource

Specifies the subsystem or region that is responsible for generating a given log entry. This is useful in conjunction with LogFilter to limit displayed log output to the particular area of your program that you are currently debugging. The SYS, TSK, and SAY spheres are used by the framework for messages relating to the Process layer, the Task layer, and the say function. The remainder of values are free for use at the application level.

data LogLevel Source

Specifies the importance of a particular log entry. Can also be used to filter log output.

Constructors

LoSay

Non-suppressible application-level emission

LoFatal 
LoCritical 
LoImportant 
LoStandard

The default log level

LoInformation 
LoTrivial 

data LogTarget Source

A preference as to what is done with log messages

Constructors

LtStdout

Messages will be output to the console; the default

LtForward NodeId

Log messages will be forwarded to the given node; please don't set up a loop

LtFile FilePath

Log messages will be appended to the given file

LtForwarded

Special value -- don't set this in your LogConfig!

data LogFilter Source

Specifies which log messages will be output. All log messages of importance below the current log level or not among the criterea given here will be suppressed. This type lets you limit displayed log messages to certain components.

data LogConfig Source

Expresses a current configuration of the logging subsystem, which determines which log messages to be output and where to send them when they are. Both processes and nodes have log configurations, set with setLogConfig and setNodeLogConfig respectively. The node log configuration is used for all processes that have not explicitly set their log configuration. Otherwise, the process log configuration takes priority.

Constructors

LogConfig 

Fields

logLevel :: LogLevel

The lowest message priority that will be displayed

logTarget :: LogTarget

Where to send messages

logFilter :: LogFilter

Other filtering

setLogConfig :: LogConfig -> ProcessM ()Source

Set the process's log configuration. This overrides any node-level log configuration

getLogConfig :: ProcessM LogConfigSource

Gets the currently active log configuration for the current process; if the current process doesn't have a log configuration set, the process's log configuration will be returned

setNodeLogConfig :: LogConfig -> ProcessM ()Source

Sets the node's log configuration

setRemoteNodeLogConfig :: NodeId -> LogConfig -> ProcessM ()Source

Sets the log configuration of a remote node. May throw TransmitException

defaultLogConfig :: LogConfigSource

The default log configuration represents a starting point for setting your own configuration. It is:

 logLevel = LoStandard
 logTarget = LtStdout
 logFilter = LfAll

Exception handling

ptry :: Exception e => ProcessM a -> ProcessM (Either e a)Source

A ProcessM-flavoured variant of try

ptimeout :: Int -> ProcessM a -> ProcessM (Maybe a)Source

A ProcessM-flavoured variant of timeout

pbracket :: ProcessM a -> (a -> ProcessM b) -> (a -> ProcessM c) -> ProcessM cSource

A ProcessM-flavoured variant of bracket

pfinally :: ProcessM a -> ProcessM b -> ProcessM aSource

A ProcessM-flavoured variant of finally

data UnknownMessageException Source

Thrown by matchUnknownThrow in response to a message of a wrong type being received by a process

data ServiceException Source

Thrown by Remote.Process system services in response to some problem

Constructors

ServiceException String 

data TransmitException Source

Thrown by various network-related functions when communication with a host has failed

Process naming

nameSet :: String -> ProcessM ()Source

Assigns a name to the current process. The name is local to the node. On each node, each process may have only one name, and each name may be given to only one node. If this function is called more than once by the same process, or called more than once with the name on a single node, it will throw a ServiceException. The PID of a named process can be queried later with nameQuery. When the named process ends, its name will again become available. One reason to use named processes is to create node-local state. This example lets each node have its own favorite color, which can be changed and queried.

 nodeFavoriteColor :: ProcessM ()
 nodeFavoriteColor =
  do nameSet "favorite_color"
     loop Blue
  where loop color =
      receiveWait
         [ match (\newcolor -> return newcolor),
           match (\pid -> send pid color >> return color)
         ] >>= loop

 setFavoriteColor :: NodeId -> Color -> ProcessM ()
 setFavoriteColor nid color =
  do (Just pid) <- nameQuery nid "favorite_color"
     send pid color

 getFavoriteColor :: NodeId -> ProcessM Color
 getFavoriteColor nid =
  do (Just pid) <- nameQuery nid "favorite_color"
     mypid <- getSelfPid
     send pid mypid
     expect

nameQuery :: NodeId -> String -> ProcessM (Maybe ProcessId)Source

Query the PID of a named process on a particular node. If no process of that name exists, or if that process has ended, this function returns Nothing.

nameQueryOrStart :: NodeId -> String -> Closure (ProcessM ()) -> ProcessM ProcessIdSource

Similar to nameQuery but if the named process doesn't exist, it will be started from the given closure. If the process is already running, the closure will be ignored.

Process spawning and monitoring

spawnLocal :: ProcessM () -> ProcessM ProcessIdSource

Create a new process on the current node. Returns the new process's identifier. Unlike spawn, this function does not need a Closure or a NodeId.

spawnLocalAnd :: ProcessM () -> ProcessM () -> ProcessM ProcessIdSource

Start executing a process on the current node. This is a variation of spawnLocal which accepts two blocks of user-defined code. The first block is the main body of the code to run concurrently. The second block is a prefix which is run in the new process, prior to the main body, but its completion is guaranteed before spawnAnd returns. Thus, the prefix code is useful for initializing the new process synchronously.

spawn :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessIdSource

Start a process running the code, given as a closure, on the specified node. If successful, returns the process ID of the new process. If unsuccessful, throw a TransmitException.

spawnAnd :: NodeId -> Closure (ProcessM ()) -> AmSpawnOptions -> ProcessM ProcessIdSource

A variant of spawn that allows greater control over how the remote process is started.

spawnLink :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessIdSource

A variant of spawn that starts the remote process with bidirectoinal monitoring, as in linkProcess

unpause :: ProcessId -> ProcessM ()Source

If a remote process has been started in a paused state with spawnAnd , it will be running but inactive until unpaused. Use this function to unpause such a function. It has no effect on processes that are not paused or that have already been unpaused.

data MonitorAction Source

The different kinds of monitoring available between processes.

Constructors

MaMonitor

MaMonitor means that the monitor process will be sent a ProcessMonitorException message when the monitee terminates for any reason.

MaLink

MaLink means that the monitor process will receive an asynchronous exception of type ProcessMonitorException when the monitee terminates for any reason

MaLinkError

MaLinkError means that the monitor process will receive an asynchronous exception of type ProcessMonitorException when the monitee terminates abnormally

data SignalReason Source

Part of the notification system of process monitoring, indicating why the monitor is being notified.

Constructors

SrNormal

the monitee terminated normally

SrException String

the monitee terminated with an uncaught exception, which is given as a string

SrNoPing

the monitee is believed to have ended or be inaccessible, as the node on which its running is not responding to pings. This may indicate a network bisection or that the remote node has crashed.

SrInvalid

SrInvalid: the monitee was not running at the time of the attempt to establish monitoring

data ProcessMonitorException Source

The main form of notification to a monitoring process that a monitored process has terminated. This data structure can be delivered to the monitor either as a message (if the monitor is of type MaMonitor) or as an asynchronous exception (if the monitor is of type MaLink or MaLinkError). It contains the PID of the monitored process and the reason for its nofication.

linkProcess :: ProcessId -> ProcessM ()Source

Establishes bidirectional abnormal termination monitoring between the current process and another. Monitoring established with linkProcess is bidirectional and signals only in the event of abnormal termination. In other words, linkProcess a is equivalent to:

 monitorProcess mypid a MaLinkError
 monitorProcess a mypid MaLinkError

monitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()Source

Establishes unidirectional processing of another process. The format is:

 monitorProcess monitor monitee action

Here,

  • monitor is the process that will be notified if the monitee goes down
  • monitee is the process that will be monitored
  • action determines how the monitor will be notified

Monitoring will remain in place until one of the processes ends or until unmonitorProcess is called. Calls to monitorProcess are cumulative, such that calling monitorProcess 3 three times on the same pair of processes will ensure that monitoring will stay in place until unmonitorProcess is called three times on the same pair of processes. If the monitee is not currently running, the monitor will be signalled immediately. See also MonitorAction.

unmonitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()Source

Removes monitoring established by monitorProcess. Note that the type of monitoring, given in the third parameter, must match in order for monitoring to be removed. If monitoring has not already been established between these two processes, this function takes not action.

withMonitor :: ProcessId -> ProcessM a -> ProcessM aSource

Establishes temporary monitoring of another process. The process to be monitored is given in the first parameter, and the code to run in the second. If the given process goes down while the code in the second parameter is running, a process down message will be sent to the current process, which can be handled by matchProcessDown.

pingNode :: NodeId -> ProcessM BoolSource

Sends a small message to the specified node to determine if it's alive. If the node cannot be reached or does not respond within a time frame, the function will return False.

callRemote :: Serializable a => NodeId -> Closure (ProcessM a) -> ProcessM aSource

Invokes a function on a remote node. The function must be given by a closure. This function will block until the called function completes or the connection is broken.

terminate :: ProcessM aSource

Ends the current process in an orderly manner.

Config file

readConfig :: Bool -> Maybe FilePath -> IO ConfigSource

Reads in configuration data from external sources, specifically from the command line arguments and a configuration file. The first parameter to this function determines whether command-line arguments are consulted. If the second parameter is not Nothing then it should be the name of the configuration file; an exception will be thrown if the specified file does not exist. Usually, this function shouldn't be called directly, but rather from Remote.Init.remoteInit, which also takes into account environment variables. Options set by command-line parameters have the highest precedence, followed by options read from a configuration file; if a configuration option is not explicitly specified anywhere, a reasonable default is used. The configuration file has a format, wherein one configuration option is specified on each line; the first token on each line is the name of the configuration option, followed by whitespace, followed by its value. Lines beginning with # are comments. Thus:

 # This is a sample configuration file
 cfgHostName host3
 cfgKnownHosts host1 host2 host3 host4

Options may be specified on the command line similarly. Note that command-line arguments containing spaces must be quoted.

 ./MyProgram -cfgHostName=host3 -cfgKnownHosts='host1 host2 host3 host4'

data Config Source

The Config structure encapsulates the user-settable configuration options for each node. This settings are usually read in from a configuration file or from the executable's command line; in either case, see Remote.Init.remoteInit and readConfig

Constructors

Config 

Fields

cfgRole :: !String

The user-assigned role of this node determines what its initial behavior is and how it presents itself to its peers. Default to NODE

cfgHostName :: !HostName

The hostname, used as a basis for creating the name of the node. If unspecified, the OS will be queried. Since the hostname is part of the nodename, the computer must be accessible to other nodes using this name.

cfgListenPort :: !PortId

The TCP port on which to listen to for new connections. If unassigned or 0, the OS will assign a free port.

cfgLocalRegistryListenPort :: !PortId

The TCP port on which to communicate with the local node registry, or to start the local node registry if it isn't already running. This defaults to 38813 and shouldn't be changed unless you have prohibitive firewall rules

cfgPeerDiscoveryPort :: !PortId

The UDP port on which local peer discovery broadcasts are sent. Defaults to 38813, and only matters if you rely on dynamic peer discovery

cfgNetworkMagic :: !String

The unique identifying string for this network or application. Must not contain spaces. The uniqueness of this string ensures that multiple applications running on the same physical network won't accidentally communicate with each other. All nodes of your application should have the same network magic. Defaults to MAGIC

cfgKnownHosts :: ![String]

A list of hosts where nodes may be running. When Remote.Peer.getPeers or Remote.Peer.getPeerStatic is called, each host on this list will be queried for its nodes. Only matters if you rely on static peer discovery.

cfgRoundtripTimeout :: !Int

Microseconds to wait for a response from a system service on a remote node. If your network has high latency or congestion, you may need to increase this to avoid incorrect reports of node inaccessibility. 0 to wait indefinitely (not recommended).

cfgMaxOutgoing :: !Int

A limit on the number of simultaneous outgoing connections per node

cfgPromiseFlushDelay :: !Int

Time in microseconds before an in-memory promise is flushed to disk. 0 to disable disk flush entirely.

cfgPromisePrefix :: !String

Prepended to the filename of flushed promises.

cfgArgs :: [String]

Command-line arguments that are not part of the node configuration are placed here and can be examined by your application logConfig :: LogConfig

Instances

getCfgArgs :: ProcessM [String]Source

Returns command-line arguments provided to the executable, excluding any command line arguments that were processed by the framework.

Initialization

initNode :: Config -> Lookup -> IO (MVar Node)Source

Creates a new Node object, given the specified configuration (usually created by readConfig) and function metadata table (usually create by Remote.Call.registerCalls). You probably want to use Remote.Init.remoteInit instead of this lower-level function.

roleDispatch :: MVar Node -> (String -> ProcessM ()) -> IO ()Source

Given a Node (created by initNode), start execution of user-provided code by invoking the given function with the node's cfgRole string.

waitForThreads :: MVar Node -> IO ()Source

Blocks until all non-daemonic processes of the given node have ended. Usually called on the main thread of a program.

forkAndListenAndDeliver :: MVar Node -> Config -> IO ()Source

Starts a message-receive loop on the given node. You probably don't want to call this function yourself.

Closures

Debugging aids

Various internals, not for general use

localRegistryHello :: ProcessM ()Source

Contacts the local node registry and attempts to verify that it is alive. If the local node registry cannot be contacted, an exception will be thrown.

localRegistryRegisterNode :: ProcessM ()Source

Contacts the local node registry and attempts to register current node. You probably don't want to call this function yourself, as it's done for you in Remote.Init.remoteInit

localRegistryUnregisterNode :: ProcessM ()Source

Contacts the local node registry and attempts to unregister current node. You probably don't want to call this function yourself, as it's done for you in Remote.Init.remoteInit

System service processes, not for general use

standaloneLocalRegistry :: String -> IO ()Source

Every host on which a node is running also needs a node registry, which arbitrates those nodes can responds to peer queries. If no registry is running, one will be automatically started when the framework is started, but the registry can be started independently, also. This function does that.