| Safe Haskell | None | 
|---|
Control.Distributed.Process
Contents
Description
- Cloud Haskell
This is an implementation of Cloud Haskell, as described in Towards Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon Peyton Jones (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), although some of the details are different. The precise message passing semantics are based on A unified semantics for future Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle.
For a detailed description of the package and other reference materials, please see the distributed-process wiki page on github: https://github.com/haskell-distributed/distributed-process/wiki.
- data ProcessId
- data NodeId
- data Process a
- data SendPortId
- processNodeId :: ProcessId -> NodeId
- sendPortProcessId :: SendPortId -> ProcessId
- liftIO :: MonadIO m => forall a. IO a -> m a
- send :: Serializable a => ProcessId -> a -> Process ()
- expect :: forall a. Serializable a => Process a
- expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
- data ReceivePort a
- data SendPort a
- sendPortId :: SendPort a -> SendPortId
- newChan :: Serializable a => Process (SendPort a, ReceivePort a)
- sendChan :: Serializable a => SendPort a -> a -> Process ()
- receiveChan :: Serializable a => ReceivePort a -> Process a
- receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
- mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- data Match b
- receiveWait :: [Match b] -> Process b
- receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
- match :: forall a b. Serializable a => (a -> Process b) -> Match b
- matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
- matchUnknown :: Process b -> Match b
- data  AbstractMessage  = AbstractMessage {- forward :: ProcessId -> Process ()
- maybeHandleMessage :: forall a b. Serializable a => (a -> Process b) -> Process (Maybe b)
 
- matchAny :: forall b. (AbstractMessage -> Process b) -> Match b
- matchAnyIf :: forall a b. Serializable a => (a -> Bool) -> (AbstractMessage -> Process b) -> Match b
- matchChan :: ReceivePort a -> (a -> Process b) -> Match b
- spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
- call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a
- terminate :: Process a
- die :: Serializable a => a -> Process b
- kill :: ProcessId -> String -> Process ()
- exit :: Serializable a => ProcessId -> a -> Process ()
- catchExit :: forall a b. (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b
- catchesExit :: Process b -> [ProcessId -> AbstractMessage -> Process (Maybe b)] -> Process b
- data ProcessTerminationException = ProcessTerminationException
- data ProcessRegistrationException = ProcessRegistrationException !String
- data SpawnRef
- getSelfPid :: Process ProcessId
- getSelfNode :: Process NodeId
- data  ProcessInfo  = ProcessInfo {- infoNode :: NodeId
- infoRegisteredNames :: [String]
- infoMessageQueueLength :: Maybe Int
- infoMonitors :: [(ProcessId, MonitorRef)]
- infoLinks :: [ProcessId]
 
- getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
- link :: ProcessId -> Process ()
- linkNode :: NodeId -> Process ()
- linkPort :: SendPort a -> Process ()
- unlink :: ProcessId -> Process ()
- unlinkNode :: NodeId -> Process ()
- unlinkPort :: SendPort a -> Process ()
- monitor :: ProcessId -> Process MonitorRef
- monitorNode :: NodeId -> Process MonitorRef
- monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
- unmonitor :: MonitorRef -> Process ()
- withMonitor :: ProcessId -> Process a -> Process a
- data MonitorRef
- data ProcessLinkException = ProcessLinkException !ProcessId !DiedReason
- data NodeLinkException = NodeLinkException !NodeId !DiedReason
- data PortLinkException = PortLinkException !SendPortId !DiedReason
- data ProcessMonitorNotification = ProcessMonitorNotification !MonitorRef !ProcessId !DiedReason
- data NodeMonitorNotification = NodeMonitorNotification !MonitorRef !NodeId !DiedReason
- data PortMonitorNotification = PortMonitorNotification !MonitorRef !SendPortId !DiedReason
- data DiedReason
- data Closure a
- closure :: Static (ByteString -> a) -> ByteString -> Closure a
- data Static a
- unStatic :: Typeable a => Static a -> Process a
- unClosure :: Typeable a => Closure a -> Process a
- data RemoteTable
- say :: String -> Process ()
- register :: String -> ProcessId -> Process ()
- reregister :: String -> ProcessId -> Process ()
- unregister :: String -> Process ()
- whereis :: String -> Process (Maybe ProcessId)
- nsend :: Serializable a => String -> a -> Process ()
- registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
- reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
- unregisterRemoteAsync :: NodeId -> String -> Process ()
- whereisRemoteAsync :: NodeId -> String -> Process ()
- nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
- data WhereIsReply = WhereIsReply String (Maybe ProcessId)
- data RegisterReply = RegisterReply String Bool
- catch :: Exception e => Process a -> (e -> Process a) -> Process a
- data Handler a = forall e . Exception e => Handler (e -> Process a)
- catches :: Process a -> [Handler a] -> Process a
- try :: Exception e => Process a -> Process (Either e a)
- mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
- onException :: Process a -> Process b -> Process a
- bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
- bracket_ :: Process a -> Process b -> Process c -> Process c
- finally :: Process a -> Process b -> Process a
- spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
- spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
- spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
- spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
- spawnChannel :: forall a. Typeable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a)
- data DidSpawn = DidSpawn SpawnRef ProcessId
- spawnLocal :: Process () -> Process ProcessId
- spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a)
- reconnect :: ProcessId -> Process ()
- reconnectPort :: SendPort a -> Process ()
Basic types
Process identifier
Node identifier
The Cloud Haskell Process type
data SendPortId Source
A send port is identified by a SendPortId.
You cannot send directly to a SendPortId; instead, use newChan
 to create a SendPort.
Instances
processNodeId :: ProcessId -> NodeIdSource
The ID of the node the process is running on
sendPortProcessId :: SendPortId -> ProcessIdSource
The ID of the process that will receive messages sent on this port
Basic messaging
send :: Serializable a => ProcessId -> a -> Process ()Source
Send a message
expect :: forall a. Serializable a => Process aSource
Wait for a message of a specific type
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)Source
Like expect but with a timeout
Channels
data ReceivePort a Source
The receive end of a typed channel (not serializable)
Note that ReceivePort implements Functor, Applicative, Alternative
 and Monad. This is especially useful when merging receive ports.
The send send of a typed channel (serializable)
sendPortId :: SendPort a -> SendPortIdSource
The (unique) ID of this send port
newChan :: Serializable a => Process (SendPort a, ReceivePort a)Source
Create a new typed channel
sendChan :: Serializable a => SendPort a -> a -> Process ()Source
Send a message on a typed channel
receiveChan :: Serializable a => ReceivePort a -> Process aSource
Wait for a message on a typed channel
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)Source
Like receiveChan but with a timeout. If the timeout is 0, do a
 non-blocking check for a message.
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source
Merge a list of typed channels.
The result port is left-biased: if there are messages available on more than one port, the first available message is returned.
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source
Like mergePortsBiased, but with a round-robin scheduler (rather than
 left-biased)
Advanced messaging
Opaque type used in receiveWait and receiveTimeout
receiveWait :: [Match b] -> Process bSource
Test the matches in order against each message in the queue
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)Source
Like receiveWait but with a timeout.
If the timeout is zero do a non-blocking check for matching messages. A non-zero timeout is applied only when waiting for incoming messages (that is, after we have checked the messages that are already in the mailbox).
match :: forall a b. Serializable a => (a -> Process b) -> Match bSource
Match against any message of the right type
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match bSource
Match against any message of the right type that satisfies a predicate
matchUnknown :: Process b -> Match bSource
Remove any message from the queue
data AbstractMessage Source
Represents a received message and provides two basic operations on it.
Constructors
| AbstractMessage | |
| Fields 
 | |
matchAny :: forall b. (AbstractMessage -> Process b) -> Match bSource
Match against an arbitrary message. matchAny removes the first available
 message from the process mailbox, and via the AbstractMessage type,
 supports forwarding or handling the message if it is of the correct
 type. If not of the right type, then the AbstractMessage
 maybeHandleMessage function will not evaluate the supplied expression,
 but the message will still have been removed from the process mailbox!
matchAnyIf :: forall a b. Serializable a => (a -> Bool) -> (AbstractMessage -> Process b) -> Match bSource
Match against an arbitrary message. matchAnyIf will only remove the
 message from the process mailbox, if the supplied condition matches. The
 success (or failure) of runtime type checks in maybeHandleMessage does not
 count here, i.e., if the condition evaluates to True then the message will
 be removed from the process mailbox and decoded, but that does not
 guarantee that an expression passed to maybeHandleMessage will pass the
 runtime type checks and therefore be evaluated. If the types do not match
 up, then maybeHandleMessage returns Nothing.
matchChan :: ReceivePort a -> (a -> Process b) -> Match bSource
Process management
spawn :: NodeId -> Closure (Process ()) -> Process ProcessIdSource
Spawn a process
For more information about Closure, see
 Control.Distributed.Process.Closure.
See also call.
call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process aSource
Run a process remotely and wait for it to reply
We monitor the remote process: if it dies before it can send a reply, we die too.
For more information about Static, SerializableDict, and Closure, see
 Control.Distributed.Process.Closure.
See also spawn.
die :: Serializable a => a -> Process bSource
Die immediately - throws a ProcessExitException with the given reason.
exit :: Serializable a => ProcessId -> a -> Process ()Source
Graceful request to exit a process. Throws ProcessExitException with the
 supplied reason encoded as a message. Any exit signal raised in this
 manner can be handled using the catchExit family of functions.
catchExit :: forall a b. (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process bSource
Catches ProcessExitException. The handler will not be applied unless its
 type matches the encoded data stored in the exception (see the reason
 argument given to the exit primitive). If the handler cannot be applied,
 the exception will be re-thrown.
To handle ProcessExitException without regard for reason, see catch.
 To handle multiple reasons of differing types, see catchesExit.
catchesExit :: Process b -> [ProcessId -> AbstractMessage -> Process (Maybe b)] -> Process bSource
Lift catches (almost).
As ProcessExitException stores the exit reason as a typed, encoded
 message, a handler must accept an input of the expected type. In order to
 handle a list of potentially different handlers (and therefore input types),
 a handler passed to catchesExit must accept AbstractMessage and return
 Maybe (i.e., Just p if it handled the exit reason, otherwise Nothing).
See maybeHandleMessage and AsbtractMessage for more details.
data ProcessRegistrationException Source
Exception thrown when a process attempts to register a process under an already-registered name or to unregister a name that hasn't been registered
Constructors
| ProcessRegistrationException !String | 
SpawnRef are used to return pids of spawned processes
getSelfPid :: Process ProcessIdSource
Our own process ID
getSelfNode :: Process NodeIdSource
Get the node ID of our local node
data ProcessInfo Source
Provide information about a running process
Constructors
| ProcessInfo | |
| Fields 
 | |
Instances
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)Source
Get information about the specified process
Monitoring and linking
link :: ProcessId -> Process ()Source
Link to a remote process (asynchronous)
When process A links to process B (that is, process A calls
 link pidB) then an asynchronous exception will be thrown to process A
 when process B terminates (normally or abnormally), or when process A gets
 disconnected from process B. Although it is technically possible to catch
 these exceptions, chances are if you find yourself trying to do so you should
 probably be using monitor rather than link. In particular, code such as
link pidB -- Link to process B expect -- Wait for a message from process B unlink pidB -- Unlink again
doesn't quite do what one might expect: if process B sends a message to
 process A, and subsequently terminates, then process A might or might not
 be terminated too, depending on whether the exception is thrown before or
 after the unlink (i.e., this code has a race condition).
Linking is all-or-nothing: A is either linked to B, or it's not. A second
 call to link has no effect.
Note that link provides unidirectional linking (see spawnSupervised).
 Linking makes no distinction between normal and abnormal termination of
 the remote process.
unlink :: ProcessId -> Process ()Source
Remove a link
This is synchronous in the sense that once it returns you are guaranteed that no exception will be raised if the remote process dies. However, it is asynchronous in the sense that we do not wait for a response from the remote node.
unlinkNode :: NodeId -> Process ()Source
Remove a node link
This has the same synchronous/asynchronous nature as unlink.
unlinkPort :: SendPort a -> Process ()Source
Remove a channel (send port) link
This has the same synchronous/asynchronous nature as unlink.
monitor :: ProcessId -> Process MonitorRefSource
Monitor another process (asynchronous)
When process A monitors process B (that is, process A calls
 monitor pidB) then process A will receive a ProcessMonitorNotification
 when process B terminates (normally or abnormally), or when process A gets
 disconnected from process B. You receive this message like any other (using
 expect); the notification includes a reason (DiedNormal, DiedException,
 DiedDisconnect, etc.).
Every call to monitor returns a new monitor reference MonitorRef; if
 multiple monitors are set up, multiple notifications will be delivered
 and monitors can be disabled individually using unmonitor.
monitorNode :: NodeId -> Process MonitorRefSource
Monitor a node (asynchronous)
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRefSource
Monitor a typed channel (asynchronous)
unmonitor :: MonitorRef -> Process ()Source
Remove a monitor
This has the same synchronous/asynchronous nature as unlink.
withMonitor :: ProcessId -> Process a -> Process aSource
Establishes temporary monitoring of another process.
withMonitor pid code sets up monitoring of pid for the duration
 of code.  Note: although monitoring is no longer active when
 withMonitor returns, there might still be unreceived monitor
 messages in the queue.
data ProcessLinkException Source
Exceptions thrown when a linked process dies
Constructors
| ProcessLinkException !ProcessId !DiedReason | 
data NodeLinkException Source
Exception thrown when a linked node dies
Constructors
| NodeLinkException !NodeId !DiedReason | 
data PortLinkException Source
Exception thrown when a linked channel (port) dies
Constructors
| PortLinkException !SendPortId !DiedReason | 
data ProcessMonitorNotification Source
Message sent by process monitors
Constructors
| ProcessMonitorNotification !MonitorRef !ProcessId !DiedReason | 
data NodeMonitorNotification Source
Message sent by node monitors
Constructors
| NodeMonitorNotification !MonitorRef !NodeId !DiedReason | 
data PortMonitorNotification Source
Message sent by channel (port) monitors
Constructors
| PortMonitorNotification !MonitorRef !SendPortId !DiedReason | 
data DiedReason Source
Why did a process die?
Constructors
| DiedNormal | Normal termination | 
| DiedException !String | The process exited with an exception
 (provided as  | 
| DiedDisconnect | We got disconnected from the process node | 
| DiedNodeDown | The process node died | 
| DiedUnknownId | Invalid (processnodechannel) identifier | 
Instances
Closures
data Closure a
A closure is a static value and an encoded environment
Arguments
| :: Static (ByteString -> a) | Decoder | 
| -> ByteString | Encoded closure environment | 
| -> Closure a | 
data Static a
A static value. Static is opaque; see staticLabel and staticApply.
data RemoteTable
Runtime dictionary for unstatic lookups 
Logging
say :: String -> Process ()Source
Log a string
say message sends a message (time, pid of the current process, message)
 to the process registered as logger.  By default, this process simply
 sends the string to stderr. Individual Cloud Haskell backends might
 replace this with a different logger process, however.
Registry
register :: String -> ProcessId -> Process ()Source
Register a process with the local registry (asynchronous).
 This version will wait until a response is gotten from the
 management process. The name must not already be registered.
 The process need not be on this node.
 A bad registration will result in a ProcessRegistrationException
The process to be registered does not have to be local itself.
reregister :: String -> ProcessId -> Process ()Source
Like register, but will replace an existing registration.
 The name must already be registered.
unregister :: String -> Process ()Source
Remove a process from the local registry (asynchronous). This version will wait until a response is gotten from the management process. The name must already be registered.
nsend :: Serializable a => String -> a -> Process ()Source
Named send to a process in the local registry (asynchronous)
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()Source
Register a process with a remote registry (asynchronous).
The process to be registered does not have to live on the same remote node.
 Reply wil come in the form of a RegisterReply message
See comments in whereisRemoteAsync
unregisterRemoteAsync :: NodeId -> String -> Process ()Source
Remove a process from a remote registry (asynchronous).
Reply wil come in the form of a RegisterReply message
See comments in whereisRemoteAsync
whereisRemoteAsync :: NodeId -> String -> Process ()Source
Query a remote process registry (asynchronous)
Reply will come in the form of a WhereIsReply message.
There is currently no synchronous version of whereisRemoteAsync: if
 you implement one yourself, be sure to take into account that the remote
 node might die or get disconnect before it can respond (i.e. you should
 use monitorNode and take appropriate action when you receive a
 NodeMonitorNotification).
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()Source
Named send to a process in a remote registry (asynchronous)
data WhereIsReply Source
(Asynchronous) reply from whereis
Constructors
| WhereIsReply String (Maybe ProcessId) | 
Instances
data RegisterReply Source
(Asynchronous) reply from register and unregister
Constructors
| RegisterReply String Bool | 
Exception handling
You need this when using catches
onException :: Process a -> Process b -> Process aSource
Lift onException
Auxiliary API
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRefSource
Asynchronous version of spawn
(spawn is defined in terms of spawnAsync and expect)
spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)Source
Spawn a child process, have the child link to the parent and the parent monitor the child
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)Source
Like spawnLink, but monitor the spawned process
spawnChannel :: forall a. Typeable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a)Source
Spawn a new process, supplying it with a new ReceivePort and return
 the corresponding SendPort.
(Asynchronius) reply from spawn
Local versions of spawn
spawnLocal :: Process () -> Process ProcessIdSource
Spawn a process on the local node
spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a)Source
Create a new typed channel, spawn a process on the local node, passing it the receive port, and return the send port
Reconnecting
reconnect :: ProcessId -> Process ()Source
Cloud Haskell provides the illusion of connection-less, reliable, ordered
 message passing. However, when network connections get disrupted this
 illusion cannot always be maintained. Once a network connection breaks (even
 temporarily) no further communication on that connection will be possible.
 For example, if process A sends a message to process B, and A is then
 notified (by monitor notification) that it got disconnected from B, A will
 not be able to send any further messages to B, unless A explicitly
 indicates that it is acceptable to attempt to reconnect to B using the
 Cloud Haskell reconnect primitive.
Importantly, when A calls reconnect it acknowledges that some messages to
 B might have been lost. For instance, if A sends messages m1 and m2 to B,
 then receives a monitor notification that its connection to B has been lost,
 calls reconnect and then sends m3, it is possible that B will receive m1
 and m3 but not m2.
Note that reconnect does not mean reconnect now but rather /it is okay
 to attempt to reconnect on the next send/. In particular, if no further
 communication attempts are made to B then A can use reconnect to clean up
 its connection to B.
reconnectPort :: SendPort a -> Process ()Source
Reconnect to a sendport. See reconnect for more information.