distributed-process- Cloud Haskell: Erlang-style concurrency in Haskell

Safe Haskell: None




Cloud Haskell

This is an implementation of Cloud Haskell, as described in "Towards Haskell in the Cloud" by Jeff Epstein, Andrew Black, and Simon Peyton Jones (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), although some of the details differ. The precise message-passing semantics are based on "A unified semantics for future Erlang" by Hans Svensson, Lars-Åke Fredlund, and Clara Benac Earle.


Basic types

data NodeId

Node identifier

data SendPortId

A send port is identified by a SendPortId.

You cannot send directly to a SendPortId; instead, use newChan to create a SendPort.

processNodeId :: ProcessId -> NodeId

The ID of the node the process is running on

sendPortProcessId :: SendPortId -> ProcessId

The ID of the process that will receive messages sent on this port

liftIO :: MonadIO m => forall a. IO a -> m a

Lift a computation from the IO monad.

Basic messaging

send :: Serializable a => ProcessId -> a -> Process ()

Send a message

expect :: forall a. Serializable a => Process a

Wait for a message of a specific type
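As an illustration of how send and expect pair up, the following is a minimal sketch of a request/reply exchange. It assumes the distributed-process package is available; the names echoOnce and askEcho are hypothetical and not part of the library. Both actions still have to be run inside a process on some node.

```haskell
import Control.Distributed.Process

-- | Wait for one (sender, text) pair and send the text back to the sender.
-- (Hypothetical helper, for illustration only.)
echoOnce :: Process ()
echoOnce = do
  (sender, text) <- expect :: Process (ProcessId, String)
  send sender text

-- | Send a request to a process running 'echoOnce' and wait for the reply.
-- The type of the awaited message is fixed by the result type, String.
askEcho :: ProcessId -> String -> Process String
askEcho server text = do
  self <- getSelfPid
  send server (self, text)
  expect
```

Because expect selects messages by type, the requester includes its own ProcessId in the request so that the reply can be addressed back to it.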


data ReceivePort a

The receive end of a typed channel (not serializable)

data SendPort a

The send end of a typed channel (serializable)

sendPortId :: SendPort a -> SendPortId

The (unique) ID of this send port

newChan :: Serializable a => Process (SendPort a, ReceivePort a)

Create a new typed channel

sendChan :: Serializable a => SendPort a -> a -> Process ()

Send a message on a typed channel

receiveChan :: Serializable a => ReceivePort a -> Process a

Wait for a message on a typed channel
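Since a SendPort is serializable while a ReceivePort is not, a common pattern is to create a channel locally and hand the send end to a peer. The sketch below assumes the distributed-process package; requestOn and replyOnce are hypothetical names chosen for this example.

```haskell
import Control.Distributed.Process

-- | Create a channel, give the send end to a peer, and wait for its answer.
-- (Hypothetical helper, for illustration only.)
requestOn :: ProcessId -> Process Int
requestOn peer = do
  (sendPort, recvPort) <- newChan
  send peer sendPort        -- the SendPort can travel; the ReceivePort stays here
  receiveChan recvPort

-- | Peer side: receive a reply port and send a single value on it.
replyOnce :: Process ()
replyOnce = do
  replyPort <- expect :: Process (SendPort Int)
  sendChan replyPort 42
```

Unlike send and expect, the channel fixes the message type at creation time, so the receiver does not need a type annotation on receiveChan.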

mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)

Merge a list of typed channels.

The result port is left-biased: if messages are available on more than one port, the message from the first such port in the list is returned.

mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)

Like mergePortsBiased, but with a round-robin scheduler (rather than left-biased)
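A merged port is read like any other ReceivePort. The following sketch, assuming the distributed-process package and the signatures listed above, reads two messages from a left-biased merge of a high-priority and a low-priority port; nextTwo is a hypothetical name.

```haskell
import Control.Distributed.Process

-- | Merge two ports, preferring the first, and read the next two messages.
-- (Hypothetical helper, for illustration only.)
nextTwo :: ReceivePort String -> ReceivePort String -> Process (String, String)
nextTwo high low = do
  merged <- mergePortsBiased [high, low]
  first  <- receiveChan merged
  second <- receiveChan merged
  return (first, second)
```

Replacing mergePortsBiased with mergePortsRR in this sketch would select the round-robin policy instead.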

Advanced messaging

data Match b

Opaque type used in receiveWait and receiveTimeout

receiveWait :: [Match b] -> Process b

Test the matches in order against each message in the queue

receiveTimeout :: Int -> [Match b] -> Process (Maybe b)

Like receiveWait but with a timeout.

If the timeout is zero, do a non-blocking check for matching messages. A non-zero timeout is applied only when waiting for incoming messages (that is, after we have checked the messages that are already in the mailbox).
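The zero-timeout case can be used to poll the mailbox without blocking. This is a minimal sketch assuming the distributed-process package; pollString is a hypothetical name, and match is the function documented in this section.

```haskell
import Control.Distributed.Process

-- | Return a String message if one is already in the mailbox,
-- or Nothing without waiting. (Hypothetical helper.)
pollString :: Process (Maybe String)
pollString = receiveTimeout 0 [ match return ]
```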

match :: forall a b. Serializable a => (a -> Process b) -> Match b

Match against any message of the right type

matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b

Match against any message of the right type that satisfies a predicate

matchUnknown :: Process b -> Match b

Remove any message from the queue
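The matchers above can be combined in a single receiveWait. The sketch below assumes the distributed-process package; handleOne is a hypothetical name. Matches are tried in list order for each message, so a non-positive Int fails the matchIf predicate, is not a String, and therefore falls through to matchUnknown.

```haskell
import Control.Distributed.Process

-- | Handle exactly one message from the mailbox and describe it.
-- (Hypothetical helper, for illustration only.)
handleOne :: Process String
handleOne = receiveWait
  [ matchIf (\n -> n > (0 :: Int))
            (\n -> return ("positive Int: " ++ show n))
  , match   (\s -> return ("String: " ++ s))
  , matchUnknown (return "dropped a message of another type")
  ]
```

Leaving out the matchUnknown case would keep unmatched messages in the queue rather than discarding them.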

Process management

spawn :: NodeId -> Closure (Process ()) -> Process ProcessId

Spawn a process

call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a

Run a process remotely and wait for it to reply

We monitor the remote process; if it dies before it can send a reply, we die too

terminate :: Process a

Terminate (throws a ProcessTerminationException)

data SpawnRef

A SpawnRef is used to return the pid of a spawned process

getSelfPid :: Process ProcessId

Our own process ID

getSelfNode :: Process NodeId

Get the node ID of our local node

Monitoring and linking

link :: ProcessId -> Process ()

Link to a remote process (asynchronous)

Note that link provides unidirectional linking (see spawnSupervised). Linking makes no distinction between normal and abnormal termination of the remote process.

linkNode :: NodeId -> Process ()

Link to a node

linkPort :: SendPort a -> Process ()

Link to a channel (send port)

unlink :: ProcessId -> Process ()

Remove a link (synchronous)

unlinkNode :: NodeId -> Process ()

Remove a node link (synchronous)

unlinkPort :: SendPort a -> Process ()

Remove a channel (send port) link (synchronous)

monitor :: ProcessId -> Process MonitorRef

Monitor another process (asynchronous)

monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef

Monitor a typed channel

unmonitor :: MonitorRef -> Process ()

Remove a monitor (synchronous)
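To show how a monitor is typically consumed, here is a sketch that waits for the notification belonging to one particular MonitorRef. It assumes the distributed-process package and relies on the ProcessMonitorNotification message type, which the library exports but which is not listed in this section; awaitDeath is a hypothetical name.

```haskell
import Control.Distributed.Process

-- | Monitor a process and block until it dies, returning the reason.
-- (Hypothetical helper, for illustration only.)
awaitDeath :: ProcessId -> Process DiedReason
awaitDeath pid = do
  ref <- monitor pid
  receiveWait
    [ -- Only accept the notification for the monitor set up above.
      matchIf (\(ProcessMonitorNotification ref' _ _) -> ref' == ref)
              (\(ProcessMonitorNotification _ _ reason) -> return reason)
    ]
```

Filtering on the MonitorRef keeps notifications from other monitors in the mailbox for whoever set them up.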

data PortLinkException

Exception thrown when a linked channel (port) dies

data MonitorRef

MonitorRef is opaque for regular Cloud Haskell processes

data DiedReason

Why did a process die?



Normal termination

DiedException String

The process exited with an exception (provided as String because Exception does not implement Binary)


We got disconnected from the process node


The process node died


Invalid (process/node/channel) identifier


data Closure a

A closure is a static value and an encoded environment

data Static a

A static value is top-level bound or the application of two static values


unClosure :: forall a. Typeable a => Closure a -> Process a

Deserialize a closure

data RemoteTable

Runtime dictionary for unstatic lookups


say :: String -> Process ()

Log a string

say message sends a message (time, pid of the current process, message) to the process registered as logger. By default, this process simply sends the string to stderr. Individual Cloud Haskell backends might replace this with a different logger process, however.


register :: String -> ProcessId -> Process ()

Register a process with the local registry (asynchronous).

The process to be registered does not have to be local itself.

unregister :: String -> Process ()

Remove a process from the local registry (asynchronous).

whereis :: String -> Process (Maybe ProcessId)

Query the local process registry (synchronous).

nsend :: Serializable a => String -> a -> Process ()

Named send to a process in the local registry (asynchronous)
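The local registry functions fit together roughly as follows. This is a sketch assuming the distributed-process package; registerSelf and sendToNamed are hypothetical names. Because register is documented here as asynchronous, the sketch does not assume that a whereis issued immediately afterwards already sees the new entry.

```haskell
import Control.Distributed.Process

-- | Register the current process under the given name.
-- (Hypothetical helper, for illustration only.)
registerSelf :: String -> Process ()
registerSelf name = do
  self <- getSelfPid
  register name self

-- | Send a message by name, reporting whether the name was registered
-- at the time of the lookup.
sendToNamed :: String -> String -> Process Bool
sendToNamed name msg = do
  found <- whereis name
  case found of
    Nothing -> return False
    Just _  -> nsend name msg >> return True
```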

registerRemote :: NodeId -> String -> ProcessId -> Process ()

Register a process with a remote registry (asynchronous).

The process to be registered does not have to live on the same remote node.

unregisterRemote :: NodeId -> String -> Process ()

Remove a process from a remote registry (asynchronous).

whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)

Query a remote process registry (synchronous)

whereisRemoteAsync :: NodeId -> String -> Process ()

Query a remote process registry (asynchronous)

Reply will come in the form of a WhereIsReply message

nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()

Named send to a process in a remote registry (asynchronous)

data WhereIsReply

(Asynchronous) reply from whereis
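The asynchronous lookup can be turned into a blocking one by waiting for the matching WhereIsReply. The sketch below assumes the distributed-process package and that WhereIsReply carries the queried name and a Maybe ProcessId, as it does in the library source; lookupRemote is a hypothetical name.

```haskell
import Control.Distributed.Process

-- | Query a remote registry and wait for the reply for this name.
-- (Hypothetical helper, for illustration only.)
lookupRemote :: NodeId -> String -> Process (Maybe ProcessId)
lookupRemote nid label = do
  whereisRemoteAsync nid label
  receiveWait
    [ -- Skip replies that belong to lookups of other names.
      matchIf (\(WhereIsReply label' _) -> label' == label)
              (\(WhereIsReply _ found) -> return found)
    ]
```

Note that this sketch distinguishes replies only by name, so concurrent lookups of the same name on different nodes could be confused with one another.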

Auxiliary API

catch :: Exception e => Process a -> (e -> Process a) -> Process a

Catch exceptions within a process
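As an example of catching an exception raised by lifted IO, the following sketch reads a file and logs a failure instead of letting the process die. It assumes the distributed-process package; safeReadFile is a hypothetical name, and catch is imported qualified to avoid clashing with other functions of the same name.

```haskell
import Control.Distributed.Process (Process, liftIO, say)
import qualified Control.Distributed.Process as P
import Control.Exception (IOException)

-- | Read a file inside a process, returning Nothing if the read
-- raises an IOException. (Hypothetical helper, for illustration only.)
safeReadFile :: FilePath -> Process (Maybe String)
safeReadFile path =
  P.catch (fmap Just (liftIO (readFile path)))
          (\e -> do
             say ("read failed: " ++ show (e :: IOException))
             return Nothing)
```

The type annotation on e selects which exceptions are handled; other exception types propagate as usual.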

expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)

Like expect but with a timeout
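A typical use is waiting a bounded time for an acknowledgement. The sketch below assumes the distributed-process package and assumes that the timeout is given in microseconds, which this page does not state; waitForAck is a hypothetical name.

```haskell
import Control.Distributed.Process

-- | Wait for a String message for up to one second (assuming the
-- timeout unit is microseconds) and check that it is "ack".
-- (Hypothetical helper, for illustration only.)
waitForAck :: Process Bool
waitForAck = do
  reply <- expectTimeout 1000000 :: Process (Maybe String)
  return (maybe False (== "ack") reply)
```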

spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef

Asynchronous version of spawn

(spawn is defined in terms of spawnAsync and expect)

spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)

Spawn a child process, have the child link to the parent and the parent monitor the child

spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId

Spawn a process and link to it

Note that this is just the sequential composition of spawn and link. (The unified semantics that underlies Cloud Haskell does not even support a synchronous link operation.)
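The sequential composition described above can be written out as follows. This is a sketch of the idea, assuming the distributed-process package, and not necessarily the library's own definition; spawnThenLink is a hypothetical name.

```haskell
import Control.Distributed.Process

-- | Spawn a process on the given node and then link to it.
-- (Hypothetical helper, for illustration only.)
spawnThenLink :: NodeId -> Closure (Process ()) -> Process ProcessId
spawnThenLink nid proc = do
  pid <- spawn nid proc   -- returns once the pid is known
  link pid                -- asynchronous: the link request is sent, not confirmed
  return pid
```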

spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)

Like spawnLink, but monitor the spawned process

spawnChannel :: forall a. Typeable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a)

Spawn a new process, supply it with a new ReceivePort, and return the corresponding SendPort.

data DidSpawn

(Asynchronous) reply from spawn