distributed-process-0.4.0.1: Cloud Haskell: Erlang-style concurrency in Haskell

Safe HaskellNone

Control.Distributed.Process.Internal.Primitives

Contents

Description

Cloud Haskell primitives

We define these in a separate module so that we don't have to rely on the closure combinators

Synopsis

Basic messaging

send :: Serializable a => ProcessId -> a -> Process ()Source

Send a message

expect :: forall a. Serializable a => Process aSource

Wait for a message of a specific type

Channels

newChan :: Serializable a => Process (SendPort a, ReceivePort a)Source

Create a new typed channel

sendChan :: Serializable a => SendPort a -> a -> Process ()Source

Send a message on a typed channel

receiveChan :: Serializable a => ReceivePort a -> Process aSource

Wait for a message on a typed channel

mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source

Merge a list of typed channels.

The result port is left-biased: if there are messages available on more than one port, the first available message is returned.

mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source

Like mergePortsBiased, but with a round-robin scheduler (rather than left-biased)

Advanced messaging

data Match b Source

Opaque type used in receiveWait and receiveTimeout

receiveWait :: [Match b] -> Process bSource

Test the matches in order against each message in the queue

receiveTimeout :: Int -> [Match b] -> Process (Maybe b)Source

Like receiveWait but with a timeout.

If the timeout is zero do a non-blocking check for matching messages. A non-zero timeout is applied only when waiting for incoming messages (that is, after we have checked the messages that are already in the mailbox).

match :: forall a b. Serializable a => (a -> Process b) -> Match bSource

Match against any message of the right type

matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match bSource

Match against any message of the right type that satisfies a predicate

matchUnknown :: Process b -> Match bSource

Remove any message from the queue

matchAny :: forall b. (AbstractMessage -> Process b) -> Match bSource

Match against an arbitrary message

Process management

terminate :: Process aSource

Terminate (throws a ProcessTerminationException)

getSelfPid :: Process ProcessIdSource

Our own process ID

getSelfNode :: Process NodeIdSource

Get the node ID of our local node

Monitoring and linking

link :: ProcessId -> Process ()Source

Link to a remote process (asynchronous)

Note that link provides unidirectional linking (see spawnSupervised). Linking makes no distinction between normal and abnormal termination of the remote process.

unlink :: ProcessId -> Process ()Source

Remove a link

This is synchronous in the sense that once it returns you are guaranteed that no exception will be raised if the remote process dies. However, it is asynchronous in the sense that we do not wait for a response from the remote node.

monitor :: ProcessId -> Process MonitorRefSource

Monitor another process (asynchronous)

unmonitor :: MonitorRef -> Process ()Source

Remove a monitor

This has the same synchronous/asynchronous nature as unlink.

Logging

say :: String -> Process ()Source

Log a string

say message sends a message (time, pid of the current process, message) to the process registered as logger. By default, this process simply sends the string to stderr. Individual Cloud Haskell backends might replace this with a different logger process, however.

Registry

register :: String -> ProcessId -> Process ()Source

Register a process with the local registry (asynchronous).

The process to be registered does not have to be local itself.

unregister :: String -> Process ()Source

Remove a process from the local registry (asynchronous).

whereis :: String -> Process (Maybe ProcessId)Source

Query the local process registry

nsend :: Serializable a => String -> a -> Process ()Source

Named send to a process in the local registry (asynchronous)

registerRemote :: NodeId -> String -> ProcessId -> Process ()Source

Register a process with a remote registry (asynchronous).

The process to be registered does not have to live on the same remote node.

unregisterRemote :: NodeId -> String -> Process ()Source

Remove a process from a remote registry (asynchronous).

whereisRemoteAsync :: NodeId -> String -> Process ()Source

Query a remote process registry (asynchronous)

Reply will come in the form of a WhereIsReply message.

There is currently no synchronous version of whereisRemoteAsync: if you implement one yourself, be sure to take into account that the remote node might die or get disconnect before it can respond (i.e. you should use monitorNode and take appropriate action when you receive a NodeMonitorNotification).

nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()Source

Named send to a process in a remote registry (asynchronous)

Closures

unClosure :: Typeable a => Closure a -> Process aSource

Resolve a closure

unStatic :: Typeable a => Static a -> Process aSource

Resolve a static value

Exception handling

catch :: Exception e => Process a -> (e -> Process a) -> Process aSource

Lift catch

mask :: ((forall a. Process a -> Process a) -> Process b) -> Process bSource

Lift mask

bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process cSource

Lift bracket

Auxiliary API

expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)Source

Like expect but with a timeout

spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRefSource

Asynchronous version of spawn

(spawn is defined in terms of spawnAsync and expect)

linkNode :: NodeId -> Process ()Source

Link to a node (asynchronous)

linkPort :: SendPort a -> Process ()Source

Link to a channel (asynchronous)

unlinkNode :: NodeId -> Process ()Source

Remove a node link

This has the same synchronous/asynchronous nature as unlink.

unlinkPort :: SendPort a -> Process ()Source

Remove a channel (send port) link

This has the same synchronous/asynchronous nature as unlink.

monitorNode :: NodeId -> Process MonitorRefSource

Monitor a node (asynchronous)

monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRefSource

Monitor a typed channel (asynchronous)

Reconnecting

reconnect :: ProcessId -> Process ()Source

Cloud Haskell provides the illusion of connection-less, reliable, ordered message passing. However, when network connections get disrupted this illusion cannot always be maintained. Once a network connection breaks (even temporarily) no further communication on that connection will be possible. For example, if process A sends a message to process B, and A is then notified (by monitor notification) that it got disconnected from B, A will not be able to send any further messages to B, unless A explicitly indicates that it is acceptable to attempt to reconnect to B using the Cloud Haskell reconnect primitive.

Importantly, when A calls reconnect it acknowledges that some messages to B might have been lost. For instance, if A sends messages m1 and m2 to B, then receives a monitor notification that its connection to B has been lost, calls reconnect and then sends m3, it is possible that B will receive m1 and m3 but not m2.

Note that reconnect does not mean reconnect now but rather /it is okay to attempt to reconnect on the next send/. In particular, if no further communication attempts are made to B then A can use reconnect to clean up its connection to B.

reconnectPort :: SendPort a -> Process ()Source

Reconnect to a sendport. See reconnect for more information.