distributed-process-0.4.2: Cloud Haskell: Erlang-style concurrency in Haskell

Safe Haskell: None

Control.Distributed.Process.Internal.Primitives

Description

Cloud Haskell primitives

We define these in a separate module so that we don't have to rely on the closure combinators.

Basic messaging

send :: Serializable a => ProcessId -> a -> Process ()

Send a message

expect :: forall a. Serializable a => Process a

Wait for a message of a specific type
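As an illustration of send and expect working together, here is a minimal sketch of a request/reply exchange. The names echoServer and echoOnce are hypothetical, and the sketch assumes that the primitives (and spawnLocal) are imported from Control.Distributed.Process.

```haskell
import Control.Distributed.Process

-- | Hypothetical echo server: waits for one (sender, text) pair and
-- sends the text straight back to the sender.
echoServer :: Process ()
echoServer = do
  (sender, text) <- expect :: Process (ProcessId, String)
  send sender text

-- | Hypothetical client: spawns the echo server on the local node,
-- sends it one request, and blocks until a String reply arrives.
echoOnce :: String -> Process String
echoOnce text = do
  self   <- getSelfPid
  server <- spawnLocal echoServer
  send server (self, text)
  expect
```

Note that expect selects messages by type only: any String arriving in the client's mailbox would satisfy the final expect, not just the reply from the echo server.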

Channels

newChan :: Serializable a => Process (SendPort a, ReceivePort a)

Create a new typed channel

sendChan :: Serializable a => SendPort a -> a -> Process ()

Send a message on a typed channel

receiveChan :: Serializable a => ReceivePort a -> Process a

Wait for a message on a typed channel

mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)

Merge a list of typed channels.

The result port is left-biased: if messages are available on more than one port, the message from the first such port in the list is returned.

mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)

Like mergePortsBiased, but with a round-robin scheduler (rather than left-biased)
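The channel primitives can be combined as in the following sketch, which creates two typed channels, merges their receive ports, and reads through the merged port. The name mergedDemo and the message contents are hypothetical; the imports assume the primitives are re-exported from Control.Distributed.Process.

```haskell
import Control.Distributed.Process

-- | Hypothetical demo: two String channels read through one merged port.
mergedDemo :: Process (String, String)
mergedDemo = do
  (sendA, recvA) <- newChan
  (sendB, recvB) <- newChan
  -- recvA comes first in the list, so it takes priority under the left bias.
  merged <- mergePortsBiased [recvA, recvB]
  sendChan sendB "from B"
  sendChan sendA "from A"
  first  <- receiveChan merged
  second <- receiveChan merged
  return (first, second)
```

Swapping mergePortsBiased for mergePortsRR leaves the types unchanged and only alters which port is consulted first on each receive.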

Advanced messaging

data Match b

Opaque type used in receiveWait and receiveTimeout

receiveWait :: [Match b] -> Process b

Test the matches in order against each message in the queue

receiveTimeout :: Int -> [Match b] -> Process (Maybe b)

Like receiveWait but with a timeout.

If the timeout is zero do a non-blocking check for matching messages. A non-zero timeout is applied only when waiting for incoming messages (that is, after we have checked the messages that are already in the mailbox).

match :: forall a b. Serializable a => (a -> Process b) -> Match b

Match against any message of the right type

matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b

Match against any message of the right type that satisfies a predicate

matchUnknown :: Process b -> Match b

Remove any message from the queue
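A minimal sketch of selective receive using the match combinators above. The names handleOne and pollOne and the result strings are hypothetical; the imports assume the combinators are re-exported from Control.Distributed.Process.

```haskell
import Control.Distributed.Process

-- | Hypothetical mailbox handler. The matches are tried in list order
-- against each queued message, so the predicate case precedes the
-- general Int case.
handleOne :: Process String
handleOne = receiveWait
  [ matchIf (> (100 :: Int)) (\n -> return ("large Int: " ++ show n))
  , match   (\n -> return ("Int: " ++ show (n :: Int)))
  , match   (\s -> return ("String: " ++ s))
  , matchUnknown (return "dropped a message of some other type")
  ]

-- | Non-blocking variant: a zero timeout only checks messages that are
-- already in the mailbox.
pollOne :: Process (Maybe String)
pollOne = receiveTimeout 0 [ match (\s -> return (s :: String)) ]
```

Because matchUnknown removes whatever message it matches, placing it last keeps it from swallowing messages that a typed match would have handled.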

data AbstractMessage

Represents a received message and provides two basic operations on it.

Constructors

AbstractMessage 

Fields

forward :: ProcessId -> Process ()

forward the message to ProcessId

maybeHandleMessage :: forall a b. Serializable a => (a -> Process b) -> Process (Maybe b)

Handle the message. If the type of the message matches the type of the first argument to the supplied expression, then the expression will be evaluated against it. If this runtime type check fails, then Nothing will be returned to indicate the fact. If the check succeeds and evaluation proceeds, the resulting value will be wrapped in Just.

matchAny :: forall b. (AbstractMessage -> Process b) -> Match b

Match against an arbitrary message. matchAny removes the first available message from the process mailbox, and via the AbstractMessage type, supports forwarding or handling the message if it is of the correct type. If not of the right type, then the AbstractMessage maybeHandleMessage function will not evaluate the supplied expression, but the message will still have been removed from the process mailbox!

matchAnyIf :: forall a b. Serializable a => (a -> Bool) -> (AbstractMessage -> Process b) -> Match b

Match against an arbitrary message. matchAnyIf will only remove the message from the process mailbox, if the supplied condition matches. The success (or failure) of runtime type checks in maybeHandleMessage does not count here, i.e., if the condition evaluates to True then the message will be removed from the process mailbox and decoded, but that does not guarantee that an expression passed to maybeHandleMessage will pass the runtime type checks and therefore be evaluated. If the types do not match up, then maybeHandleMessage returns Nothing.

Process management

terminate :: Process a

Terminate immediately (throws a ProcessTerminationException)

die :: Serializable a => a -> Process b

Die immediately - throws a ProcessExitException with the given reason.

kill :: ProcessId -> String -> Process ()

Forceful request to kill a process. Where exit provides an exception that can be caught and handled, kill throws an unexposed exception type which cannot be handled explicitly (by type).

exit :: Serializable a => ProcessId -> a -> Process ()

Graceful request to exit a process. Throws ProcessExitException with the supplied reason encoded as a message. Any exit signal raised in this manner can be handled using the catchExit family of functions.

catchExit :: forall a b. (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b

Catches ProcessExitException. The handler will not be applied unless its type matches the encoded data stored in the exception (see the reason argument given to the exit primitive). If the handler cannot be applied, the exception will be re-thrown.

To handle ProcessExitException without regard for reason, see catch. To handle multiple reasons of differing types, see catchesExit.
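The interplay between exit and catchExit might look like the sketch below. The names worker and stopWorker and the reason string are hypothetical; the imports assume exit, catchExit and say are re-exported from Control.Distributed.Process.

```haskell
import Control.Distributed.Process

-- | Hypothetical worker: blocks waiting for a () message. If it is asked
-- to exit with a String reason, the handler logs the reason and the
-- worker returns normally instead of dying.
worker :: Process ()
worker = (expect :: Process ()) `catchExit` onExit
  where
    -- The handler only applies when the exit reason is a String.
    onExit :: ProcessId -> String -> Process ()
    onExit from reason =
      say ("exit requested by " ++ show from ++ ": " ++ reason)

-- | Ask the worker to stop gracefully, with a String reason.
stopWorker :: ProcessId -> Process ()
stopWorker pid = exit pid "shutting down"
```

If stopWorker sent a reason of another type (an Int, say), the handler in worker would not apply and the exception would be re-thrown, as described above.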

catchesExit :: Process b -> [ProcessId -> AbstractMessage -> Process (Maybe b)] -> Process b

Lift catches (almost).

As ProcessExitException stores the exit reason as a typed, encoded message, a handler must accept an input of the expected type. In order to handle a list of potentially different handlers (and therefore input types), a handler passed to catchesExit must accept AbstractMessage and return Maybe (i.e., Just p if it handled the exit reason, otherwise Nothing).

See maybeHandleMessage and AbstractMessage for more details.

getSelfPid :: Process ProcessId

Our own process ID

getSelfNode :: Process NodeId

Get the node ID of our local node

getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)

Get information about the specified process

Monitoring and linking

link :: ProcessId -> Process ()

Link to a remote process (asynchronous)

When process A links to process B (that is, process A calls link pidB) then an asynchronous exception will be thrown to process A when process B terminates (normally or abnormally), or when process A gets disconnected from process B. Although it is technically possible to catch these exceptions, chances are if you find yourself trying to do so you should probably be using monitor rather than link. In particular, code such as

 link pidB   -- Link to process B
 expect      -- Wait for a message from process B
 unlink pidB -- Unlink again

doesn't quite do what one might expect: if process B sends a message to process A, and subsequently terminates, then process A might or might not be terminated too, depending on whether the exception is thrown before or after the unlink (i.e., this code has a race condition).

Linking is all-or-nothing: A is either linked to B, or it's not. A second call to link has no effect.

Note that link provides unidirectional linking (see spawnSupervised). Linking makes no distinction between normal and abnormal termination of the remote process.

unlink :: ProcessId -> Process ()

Remove a link

This is synchronous in the sense that once it returns you are guaranteed that no exception will be raised if the remote process dies. However, it is asynchronous in the sense that we do not wait for a response from the remote node.

monitor :: ProcessId -> Process MonitorRef

Monitor another process (asynchronous)

When process A monitors process B (that is, process A calls monitor pidB) then process A will receive a ProcessMonitorNotification when process B terminates (normally or abnormally), or when process A gets disconnected from process B. You receive this message like any other (using expect); the notification includes a reason (DiedNormal, DiedException, DiedDisconnect, etc.).

Every call to monitor returns a new monitor reference MonitorRef; if multiple monitors are set up, multiple notifications will be delivered and monitors can be disabled individually using unmonitor.
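A small sketch of waiting for a monitor notification. Matching on the MonitorRef ensures that notifications from other monitors are left in the mailbox. The name awaitDeath is hypothetical; the imports assume ProcessMonitorNotification and DiedReason are exported with their constructors from Control.Distributed.Process.

```haskell
import Control.Distributed.Process

-- | Hypothetical helper: block until the monitored process terminates
-- (or we are disconnected from it) and return the reported reason.
awaitDeath :: ProcessId -> Process DiedReason
awaitDeath pid = do
  ref <- monitor pid
  receiveWait
    [ matchIf (\(ProcessMonitorNotification ref' _ _) -> ref' == ref)
              (\(ProcessMonitorNotification _ _ reason) -> return reason)
    ]
```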

unmonitor :: MonitorRef -> Process ()

Remove a monitor

This has the same synchronous/asynchronous nature as unlink.

withMonitor :: ProcessId -> Process a -> Process a

Establishes temporary monitoring of another process.

withMonitor pid code sets up monitoring of pid for the duration of code. Note: although monitoring is no longer active when withMonitor returns, there might still be unreceived monitor messages in the queue.

Logging

say :: String -> Process ()

Log a string

say message sends a message (time, pid of the current process, message) to the process registered as logger. By default, this process simply sends the string to stderr. Individual Cloud Haskell backends might replace this with a different logger process, however.

Registry

register :: String -> ProcessId -> Process ()

Register a process with the local registry (asynchronous). This version will wait until a response is received from the management process. The name must not already be registered. The process to be registered need not be on this node. A bad registration will result in a ProcessRegistrationException.

reregister :: String -> ProcessId -> Process ()

Like register, but will replace an existing registration. The name must already be registered.

unregister :: String -> Process ()

Remove a process from the local registry (asynchronous). This version will wait until a response is received from the management process. The name must already be registered.

whereis :: String -> Process (Maybe ProcessId)

Query the local process registry

nsend :: Serializable a => String -> a -> Process ()

Named send to a process in the local registry (asynchronous)
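The local registry functions can be used together roughly as follows. The name "counter" and the functions registerSelf and bump are hypothetical; the imports assume the registry primitives are re-exported from Control.Distributed.Process.

```haskell
import Control.Distributed.Process

-- | Hypothetical registry name; it must not already be registered.
counterName :: String
counterName = "counter"

-- | Register the calling process under 'counterName' and look it up again.
registerSelf :: Process (Maybe ProcessId)
registerSelf = do
  self <- getSelfPid
  register counterName self
  whereis counterName

-- | Send an Int to whichever process currently owns the name.
bump :: Int -> Process ()
bump n = nsend counterName n
```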

registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()

Register a process with a remote registry (asynchronous).

The process to be registered does not have to live on the same remote node. The reply will come in the form of a RegisterReply message.

See comments in whereisRemoteAsync

unregisterRemoteAsync :: NodeId -> String -> Process ()

Remove a process from a remote registry (asynchronous).

The reply will come in the form of a RegisterReply message.

See comments in whereisRemoteAsync

whereisRemoteAsync :: NodeId -> String -> Process ()

Query a remote process registry (asynchronous)

Reply will come in the form of a WhereIsReply message.

There is currently no synchronous version of whereisRemoteAsync: if you implement one yourself, be sure to take into account that the remote node might die or get disconnected before it can respond (i.e. you should use monitorNode and take appropriate action when you receive a NodeMonitorNotification).
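One way such a synchronous wrapper might be written is sketched below. This is not part of the library at this version: the name whereisRemoteSync is hypothetical, and the sketch assumes that WhereIsReply carries the queried name together with a Maybe ProcessId, and that NodeMonitorNotification carries the MonitorRef as its first field.

```haskell
import Control.Distributed.Process

-- | Hypothetical synchronous lookup built from the asynchronous primitive.
-- Returns Nothing if the name is unregistered or if the node dies or is
-- disconnected before it can answer.
whereisRemoteSync :: NodeId -> String -> Process (Maybe ProcessId)
whereisRemoteSync nid name = do
  ref <- monitorNode nid
  whereisRemoteAsync nid name
  result <- receiveWait
    [ -- Only accept a reply for the name we asked about.
      matchIf (\(WhereIsReply name' _) -> name' == name)
              (\(WhereIsReply _ mPid) -> return mPid)
      -- Give up if the monitored node goes away first.
    , matchIf (\(NodeMonitorNotification ref' _ _) -> ref' == ref)
              (\_ -> return Nothing)
    ]
  unmonitor ref
  return result
```

The reply is matched by name only, so a concurrent query for the same name against a different node could be confused with this one; a more careful version would need to account for that.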

nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()

Named send to a process in a remote registry (asynchronous)

Closures

unClosure :: Typeable a => Closure a -> Process a

Resolve a closure

unStatic :: Typeable a => Static a -> Process a

Resolve a static value

Exception handling

catch :: Exception e => Process a -> (e -> Process a) -> Process a

Lift catch

data Handler a

You need this when using catches

Constructors

forall e . Exception e => Handler (e -> Process a) 

try :: Exception e => Process a -> Process (Either e a)

Lift try

mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b

Lift mask

bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c

Lift bracket

Auxiliary API

expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)

Like expect but with a timeout

receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)

Like receiveChan but with a timeout. If the timeout is 0, do a non-blocking check for a message.

spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef

Asynchronous version of spawn

(spawn is defined in terms of spawnAsync and expect)

linkNode :: NodeId -> Process ()

Link to a node (asynchronous)

linkPort :: SendPort a -> Process ()

Link to a channel (asynchronous)

unlinkNode :: NodeId -> Process ()

Remove a node link

This has the same synchronous/asynchronous nature as unlink.

unlinkPort :: SendPort a -> Process ()

Remove a channel (send port) link

This has the same synchronous/asynchronous nature as unlink.

monitorNode :: NodeId -> Process MonitorRef

Monitor a node (asynchronous)

monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef

Monitor a typed channel (asynchronous)

Reconnecting

reconnect :: ProcessId -> Process ()

Cloud Haskell provides the illusion of connection-less, reliable, ordered message passing. However, when network connections get disrupted this illusion cannot always be maintained. Once a network connection breaks (even temporarily) no further communication on that connection will be possible. For example, if process A sends a message to process B, and A is then notified (by monitor notification) that it got disconnected from B, A will not be able to send any further messages to B, unless A explicitly indicates that it is acceptable to attempt to reconnect to B using the Cloud Haskell reconnect primitive.

Importantly, when A calls reconnect it acknowledges that some messages to B might have been lost. For instance, if A sends messages m1 and m2 to B, then receives a monitor notification that its connection to B has been lost, calls reconnect and then sends m3, it is possible that B will receive m1 and m3 but not m2.

Note that reconnect does not mean "reconnect now" but rather "it is okay to attempt to reconnect on the next send". In particular, if no further communication attempts are made to B then A can use reconnect to clean up its connection to B.
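A sketch of how a process might react to a disconnect notification. The name resumeAfterDisconnect and the "resync" message are hypothetical; what the peer should be told after a disconnect depends entirely on the application's own protocol.

```haskell
import Control.Distributed.Process

-- | Hypothetical handler, to be applied to a monitor notification that
-- has already been received for the given peer.
resumeAfterDisconnect :: ProcessId -> ProcessMonitorNotification -> Process ()
resumeAfterDisconnect peer (ProcessMonitorNotification _ _ DiedDisconnect) = do
  -- Acknowledge that earlier messages to the peer may have been lost and
  -- permit a new connection attempt on the next send.
  reconnect peer
  -- This send is what actually triggers the reconnection attempt.
  send peer "resync"
-- Any other reason means the peer is gone; there is nothing to reconnect to.
resumeAfterDisconnect _ _ = return ()
```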

reconnectPort :: SendPort a -> Process ()

Reconnect to a send port. See reconnect for more information.

Tracing/Debugging

trace :: String -> Process ()

Send a message to the internal (system) trace facility. If tracing is enabled, this will create a custom trace event. Note that several Cloud Haskell sub-systems also generate trace events for informational/debugging purposes, thus traces generated this way will not be the only output seen.

Just as with the Debug.Trace module, this is a debugging/tracing facility for use in development, and should not be used in a production setting - which is why the default behaviour is to trace to the GHC eventlog. For a general purpose logging facility, you should consider say.

Trace events can be written to the GHC event log, a text file, or to the standard system logger process (see say). The default behaviour for writing to the eventlog requires specific intervention to work, without which traces are silently dropped/ignored and no output will be generated. The GHC eventlog documentation provides information about enabling, viewing and working with event traces: http://hackage.haskell.org/trac/ghc/wiki/EventLog.

When a new local node is started, the contents of the environment variable DISTRIBUTED_PROCESS_TRACE_FILE are checked for a valid file path. If this exists and the file can be opened for writing, all trace output will be directed there. If the environment variable is empty, the path invalid, or the file unavailable for writing - e.g., because another node has already started tracing to it - then the DISTRIBUTED_PROCESS_TRACE_CONSOLE environment variable is checked for any non-empty value. If this is set, then all trace output will be directed to the system logger process. If neither environment variable provides a valid trace configuration, all internal traces are written to Debug.Trace.traceEventIO, which writes to the GHC eventlog.
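The fallback order described above can be summarised as a small pure model. This is not the library's implementation: TraceTarget, chooseTarget and the fileUsable predicate are hypothetical names, with fileUsable standing in for "the path is valid and the file can be opened for writing".

```haskell
-- | Hypothetical model of where trace output ends up.
data TraceTarget
  = TraceFile FilePath   -- ^ write to the given file
  | TraceLogger          -- ^ send to the system logger process (see say)
  | TraceEventLog        -- ^ fall back to the GHC eventlog
  deriving (Eq, Show)

-- | Decide the trace target from the values of the two environment
-- variables (Nothing meaning "unset").
chooseTarget
  :: (FilePath -> Bool)  -- ^ hypothetical check that the file is writable
  -> Maybe String        -- ^ DISTRIBUTED_PROCESS_TRACE_FILE
  -> Maybe String        -- ^ DISTRIBUTED_PROCESS_TRACE_CONSOLE
  -> TraceTarget
chooseTarget fileUsable traceFile traceConsole =
  case traceFile of
    Just path | not (null path), fileUsable path -> TraceFile path
    _ -> case traceConsole of
           Just v | not (null v) -> TraceLogger
           _                     -> TraceEventLog
```

In a real program the two Maybe String arguments would come from System.Environment.lookupEnv.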

Users of the simplelocalnet Cloud Haskell backend should also note that because the trace file option only supports trace output from a single node (so as to avoid interleaving), a file trace configured for the master node will prevent slaves from tracing to the file and they will fall back to using the console/say or eventlog instead.