remote-0.1.1: Cloud Haskell

Remote

Description

Cloud Haskell (previously Remote Haskell) is a distributed computing framework for Haskell. We can describe its interface as roughly two levels: the process layer, consisting of processes, messages, and fault monitoring; and the task layer, consisting of tasks, promises, and fault recovery. This summary module provides the most common interface functions for both layers, although advanced users might want to import names from the other constituent modules, as well.

The process layer

remoteInit :: Maybe FilePath -> [RemoteCallMetaData] -> (String -> ProcessM ()) -> IO ()

This is the usual way to create a single node of a distributed program. The intent is that remoteInit be called in your program's Main.main function. A typical call takes this form:

 main = remoteInit (Just "config") [Main.__remoteCallMetaData] initialProcess

This will:

  1. Read the configuration file config in the current directory or, if specified, from the file whose path is given by the environment variable RH_CONFIG. If the given file does not exist or is invalid, an exception will be thrown.
  2. Use the configuration given in the file as well as on the command-line to create a new node. The usual system processes will be started, including logging, discovery, and spawning.
  3. Compile-time metadata, generated by Remote.Call.remotable, will be used for invoking closures. Metadata from each module must be explicitly mentioned.
  4. The function initialProcess will be called, given as a parameter a string indicating the value of the cfgRole setting of this node. initialProcess is provided by the user and provides an entrypoint for controlling node behavior on startup.

data ProcessM a

The monad ProcessM is the core of the process layer. Functions in the ProcessM monad may participate in messaging and create additional concurrent processes. You can create a ProcessM context from an IO context with the remoteInit function.

data NodeId

Identifies a node somewhere on the network. These can be queried from getPeers. See also getSelfNode.

data ProcessId

Identifies a process somewhere on the network. These are produced by the spawn family of functions and consumed by send. When a process ends, its process ID ceases to be valid. See also getSelfPid.

data MatchM q a

This monad provides the state and structure for matching received messages from the incoming message queue. It's the interface between the receive family of functions, and the match family, which together can express which messages can be accepted.

getSelfPid :: ProcessM ProcessId

Returns the process ID of the current process.

getSelfNode :: ProcessM NodeId

Returns the node ID of the node that the current process is running on.

send :: Serializable a => ProcessId -> a -> ProcessM ()

Sends a message to the given process. If the process isn't running or can't be accessed, this function will throw a TransmitException. The message must implement the Serializable interface.

sendQuiet :: Serializable a => ProcessId -> a -> ProcessM TransmitStatus

Like send, but in case of error returns a value rather than throwing an exception.

spawn :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId

Starts a process running the code, given as a closure, on the specified node. If successful, returns the process ID of the new process. If unsuccessful, throws a TransmitException.

spawnLocal :: ProcessM () -> ProcessM ProcessId

Create a new process on the current node. Returns the new process's identifier. Unlike spawn, this function does not need a Closure or a NodeId.

spawnAnd :: NodeId -> Closure (ProcessM ()) -> AmSpawnOptions -> ProcessM ProcessId

A variant of spawn that allows greater control over how the remote process is started.

spawnLink :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId

A variant of spawn that starts the remote process with bidirectional monitoring, as in linkProcess.

callRemote :: Serializable a => NodeId -> Closure (ProcessM a) -> ProcessM a

Invokes a function on a remote node. The function must be given by a closure. This function will block until the called function completes or the connection is broken.

terminate :: ProcessM a

Ends the current process in an orderly manner.

expect :: Serializable a => ProcessM a

A simple way to receive messages. This will return the first message received of the specified type; if no such message is available, the function will block. Unlike the receive family of functions, this function does not allow the notion of choice in message extraction.

receive :: [MatchM q ()] -> ProcessM (Maybe q)

Examines the message queue of the current process, matching each message against each of the provided message pattern clauses (typically provided by a function from the match family). If a message matches, the corresponding handler is invoked and its result is returned. If no message matches, Nothing is returned.

receiveWait :: [MatchM q ()] -> ProcessM q

Examines the message queue of the current process, matching each message against each of the provided message pattern clauses (typically provided by a function from the match family). If a message matches, the corresponding handler is invoked and its result is returned. If no message matches, the function blocks until a matching message is received.

receiveTimeout :: Int -> [MatchM q ()] -> ProcessM (Maybe q)

Examines the message queue of the current process, matching each message against each of the provided message pattern clauses (typically provided by a function from the match family). If a message matches, the corresponding handler is invoked and its result is returned. If no message matches, the function blocks until a matching message is received, or until the specified time in microseconds has elapsed, at which point it will return Nothing. If the specified time is 0, this function is equivalent to receive.

match :: Serializable a => (a -> ProcessM q) -> MatchM q ()

Used to specify a message pattern in receiveWait and related functions. Only messages containing data of type a, where a is the argument to the user-provided function in the first parameter of match, will be removed from the queue, at which point the user-provided function will be invoked.

matchIf :: Serializable a => (a -> Bool) -> (a -> ProcessM q) -> MatchM q ()

Similar to match, but allows for additional criteria to be checked prior to message acceptance. Here, the first user-provided function operates as a filter, and the message will be accepted only if it returns True. Once it's been accepted, the second user-defined function is invoked, as in match.

matchUnknown :: ProcessM q -> MatchM q ()

A catch-all variant of match that invokes user-provided code and will extract any message from the queue. This is useful for matching against messages that are not recognized. Since message matching patterns are evaluated in order, this function, if used, should be the last element in the list of matchers given to receiveWait and similar functions.
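The in-order evaluation of matchers can be illustrated with a small pure model. This is only a sketch under stated assumptions, not the library's implementation: messages are represented as Dynamic values instead of Serializable payloads, handlers are pure functions rather than ProcessM actions, and the names Matcher, matchModel, matchIfModel, matchUnknownModel and receiveModel are hypothetical.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Maybe (listToMaybe, mapMaybe)
import Data.Typeable (Typeable)

-- Hypothetical stand-in for MatchM: a matcher either rejects a message
-- (Nothing) or accepts it and produces a result.
type Matcher q = Dynamic -> Maybe q

-- Model of match: accepts only messages whose payload has the handler's type.
matchModel :: Typeable a => (a -> q) -> Matcher q
matchModel handler msg = fmap handler (fromDynamic msg)

-- Model of matchIf: the type must fit and the filter must return True.
matchIfModel :: Typeable a => (a -> Bool) -> (a -> q) -> Matcher q
matchIfModel ok handler msg =
  case fromDynamic msg of
    Just x | ok x -> Just (handler x)
    _             -> Nothing

-- Model of matchUnknown: accepts every message.
matchUnknownModel :: q -> Matcher q
matchUnknownModel result _ = Just result

-- Model of receive: scan the queue from the oldest message, try the matchers
-- in list order, and remove the first message that some matcher accepts.
-- Returns the handler result (if any) and the remaining queue.
receiveModel :: [Matcher q] -> [Dynamic] -> (Maybe q, [Dynamic])
receiveModel matchers = go []
  where
    go skipped [] = (Nothing, reverse skipped)
    go skipped (m : rest) =
      case listToMaybe (mapMaybe ($ m) matchers) of
        Just r  -> (Just r, reverse skipped ++ rest)
        Nothing -> go (m : skipped) rest
```

In this model, a queue built with toDyn that holds an Int followed by a String, received with a String matcher followed by matchUnknownModel, yields the catch-all result, because the Int is the oldest message and only the catch-all accepts it. Placing matchUnknownModel first would shadow every later matcher, which is why the catch-all belongs at the end of the list.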

matchUnknownThrow :: MatchM q ()

A variant of matchUnknown that throws an UnknownMessageException if the process receives a message that isn't extracted by another message matcher. Equivalent to:

 matchUnknown (throw (UnknownMessageException "..."))

matchProcessDown :: ProcessId -> ProcessM q -> MatchM q ()

A specialized version of match (for use with receive, receiveWait and friends) for catching process down messages. This way processes can avoid waiting forever for a response from another process that has crashed. Intended to be used within a withMonitor block, e.g.:

 withMonitor apid $
   do send apid QueryMsg
      receiveWait
        [ match (\AnswerMsg -> return "ok")
        , matchProcessDown apid (return "aborted")
        ]

logS :: LogSphere -> LogLevel -> String -> ProcessM ()

Generates a log entry, using the process's current logging configuration.

  • LogSphere indicates the subsystem generating this message; it is SYS for components of the framework.
  • LogLevel indicates the importance of the message.
  • The third parameter is the log message.

Both of the first two parameters may be used to filter log output.

say :: String -> ProcessM ()

Uses the logging facility to produce non-filterable, programmatic output. Shouldn't be used for informational logging, but rather for application-level output.

type LogSphere = String

Specifies the subsystem or region that is responsible for generating a given log entry. This is useful in conjunction with LogFilter to limit displayed log output to the particular area of your program that you are currently debugging. The SYS, TSK, and SAY spheres are used by the framework for messages relating to the Process layer, the Task layer, and the say function, respectively. All other values are free for use at the application level.

data LogTarget

A preference as to what is done with log messages

Constructors

LtStdout

Messages will be output to the console; the default

LtForward NodeId

Log messages will be forwarded to the given node; please don't set up a loop

LtFile FilePath

Log messages will be appended to the given file

LtForwarded

Special value -- don't set this in your LogConfig!

data LogFilter

Specifies which log messages will be output. All log messages of importance below the current log level or not among the criteria given here will be suppressed. This type lets you limit displayed log messages to certain components.

data LogConfig

Expresses a current configuration of the logging subsystem, which determines which log messages are output and where they are sent. Both processes and nodes have log configurations, set with setLogConfig and setNodeLogConfig respectively. The node log configuration is used for all processes that have not explicitly set their log configuration. Otherwise, the process log configuration takes priority.

Constructors

LogConfig 

Fields

logLevel :: LogLevel

The lowest message priority that will be displayed

logTarget :: LogTarget

Where to send messages

logFilter :: LogFilter

Other filtering

data LogLevel

Specifies the importance of a particular log entry. Can also be used to filter log output.

Constructors

LoSay

Non-suppressible application-level emission

LoFatal 
LoCritical 
LoImportant 
LoStandard

The default log level

LoInformation 
LoTrivial 

setLogConfig :: LogConfig -> ProcessM ()

Sets the process's log configuration. This overrides any node-level log configuration.

setNodeLogConfig :: LogConfig -> ProcessM ()

Sets the node's log configuration.

getLogConfig :: ProcessM LogConfig

Gets the currently active log configuration for the current process; if the current process doesn't have a log configuration set, the node's log configuration will be returned.
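The precedence between process and node configurations, and the effect of the log level threshold, can be sketched as a small pure model. The types Level and Config and the functions effectiveConfig and isShown are hypothetical stand-ins, and the ordering of levels (most important first, following the constructor order listed for LogLevel) is an assumption of this sketch rather than something the documentation states.

```haskell
import Data.Maybe (fromMaybe)

-- Stand-in for LogLevel. The constructor order is copied from the LogLevel
-- documentation; deriving Ord on that order is an assumption of this sketch.
data Level
  = LoSay | LoFatal | LoCritical | LoImportant
  | LoStandard | LoInformation | LoTrivial
  deriving (Eq, Ord, Show)

-- Stand-in for LogConfig, keeping only the level field.
newtype Config = Config { cfgLevel :: Level }
  deriving (Eq, Show)

-- A process's own configuration, when set, overrides the node's.
effectiveConfig :: Config -> Maybe Config -> Config
effectiveConfig nodeCfg processCfg = fromMaybe nodeCfg processCfg

-- A message is shown when it is at least as important as the threshold.
-- LoSay sorts first, so it passes every threshold in this model.
isShown :: Config -> Level -> Bool
isShown cfg msgLevel = msgLevel <= cfgLevel cfg
```

With a node configuration at LoStandard and no process configuration, effectiveConfig returns the node configuration, so a LoInformation message is suppressed while a LoFatal message is shown.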

defaultLogConfig :: LogConfig

The default log configuration represents a starting point for setting your own configuration. It is:

 logLevel = LoStandard
 logTarget = LtStdout
 logFilter = LfAll

getCfgArgs :: ProcessM [String]

Returns command-line arguments provided to the executable, excluding any command line arguments that were processed by the framework.

data UnknownMessageException

Thrown by matchUnknownThrow in response to a message of a wrong type being received by a process

data ServiceException

Thrown by Remote.Process system services in response to some problem

Constructors

ServiceException String 

data TransmitException

Thrown by various network-related functions when communication with a host has failed

nameSet :: String -> ProcessM ()

Assigns a name to the current process. The name is local to the node. On each node, each process may have only one name, and each name may be given to only one process. If this function is called more than once by the same process, or called more than once with the same name on a single node, it will throw a ServiceException. The PID of a named process can be queried later with nameQuery. When the named process ends, its name will again become available.

One reason to use named processes is to create node-local state. This example lets each node have its own favorite color, which can be changed and queried.

 nodeFavoriteColor :: ProcessM ()
 nodeFavoriteColor =
   do nameSet "favorite_color"
      loop Blue
   where
     loop color =
       receiveWait
         [ match (\newcolor -> return newcolor)
         , match (\pid -> send pid color >> return color)
         ] >>= loop

 setFavoriteColor :: NodeId -> Color -> ProcessM ()
 setFavoriteColor nid color =
  do (Just pid) <- nameQuery nid "favorite_color"
     send pid color

 getFavoriteColor :: NodeId -> ProcessM Color
 getFavoriteColor nid =
  do (Just pid) <- nameQuery nid "favorite_color"
     mypid <- getSelfPid
     send pid mypid
     expect

nameQuery :: NodeId -> String -> ProcessM (Maybe ProcessId)

Query the PID of a named process on a particular node. If no process of that name exists, or if that process has ended, this function returns Nothing.

nameQueryOrStart :: NodeId -> String -> Closure (ProcessM ()) -> ProcessM ProcessId

Similar to nameQuery but if the named process doesn't exist, it will be started from the given closure. If the process is already running, the closure will be ignored.

linkProcess :: ProcessId -> ProcessM ()

Establishes bidirectional abnormal termination monitoring between the current process and another: each is signalled only if the other terminates abnormally. In other words, linkProcess a is equivalent to:

 monitorProcess mypid a MaLinkError
 monitorProcess a mypid MaLinkError

monitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()

Establishes unidirectional monitoring of another process. The format is:

 monitorProcess monitor monitee action

Here,

  • monitor is the process that will be notified if the monitee goes down
  • monitee is the process that will be monitored
  • action determines how the monitor will be notified

Monitoring will remain in place until one of the processes ends or until unmonitorProcess is called. Calls to monitorProcess are cumulative, such that calling monitorProcess three times on the same pair of processes will ensure that monitoring will stay in place until unmonitorProcess is called three times on the same pair of processes. If the monitee is not currently running, the monitor will be signalled immediately. See also MonitorAction.
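The cumulative behaviour described above amounts to reference counting. The following pure model is only a sketch of that bookkeeping, not the framework's implementation; Pid, Action, MonitorTable, addMonitor, removeMonitor and isMonitored are hypothetical names, and process IDs are plain Ints here.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical stand-ins: process IDs as Ints, MonitorAction as a small enum.
type Pid = Int

data Action = MaMonitor | MaLink | MaLinkError
  deriving (Eq, Ord, Show)

-- Key: (monitor, monitee, action). Value: outstanding monitorProcess calls.
type MonitorTable = Map.Map (Pid, Pid, Action) Int

-- Model of monitorProcess: each call increments the count for the triple.
addMonitor :: Pid -> Pid -> Action -> MonitorTable -> MonitorTable
addMonitor monitor monitee action =
  Map.insertWith (+) (monitor, monitee, action) 1

-- Model of unmonitorProcess: decrement the count and drop the entry at zero.
-- A triple that was never registered is left alone (no action taken).
removeMonitor :: Pid -> Pid -> Action -> MonitorTable -> MonitorTable
removeMonitor monitor monitee action =
  Map.update decrement (monitor, monitee, action)
  where
    decrement n
      | n > 1     = Just (n - 1)
      | otherwise = Nothing

-- Monitoring is in place while at least one call is outstanding.
isMonitored :: Pid -> Pid -> Action -> MonitorTable -> Bool
isMonitored monitor monitee action = Map.member (monitor, monitee, action)
```

After three addMonitor calls for the same triple, two removeMonitor calls leave monitoring in place and the third removes it. A removeMonitor call with a different action leaves the table unchanged, mirroring the requirement that the monitoring type must match.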

unmonitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()

Removes monitoring established by monitorProcess. Note that the type of monitoring, given in the third parameter, must match in order for monitoring to be removed. If monitoring has not already been established between these two processes, this function takes no action.

withMonitor :: ProcessId -> ProcessM a -> ProcessM a

Establishes temporary monitoring of another process. The process to be monitored is given in the first parameter, and the code to run in the second. If the given process goes down while the code in the second parameter is running, a process down message will be sent to the current process, which can be handled by matchProcessDown.

data MonitorAction

The different kinds of monitoring available between processes.

Constructors

MaMonitor

MaMonitor means that the monitor process will be sent a ProcessMonitorException message when the monitee terminates for any reason.

MaLink

MaLink means that the monitor process will receive an asynchronous exception of type ProcessMonitorException when the monitee terminates for any reason.

MaLinkError

MaLinkError means that the monitor process will receive an asynchronous exception of type ProcessMonitorException when the monitee terminates abnormally.

data ProcessMonitorException

The main form of notification to a monitoring process that a monitored process has terminated. This data structure can be delivered to the monitor either as a message (if the monitor is of type MaMonitor) or as an asynchronous exception (if the monitor is of type MaLink or MaLinkError). It contains the PID of the monitored process and the reason for the notification.

getPeers :: ProcessM PeerInfo

Returns information about all nodes on the current network that this node knows about. This function combines dynamic and static mechanisms. See documentation on getPeersStatic and getPeersDynamic for more info. This function depends on the configuration values cfgKnownHosts and cfgPeerDiscoveryPort.

findPeerByRole :: PeerInfo -> String -> [NodeId]

Given a PeerInfo returned by getPeersDynamic or getPeersStatic, give a list of nodes registered as a particular role. If no nodes of that role are found, the empty list is returned.

type PeerInfo = Map String [NodeId]

Created by Remote.Peer.getPeers, this maps each role to a list of nodes that have that role. It can be examined directly or queried with findPeerByRole.
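Because PeerInfo is a plain map from role to nodes, a role lookup can be sketched in one line. The sketch below is not the library's definition of findPeerByRole; it uses String as a hypothetical stand-in for NodeId, and the role names in the usage note are made up.

```haskell
import qualified Data.Map as Map

-- Hypothetical stand-ins: node IDs as Strings, PeerInfo as a role-to-nodes map.
type NodeIdModel = String
type PeerInfoModel = Map.Map String [NodeIdModel]

-- Returns the nodes registered under a role, or the empty list when the
-- role does not appear in the map.
findPeerByRoleModel :: PeerInfoModel -> String -> [NodeIdModel]
findPeerByRoleModel peers role = Map.findWithDefault [] role peers
```

For a map built from [("WORKER", ["node-a", "node-b"]), ("MASTER", ["node-c"])], looking up "WORKER" gives both worker nodes and looking up an unregistered role gives the empty list.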

remotable :: [Name] -> Q [Dec]

A compile-time macro to provide easy invocation of closures. To use it, follow these steps:

  1. First, enable Template Haskell in the module:
 {-# LANGUAGE TemplateHaskell #-}
 module Main where
 import Remote.Call (remotable)
    ...
  2. Define your functions normally. Restrictions: function's type signature must be explicitly declared; no polymorphism; all parameters must implement Serializable; return value must be pure, or in one of the ProcessM, TaskM, or IO monads; probably other restrictions as well.
 greet :: String -> ProcessM ()
 greet name = say ("Hello, "++name)
 badFib :: Integer -> Integer
 badFib 0 = 1
 badFib 1 = 1
 badFib n = badFib (n-1) + badFib (n-2)
  3. Use the remotable function to automagically generate stubs and closure generators for your functions:
 $( remotable ['greet, 'badFib] )

remotable may be used only once per module.

  4. When you call remoteInit (usually the first thing in your program), be sure to give it the automagically generated function lookup tables from all modules that use remotable:
 main = remoteInit (Just "config") [Main.__remoteCallMetaData, OtherModule.__remoteCallMetaData] initialProcess
  5. Now you can invoke your functions remotely. When a function expects a closure, give it the name of the generated closure, rather than the name of the original function. If the function takes parameters, so will the closure. To start the greet function on someNode:
 spawn someNode (greet__closure "John Baptist")

Note that we say greet__closure rather than just greet. If you prefer, you can use mkClosure instead, i.e. $(mkClosure 'greet), which will expand to greet__closure. To calculate a Fibonacci number remotely:

 val <- callRemotePure someNode (badFib__closure 5)

type RemoteCallMetaData = Lookup -> Lookup

Data of this type is generated at compile-time by remotable and can be used with registerCalls and remoteInit to create a metadata lookup table, Lookup. The name __remoteCallMetaData will be present in any module that uses remotable.

data Closure a

A data type representing a closure, that is, a function with its environment. In spirit, this is actually:

   data Closure a where
     Closure :: Serializable v => Static (v -> a) -> v -> Closure a     

where the Static type wraps a function with no non-static free variables. We simulate this behavior by identifying top-level functions as strings. See the paper for clarification.
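The idea of identifying top-level functions by strings can be sketched with an ordinary map. This is a toy model under loud assumptions, not the framework's representation: arguments and results are serialized with show and read instead of Binary, and ClosureModel, LookupModel, MetaDataModel, register, buildLookup, mkClosureModel and invoke are all hypothetical names.

```haskell
import qualified Data.Map as Map

-- A closure is a function name plus a serialized argument (assumption:
-- show/read stands in for the real binary encoding).
data ClosureModel = ClosureModel String String
  deriving (Eq, Show)

-- The lookup table maps names to functions over serialized values.
type LookupModel = Map.Map String (String -> String)

-- Mirrors the shape of RemoteCallMetaData: each module contributes entries.
type MetaDataModel = LookupModel -> LookupModel

-- Registers a typed top-level function under a name.
register :: (Read a, Show b) => String -> (a -> b) -> MetaDataModel
register name f = Map.insert name (show . f . read)

-- Combines the metadata of several modules into one table.
buildLookup :: [MetaDataModel] -> LookupModel
buildLookup = foldr ($) Map.empty

-- Packages a function name with its serialized argument.
mkClosureModel :: Show a => String -> a -> ClosureModel
mkClosureModel name arg = ClosureModel name (show arg)

-- Resolves the name on the receiving side and applies the function.
-- Returns Nothing when the name was never registered.
invoke :: LookupModel -> ClosureModel -> Maybe String
invoke table (ClosureModel name payload) =
  fmap ($ payload) (Map.lookup name table)
```

The model shows why metadata from every module has to be supplied up front: a closure naming a function that is missing from the table cannot be invoked on the receiving side.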

genericPut :: Data a => a -> Put

Data types that can be used in messaging must be serializable, which means that they must implement the get and put methods from Binary. If you are too lazy to write these functions yourself, you can delegate responsibility to this function. It's usually sufficient to do something like this:

 import Data.Data (Data)
 import Data.Typeable (Typeable)
 import Data.Binary (Binary, get, put)
 data MyType = MkMyType Foobar Int [(String, Waddle Baz)]
             | MkSpatula
                  deriving (Data, Typeable)
 instance Binary MyType where
    put = genericPut
    get = genericGet

genericGet :: Data a => Get a

This is the counterpart of genericPut.

class (Binary a, Typeable a) => Serializable a

Data that can be sent as a message must implement this class. The class has no functions of its own, but instead simply requires that the type implement both Typeable and Binary. Typeable can usually be derived automatically. Binary requires the put and get functions, which can be easily implemented by hand, or you can use the genericGet and genericPut flavors, which will work automatically for types implementing Data.

Channels

data SendPort a

A channel is a unidirectional communication pipeline with two ends: a sending port, and a receiving port. This is the sending port. A process holding this value can insert messages into the channel. SendPorts themselves can also be sent to other processes. The other side of the channel is the ReceivePort.

data ReceivePort a

A process holding a ReceivePort can extract messages from the channel, which were inserted by the holder(s) of the corresponding SendPort. Critically, ReceivePorts, unlike SendPorts, are not serializable. This means that you can only receive messages through a channel on the node on which the channel was created.

newChannel :: Serializable a => ProcessM (SendPort a, ReceivePort a)

Creates a new channel and returns both its SendPort and its ReceivePort.

sendChannel :: Serializable a => SendPort a -> a -> ProcessM ()

Inserts a new value into the channel.

receiveChannel :: Serializable a => ReceivePort a -> ProcessM a

Extracts a value from the channel, in FIFO order.

combinedChannelAction :: Serializable a => ReceivePort a -> (a -> b) -> CombinedChannelAction b

Specifies a port and an adapter for combining ports via combinePortsBiased and combinePortsRR.

combinePortsBiased :: Serializable b => [CombinedChannelAction b] -> ProcessM (ReceivePort b)

This function lets us respond to messages on multiple channels by combining several ReceivePorts into one. The resulting port is the sum of the input ports, and will extract messages from all of them in FIFO order. The input ports are specified by combinedChannelAction, which also gives a converter function. After combining, the underlying receive ports can still be used independently, as well. We provide two ways to combine ports, which differ in the bias they demonstrate in returning messages when more than one underlying channel is nonempty. combinePortsBiased will check ports in the order given by its argument, and so if the first channel always has a message waiting, it will starve the other channels. The alternative is combinePortsRR.

combinePortsRR :: Serializable b => [CombinedChannelAction b] -> ProcessM (ReceivePort b)

See combinePortsBiased. This function differs from that one in that the order that the underlying ports are checked is rotated with each invocation, guaranteeing that, given enough invocations, every channel will have a chance to contribute a message.
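The difference between the two policies can be seen in a pure model where each channel is a list of pending messages. This is a sketch of the polling order only, under the assumption that round-robin advances its starting position by one on each invocation; it omits the adapter functions, and pollBiased, pollRR, drainBiased and drainRR are hypothetical names.

```haskell
-- Biased polling: always scan the queues in the order given and take from
-- the first non-empty one. Returns the message and the updated queues.
pollBiased :: [[a]] -> Maybe (a, [[a]])
pollBiased queues =
  case break (not . null) queues of
    (empties, (x : xs) : rest) -> Just (x, empties ++ (xs : rest))
    _                          -> Nothing

-- Round-robin polling: scan starting at index `start`, wrapping around, and
-- return the starting index to use on the next invocation.
pollRR :: Int -> [[a]] -> Maybe (a, Int, [[a]])
pollRR start queues
  | null queues = Nothing
  | otherwise =
      case filter (not . null . (queues !!)) order of
        []      -> Nothing
        (i : _) ->
          case splitAt i queues of
            (before, (x : xs) : after) ->
              Just (x, (start + 1) `mod` n, before ++ (xs : after))
            _ -> Nothing
  where
    n     = length queues
    order = take n (drop (start `mod` n) (cycle [0 .. n - 1]))

-- Take up to k messages with the biased policy.
drainBiased :: Int -> [[a]] -> [a]
drainBiased k queues
  | k <= 0 = []
  | otherwise = case pollBiased queues of
      Nothing        -> []
      Just (x, rest) -> x : drainBiased (k - 1) rest

-- Take up to k messages with the round-robin policy.
drainRR :: Int -> Int -> [[a]] -> [a]
drainRR k start queues
  | k <= 0 = []
  | otherwise = case pollRR start queues of
      Nothing              -> []
      Just (x, next, rest) -> x : drainRR (k - 1) next rest
```

With two queues holding "aaa" and "bbb", three biased polls only ever read from the first queue, while four round-robin polls starting at index 0 alternate between the two.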

mergePortsBiased :: Serializable a => [ReceivePort a] -> ProcessM (ReceivePort a)

Similar to combinePortsBiased, with the difference that the underlying ports must be of the same type, and you don't have the opportunity to provide an adapter function.

mergePortsRR :: Serializable a => [ReceivePort a] -> ProcessM (ReceivePort a)

Similar to combinePortsRR, with the difference that the underlying ports must be of the same type, and you don't have the opportunity to provide an adapter function.

terminateChannel :: Serializable a => ReceivePort a -> ProcessM ()

Terminate a channel. After calling this function, receiveChannel on that port (or on any combined port based on it) will either fail or block indefinitely, and sendChannel on the corresponding SendPort will fail. Any unread messages remaining in the channel will be lost.

The task layer

runTask :: TaskM a -> ProcessM a

Starts a new context for executing a TaskM environment. The node on which this function is run becomes a new master in a Task application; as a result, the application should only call this function once. The master will attempt to control all nodes that it can find; if you are going to be running more than one CH application on a single network, be sure to give each application a different network magic (via cfgNetworkMagic). The master TaskM environment created by this function can then spawn other threads, locally or remotely, using newPromise and friends.

data Promise a

The basic data type for expressing data dependence in the TaskM monad. A Promise represents a value that may or may not have been computed yet; thus, it's like a distributed thunk (in the sense of a non-strict unit of evaluation). These are created by newPromise and friends, and the underlying value can be gotten with readPromise.
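As a single-machine analogy, a promise behaves much like a write-once variable filled in by a background thread. The sketch below uses only base-library concurrency; LocalPromise, newLocalPromise and readLocalPromise are hypothetical names, and none of the distributed aspects (placement on nodes, serialization, failure recovery) are modelled.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar)

-- A local promise is a box that a worker thread fills exactly once.
newtype LocalPromise a = LocalPromise (MVar a)

-- Starts the computation in a new thread and returns the token immediately,
-- before the value is available.
newLocalPromise :: IO a -> IO (LocalPromise a)
newLocalPromise compute = do
  box <- newEmptyMVar
  _ <- forkIO (compute >>= putMVar box)
  return (LocalPromise box)

-- Blocks until the value has been computed. readMVar leaves the value in
-- place, so the promise can be redeemed any number of times.
readLocalPromise :: LocalPromise a -> IO a
readLocalPromise (LocalPromise box) = readMVar box
```

In this analogy the token can be passed around before the value exists, and redemption blocks until the computation has finished. Unlike the real task layer, a computation that throws here simply never fills the box, so a reader would wait indefinitely.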

newPromise :: Serializable a => Closure (TaskM a) -> TaskM (Promise a)

Given a function (expressed here as a closure, see Remote.Call) that computes a value, returns a token identifying that value. This token, a Promise, can be moved about even if the value hasn't been computed yet. The computing function will be started somewhere among the nodes visible to the current master, preferring those nodes that correspond to the defaultLocality. Afterwards, attempts to redeem the promise with readPromise will contact the node where the function is executing.

newPromiseHere :: Serializable a => Closure (TaskM a) -> TaskM (Promise a)

A variant of newPromise that prefers to start the computing function on the same node as the caller. Useful if you plan to use the resulting value locally.

newPromiseAtRole :: Serializable a => String -> Closure (TaskM a) -> TaskM (Promise a)

A variant of newPromise that prefers to start the computing function on some set of nodes that have a given role (assigned by the cfgRole configuration option).

newPromiseNear :: (Serializable a, Serializable b) => Promise b -> Closure (TaskM a) -> TaskM (Promise a)

A variant of newPromise that prefers to start the computing function on the same node where some other promise lives. The other promise is not evaluated.

toPromise :: Serializable a => a -> TaskM (Promise a)

Like newPromise, but creates a promise whose value is already known. In other words, it puts a given, already-calculated value in a promise. Conceptually (but not syntactically, due to closures), you can consider it like this:

 toPromise a = newPromise (return a)

toPromiseImm :: Serializable a => a -> TaskM (Promise a)

Creates an immediate promise, which is to say, a promise in name only. Unlike a regular promise (created by toPromise), this kind of promise contains the value directly. The advantage is that promise redemption is very fast, requiring no network communication. The downside is that the underlying data will be copied along with the promise. Useful only for small data.

readPromise :: Serializable a => Promise a -> TaskM a

Given a promise, gets the value that is being calculated. If the calculation has finished, the owning node will be contacted and the data moved to the current node. If the calculation has not finished, this function will block until it has. If the calculation failed by throwing an exception (e.g. divide by zero), then this function will throw an exception as well (a TaskException). If the node owning the promise is not accessible, the calculation will be restarted.

tlogS :: LogSphere -> LogLevel -> String -> TaskM ()

Writes various kinds of messages to the Remote.Process log.

tsay :: String -> TaskM ()

A Task-monadic version of say. Puts text messages in the log.

data MapReduce rawinput input middle1 middle2 result

A data structure that stores the important user-provided functions that are the namesakes of the MapReduce algorithm. The number of mapper processes can be controlled by the user by controlling the length of the list returned by mtChunkify. The number of reducer promises is controlled by the number of values returned by the shuffler. The user must provide their own mapper and reducer. For many cases, the default chunkifier (chunkify) and shuffler (shuffle) are adequate.

Constructors

MapReduce 

Fields

mtMapper :: input -> Closure (TaskM [middle1])

mtReducer :: middle2 -> Closure (TaskM result)

mtChunkify :: rawinput -> [input]

mtShuffle :: [middle1] -> [middle2]

mapReduce :: (Serializable i, Serializable k, Serializable m, Serializable r) => MapReduce ri i k m r -> ri -> TaskM [r]

The MapReduce algorithm, implemented in a very simple form on top of the Task layer. Its use depends on four user-determined data types:

  • input -- The data type provided as the input to the algorithm as a whole and given to the mapper.
  • middle1 -- The output of the mapper. This may include some key which is used by the shuffler to allocate data to reducers. If you use the default shuffler, shuffle, this type must have the form Ord a => (a,b).
  • middle2 -- The output of the shuffler. The default shuffler emits a type in the form Ord a => (a,[b]). Each middle2 output by the shuffler is given to a separate reducer.
  • result -- The output of the reducer, upon being given one middle2 value.
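The way these types fit together can be sketched as a pure, single-process pipeline. This model is an assumption-laden illustration and not the library's mapReduce: the mapper and reducer are plain functions rather than closures in TaskM, nothing is distributed, and MapReduceModel, mapReduceModel and sumOfSquares are hypothetical names.

```haskell
-- Pure stand-in for the MapReduce record: same four roles, no closures.
data MapReduceModel rawinput input middle1 middle2 result = MapReduceModel
  { mrMapper   :: input -> [middle1]
  , mrReducer  :: middle2 -> result
  , mrChunkify :: rawinput -> [input]
  , mrShuffle  :: [middle1] -> [middle2]
  }

-- Data flow: chunkify the raw input, map every chunk, pool the mapper
-- outputs, shuffle them into groups, and reduce each group separately.
mapReduceModel :: MapReduceModel ri i m1 m2 r -> ri -> [r]
mapReduceModel mr raw =
  let chunks  = mrChunkify mr raw
      mapped  = concatMap (mrMapper mr) chunks
      grouped = mrShuffle mr mapped
  in map (mrReducer mr) grouped

-- Example instance: every number is its own chunk, the mapper squares it,
-- the shuffler puts all squares in a single group, and the reducer sums.
sumOfSquares :: MapReduceModel [Int] Int Int [Int] Int
sumOfSquares = MapReduceModel
  { mrMapper   = \x -> [x * x]
  , mrReducer  = sum
  , mrChunkify = id
  , mrShuffle  = \ms -> [ms]
  }
```

In this model, the length of the list produced by mrChunkify determines how many times the mapper runs, and the length of the list produced by mrShuffle determines how many times the reducer runs. For example, mapReduceModel sumOfSquares [1, 2, 3] runs the mapper three times and the reducer once.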

chunkify :: Int -> [a] -> [[a]]

A convenient way to provide the mtChunkify function as part of mapReduce.

shuffle :: Ord a => [(a, b)] -> [(a, [b])]

A convenient way to provide the mtShuffle function as part of mapReduce.
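Functions with these two signatures could plausibly be written as follows. Both are sketches rather than the library's definitions: chunkifySketch assumes that the Int argument is the desired number of chunks (the documentation does not say whether it is a count or a size), and shuffleSketch assumes that values are grouped by key with their original order preserved. The names chunkifySketch and shuffleSketch are hypothetical.

```haskell
import qualified Data.Map.Strict as Map

-- Splits a list into at most n contiguous chunks of near-equal size.
-- Assumption: n is a chunk count, not a chunk size.
chunkifySketch :: Int -> [a] -> [[a]]
chunkifySketch n xs
  | n <= 0 || null xs = []
  | otherwise         = go xs
  where
    size = (length xs + n - 1) `div` n  -- ceiling of length / n
    go [] = []
    go ys = let (chunk, rest) = splitAt size ys in chunk : go rest

-- Groups values by key. Keys come out in ascending order, and the values
-- for each key keep the order in which they appeared in the input.
shuffleSketch :: Ord a => [(a, b)] -> [(a, [b])]
shuffleSketch pairs =
  Map.toList (Map.fromListWith (flip (++)) [(k, [v]) | (k, v) <- pairs])
```

Under these assumptions, chunkifySketch 2 applied to a five-element list gives one chunk of three elements and one of two, and shuffleSketch collects all values that share a key into one list, which is the shape the default middle2 type (a,[b]) calls for.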