Safe Haskell | None |
---|
- Cloud Haskell
This is an implementation of Cloud Haskell, as described in Towards Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon Peyton Jones (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), although some of the details are different. The precise message passing semantics are based on A unified semantics for future Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle.
- data ProcessId
- data NodeId
- data Process a
- data SendPortId
- processNodeId :: ProcessId -> NodeId
- sendPortProcessId :: SendPortId -> ProcessId
- liftIO :: MonadIO m => forall a. IO a -> m a
- send :: Serializable a => ProcessId -> a -> Process ()
- expect :: forall a. Serializable a => Process a
- data ReceivePort a
- data SendPort a
- sendPortId :: SendPort a -> SendPortId
- newChan :: Serializable a => Process (SendPort a, ReceivePort a)
- sendChan :: Serializable a => SendPort a -> a -> Process ()
- receiveChan :: Serializable a => ReceivePort a -> Process a
- mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- data Match b
- receiveWait :: [Match b] -> Process b
- receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
- match :: forall a b. Serializable a => (a -> Process b) -> Match b
- matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
- matchUnknown :: Process b -> Match b
- data AbstractMessage = AbstractMessage {}
- matchAny :: forall b. (AbstractMessage -> Process b) -> Match b
- spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
- call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a
- terminate :: Process a
- data ProcessTerminationException = ProcessTerminationException
- data SpawnRef
- getSelfPid :: Process ProcessId
- getSelfNode :: Process NodeId
- link :: ProcessId -> Process ()
- linkNode :: NodeId -> Process ()
- linkPort :: SendPort a -> Process ()
- unlink :: ProcessId -> Process ()
- unlinkNode :: NodeId -> Process ()
- unlinkPort :: SendPort a -> Process ()
- monitor :: ProcessId -> Process MonitorRef
- monitorNode :: NodeId -> Process MonitorRef
- monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
- unmonitor :: MonitorRef -> Process ()
- data ProcessLinkException = ProcessLinkException !ProcessId !DiedReason
- data NodeLinkException = NodeLinkException !NodeId !DiedReason
- data PortLinkException = PortLinkException !SendPortId !DiedReason
- data MonitorRef
- data ProcessMonitorNotification = ProcessMonitorNotification !MonitorRef !ProcessId !DiedReason
- data NodeMonitorNotification = NodeMonitorNotification !MonitorRef !NodeId !DiedReason
- data PortMonitorNotification = PortMonitorNotification !MonitorRef !SendPortId !DiedReason
- data DiedReason
- data Closure a
- closure :: Static (ByteString -> a) -> ByteString -> Closure a
- data Static a
- unStatic :: Typeable a => Static a -> Process a
- unClosure :: Typeable a => Closure a -> Process a
- data RemoteTable
- say :: String -> Process ()
- register :: String -> ProcessId -> Process ()
- unregister :: String -> Process ()
- whereis :: String -> Process (Maybe ProcessId)
- nsend :: Serializable a => String -> a -> Process ()
- registerRemote :: NodeId -> String -> ProcessId -> Process ()
- unregisterRemote :: NodeId -> String -> Process ()
- whereisRemoteAsync :: NodeId -> String -> Process ()
- nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
- data WhereIsReply = WhereIsReply String (Maybe ProcessId)
- catch :: Exception e => Process a -> (e -> Process a) -> Process a
- mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
- onException :: Process a -> Process b -> Process a
- bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
- bracket_ :: Process a -> Process b -> Process c -> Process c
- finally :: Process a -> Process b -> Process a
- expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
- spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
- spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
- spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
- spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
- spawnChannel :: forall a. Typeable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a)
- data DidSpawn = DidSpawn SpawnRef ProcessId
- spawnLocal :: Process () -> Process ProcessId
- spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a)
- reconnect :: ProcessId -> Process ()
- reconnectPort :: SendPort a -> Process ()
Basic types
Process identifier
The Cloud Haskell Process
type
data SendPortId Source
A send port is identified by a SendPortId.
You cannot send directly to a SendPortId; instead, use newChan
to create a SendPort.
processNodeId :: ProcessId -> NodeIdSource
The ID of the node the process is running on
sendPortProcessId :: SendPortId -> ProcessIdSource
The ID of the process that will receive messages sent on this port
Basic messaging
send :: Serializable a => ProcessId -> a -> Process ()Source
Send a message
expect :: forall a. Serializable a => Process aSource
Wait for a message of a specific type
Channels
data ReceivePort a Source
The receive end of a typed channel (not serializable)
The send end of a typed channel (serializable)
sendPortId :: SendPort a -> SendPortIdSource
The (unique) ID of this send port
newChan :: Serializable a => Process (SendPort a, ReceivePort a)Source
Create a new typed channel
sendChan :: Serializable a => SendPort a -> a -> Process ()Source
Send a message on a typed channel
receiveChan :: Serializable a => ReceivePort a -> Process aSource
Wait for a message on a typed channel
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source
Merge a list of typed channels.
The result port is left-biased: if there are messages available on more than one port, the first available message is returned.
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source
Like mergePortsBiased
, but with a round-robin scheduler (rather than
left-biased)
Advanced messaging
Opaque type used in receiveWait
and receiveTimeout
receiveWait :: [Match b] -> Process bSource
Test the matches in order against each message in the queue
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)Source
Like receiveWait
but with a timeout.
If the timeout is zero do a non-blocking check for matching messages. A non-zero timeout is applied only when waiting for incoming messages (that is, after we have checked the messages that are already in the mailbox).
match :: forall a b. Serializable a => (a -> Process b) -> Match bSource
Match against any message of the right type
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match bSource
Match against any message of the right type that satisfies a predicate
matchUnknown :: Process b -> Match bSource
Remove any message from the queue
matchAny :: forall b. (AbstractMessage -> Process b) -> Match bSource
Match against an arbitrary message
Process management
spawn :: NodeId -> Closure (Process ()) -> Process ProcessIdSource
Spawn a process
For more information about Closure
, see
Control.Distributed.Process.Closure.
See also call
.
call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process aSource
Run a process remotely and wait for it to reply
We monitor the remote process: if it dies before it can send a reply, we die too.
For more information about Static
, SerializableDict
, and Closure
, see
Control.Distributed.Process.Closure.
See also spawn
.
data ProcessTerminationException Source
Thrown by terminate
SpawnRef
values are used to return pids of spawned processes
getSelfPid :: Process ProcessIdSource
Our own process ID
getSelfNode :: Process NodeIdSource
Get the node ID of our local node
Monitoring and linking
link :: ProcessId -> Process ()Source
Link to a remote process (asynchronous)
Note that link
provides unidirectional linking (see spawnSupervised
).
Linking makes no distinction between normal and abnormal termination of
the remote process.
unlink :: ProcessId -> Process ()Source
Remove a link
This is synchronous in the sense that once it returns you are guaranteed that no exception will be raised if the remote process dies. However, it is asynchronous in the sense that we do not wait for a response from the remote node.
unlinkNode :: NodeId -> Process ()Source
Remove a node link
This has the same synchronous/asynchronous nature as unlink
.
unlinkPort :: SendPort a -> Process ()Source
Remove a channel (send port) link
This has the same synchronous/asynchronous nature as unlink
.
monitor :: ProcessId -> Process MonitorRefSource
Monitor another process (asynchronous)
monitorNode :: NodeId -> Process MonitorRefSource
Monitor a node (asynchronous)
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRefSource
Monitor a typed channel (asynchronous)
unmonitor :: MonitorRef -> Process ()Source
Remove a monitor
This has the same synchronous/asynchronous nature as unlink
.
data ProcessLinkException Source
Exception thrown when a linked process dies
data NodeLinkException Source
Exception thrown when a linked node dies
data PortLinkException Source
Exception thrown when a linked channel (port) dies
data MonitorRef Source
MonitorRef is opaque for regular Cloud Haskell processes
data ProcessMonitorNotification Source
Message sent by process monitors
data NodeMonitorNotification Source
Message sent by node monitors
data PortMonitorNotification Source
Message sent by channel (port) monitors
data DiedReason Source
Why did a process die?
DiedNormal | Normal termination |
DiedException !String | The process exited with an exception
(provided as a String) |
DiedDisconnect | We got disconnected from the process node |
DiedNodeDown | The process node died |
DiedUnknownId | Invalid (process/node/channel) identifier |
Closures
data Closure a
A closure is a static value and an encoded environment
:: Static (ByteString -> a) | Decoder |
-> ByteString | Encoded closure environment |
-> Closure a |
data Static a
A static value. Static is opaque; see staticLabel
and staticApply
.
data RemoteTable
Runtime dictionary for unstatic
lookups
Logging
say :: String -> Process ()Source
Log a string
say message
sends a message (time, pid of the current process, message)
to the process registered as logger
. By default, this process simply
sends the string to stderr
. Individual Cloud Haskell backends might
replace this with a different logger process, however.
Registry
register :: String -> ProcessId -> Process ()Source
Register a process with the local registry (asynchronous).
The process to be registered does not have to be local itself.
unregister :: String -> Process ()Source
Remove a process from the local registry (asynchronous).
nsend :: Serializable a => String -> a -> Process ()Source
Named send to a process in the local registry (asynchronous)
registerRemote :: NodeId -> String -> ProcessId -> Process ()Source
Register a process with a remote registry (asynchronous).
The process to be registered does not have to live on the same remote node.
unregisterRemote :: NodeId -> String -> Process ()Source
Remove a process from a remote registry (asynchronous).
whereisRemoteAsync :: NodeId -> String -> Process ()Source
Query a remote process registry (asynchronous)
Reply will come in the form of a WhereIsReply
message.
There is currently no synchronous version of whereisRemoteAsync
: if
you implement one yourself, be sure to take into account that the remote
node might die or get disconnected before it can respond (i.e. you should
use monitorNode
and take appropriate action when you receive a
NodeMonitorNotification
).
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()Source
Named send to a process in a remote registry (asynchronous)
data WhereIsReply Source
(Asynchronous) reply from whereis
Exception handling
onException :: Process a -> Process b -> Process aSource
Lift onException
Auxiliary API
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)Source
Like expect
but with a timeout
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRefSource
Asynchronous version of spawn
(spawn
is defined in terms of spawnAsync
and expect
)
spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)Source
Spawn a child process, have the child link to the parent and the parent monitor the child
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)Source
Like spawnLink
, but monitor the spawned process
spawnChannel :: forall a. Typeable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a)Source
Spawn a new process, supplying it with a new ReceivePort
and return
the corresponding SendPort
.
(Asynchronous) reply from spawn
Local versions of spawn
spawnLocal :: Process () -> Process ProcessIdSource
Spawn a process on the local node
spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a)Source
Create a new typed channel, spawn a process on the local node, passing it the receive port, and return the send port
Reconnecting
reconnect :: ProcessId -> Process ()Source
Cloud Haskell provides the illusion of connection-less, reliable, ordered
message passing. However, when network connections get disrupted this
illusion cannot always be maintained. Once a network connection breaks (even
temporarily) no further communication on that connection will be possible.
For example, if process A sends a message to process B, and A is then
notified (by monitor notification) that it got disconnected from B, A will
not be able to send any further messages to B, unless A explicitly
indicates that it is acceptable to attempt to reconnect to B using the
Cloud Haskell reconnect
primitive.
Importantly, when A calls reconnect
it acknowledges that some messages to
B might have been lost. For instance, if A sends messages m1 and m2 to B,
then receives a monitor notification that its connection to B has been lost,
calls reconnect
and then sends m3, it is possible that B will receive m1
and m3 but not m2.
Note that reconnect
does not mean "reconnect now" but rather "it is okay
to attempt to reconnect on the next send". In particular, if no further
communication attempts are made to B then A can use reconnect to clean up
its connection to B.
reconnectPort :: SendPort a -> Process ()Source
Reconnect to a send port. See reconnect
for more information.