Safe Haskell | None |
---|
- Cloud Haskell
This is an implementation of Cloud Haskell, as described in "Towards Haskell in the Cloud" by Jeff Epstein, Andrew Black, and Simon Peyton Jones (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), although some of the details are different. The precise message passing semantics are based on "A unified semantics for future Erlang" by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle.
- data ProcessId
- data NodeId
- data Process a
- data SendPortId
- processNodeId :: ProcessId -> NodeId
- sendPortProcessId :: SendPortId -> ProcessId
- liftIO :: MonadIO m => forall a. IO a -> m a
- send :: Serializable a => ProcessId -> a -> Process ()
- expect :: forall a. Serializable a => Process a
- data ReceivePort a
- data SendPort a
- sendPortId :: SendPort a -> SendPortId
- newChan :: Serializable a => Process (SendPort a, ReceivePort a)
- sendChan :: Serializable a => SendPort a -> a -> Process ()
- receiveChan :: Serializable a => ReceivePort a -> Process a
- mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- data Match b
- receiveWait :: [Match b] -> Process b
- receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
- match :: forall a b. Serializable a => (a -> Process b) -> Match b
- matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
- matchUnknown :: Process b -> Match b
- spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
- call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a
- terminate :: Process a
- data ProcessTerminationException = ProcessTerminationException
- data SpawnRef
- getSelfPid :: Process ProcessId
- getSelfNode :: Process NodeId
- link :: ProcessId -> Process ()
- linkNode :: NodeId -> Process ()
- linkPort :: SendPort a -> Process ()
- unlink :: ProcessId -> Process ()
- unlinkNode :: NodeId -> Process ()
- unlinkPort :: SendPort a -> Process ()
- monitor :: ProcessId -> Process MonitorRef
- monitorNode :: NodeId -> Process MonitorRef
- monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
- unmonitor :: MonitorRef -> Process ()
- data ProcessLinkException = ProcessLinkException ProcessId DiedReason
- data NodeLinkException = NodeLinkException NodeId DiedReason
- data PortLinkException = PortLinkException SendPortId DiedReason
- data MonitorRef
- data ProcessMonitorNotification = ProcessMonitorNotification MonitorRef ProcessId DiedReason
- data NodeMonitorNotification = NodeMonitorNotification MonitorRef NodeId DiedReason
- data PortMonitorNotification = PortMonitorNotification MonitorRef SendPortId DiedReason
- data DiedReason
- data Closure a
- data Static a
- unClosure :: forall a. Typeable a => Closure a -> Process a
- data RemoteTable
- say :: String -> Process ()
- register :: String -> ProcessId -> Process ()
- unregister :: String -> Process ()
- whereis :: String -> Process (Maybe ProcessId)
- nsend :: Serializable a => String -> a -> Process ()
- registerRemote :: NodeId -> String -> ProcessId -> Process ()
- unregisterRemote :: NodeId -> String -> Process ()
- whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)
- whereisRemoteAsync :: NodeId -> String -> Process ()
- nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
- data WhereIsReply = WhereIsReply String (Maybe ProcessId)
- catch :: Exception e => Process a -> (e -> Process a) -> Process a
- expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
- spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
- spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
- spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
- spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
- spawnChannel :: forall a. Typeable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a)
- data DidSpawn = DidSpawn SpawnRef ProcessId
Basic types
Process identifier
The Cloud Haskell Process type
data SendPortId Source
A send port is identified by a SendPortId.
You cannot send directly to a SendPortId; instead, use newChan to create a SendPort.
processNodeId :: ProcessId -> NodeIdSource
The ID of the node the process is running on
sendPortProcessId :: SendPortId -> ProcessIdSource
The ID of the process that will receive messages sent on this port
Basic messaging
expect :: forall a. Serializable a => Process aSource
Wait for a message of a specific type
Channels
data ReceivePort a Source
The receive end of a typed channel (not serializable)
The send end of a typed channel (serializable)
sendPortId :: SendPort a -> SendPortIdSource
The (unique) ID of this send port
newChan :: Serializable a => Process (SendPort a, ReceivePort a)Source
Create a new typed channel
receiveChan :: Serializable a => ReceivePort a -> Process aSource
Wait for a message on a typed channel
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source
Merge a list of typed channels.
The result port is left-biased: if there are messages available on more than one port, the first available message is returned.
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source
Like mergePortsBiased, but with a round-robin scheduler (rather than left-biased)
Advanced messaging
Opaque type used in receiveWait and receiveTimeout
receiveWait :: [Match b] -> Process bSource
Test the matches in order against each message in the queue
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)Source
Like receiveWait but with a timeout.
If the timeout is zero do a non-blocking check for matching messages. A non-zero timeout is applied only when waiting for incoming messages (that is, after we have checked the messages that are already in the mailbox).
match :: forall a b. Serializable a => (a -> Process b) -> Match bSource
Match against any message of the right type
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match bSource
Match against any message of the right type that satisfies a predicate
matchUnknown :: Process b -> Match bSource
Remove any message from the queue
Process management
call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process aSource
Run a process remotely and wait for it to reply
We monitor the remote process; if it dies before it can send a reply, we die too
data ProcessTerminationException Source
Thrown by terminate
SpawnRefs are used to return pids of spawned processes
getSelfPid :: Process ProcessIdSource
Our own process ID
getSelfNode :: Process NodeIdSource
Get the node ID of our local node
Monitoring and linking
link :: ProcessId -> Process ()Source
Link to a remote process (asynchronous)
Note that link provides unidirectional linking (see spawnSupervised).
Linking makes no distinction between normal and abnormal termination of the remote process.
unlinkNode :: NodeId -> Process ()Source
Remove a node link (synchronous)
unlinkPort :: SendPort a -> Process ()Source
Remove a channel (send port) link (synchronous)
monitor :: ProcessId -> Process MonitorRefSource
Monitor another process (asynchronous)
monitorNode :: NodeId -> Process MonitorRefSource
Monitor a node
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRefSource
Monitor a typed channel
unmonitor :: MonitorRef -> Process ()Source
Remove a monitor (synchronous)
data ProcessLinkException Source
Exception thrown when a linked process dies
data NodeLinkException Source
Exception thrown when a linked node dies
data PortLinkException Source
Exception thrown when a linked channel (port) dies
data MonitorRef Source
MonitorRef is opaque for regular Cloud Haskell processes
data ProcessMonitorNotification Source
Message sent by process monitors
data NodeMonitorNotification Source
Message sent by node monitors
data PortMonitorNotification Source
Message sent by channel (port) monitors
data DiedReason Source
Why did a process die?
DiedNormal | Normal termination |
DiedException String | The process exited with an exception (provided as a String) |
DiedDisconnect | We got disconnected from the process node |
DiedNodeDown | The process node died |
DiedUnknownId | Invalid (process/node/channel) identifier |
Closures
A closure is a static value and an encoded environment
A static value is top-level bound or the application of two static values
data RemoteTable Source
Runtime dictionary for unstatic lookups
Logging
say :: String -> Process ()Source
Log a string
say message sends a message (time, pid of the current process, message) to the process registered as logger. By default, this process simply sends the string to stderr. Individual Cloud Haskell backends might replace this with a different logger process, however.
Registry
register :: String -> ProcessId -> Process ()Source
Register a process with the local registry (asynchronous).
The process to be registered does not have to be local itself.
unregister :: String -> Process ()Source
Remove a process from the local registry (asynchronous).
nsend :: Serializable a => String -> a -> Process ()Source
Named send to a process in the local registry (asynchronous)
registerRemote :: NodeId -> String -> ProcessId -> Process ()Source
Register a process with a remote registry (asynchronous).
The process to be registered does not have to live on the same remote node.
unregisterRemote :: NodeId -> String -> Process ()Source
Remove a process from a remote registry (asynchronous).
whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)Source
Query a remote process registry (synchronous)
whereisRemoteAsync :: NodeId -> String -> Process ()Source
Query a remote process registry (asynchronous)
Reply will come in the form of a WhereIsReply message
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()Source
Named send to a process in a remote registry (asynchronous)
data WhereIsReply Source
(Asynchronous) reply from whereis
Auxiliary API
catch :: Exception e => Process a -> (e -> Process a) -> Process aSource
Catch exceptions within a process
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)Source
Like expect but with a timeout
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRefSource
Asynchronous version of spawn (spawn is defined in terms of spawnAsync and expect)
spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)Source
Spawn a child process, have the child link to the parent and the parent monitor the child
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)Source
Like spawnLink, but monitor the spawned process
spawnChannel :: forall a. Typeable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a)Source
Spawn a new process, supplying it with a new ReceivePort and return the corresponding SendPort.