Safe Haskell | None |
---|
Cloud Haskell primitives
We define these in a separate module so that we don't have to rely on the closure combinators
- send :: Serializable a => ProcessId -> a -> Process ()
- expect :: forall a. Serializable a => Process a
- newChan :: Serializable a => Process (SendPort a, ReceivePort a)
- sendChan :: Serializable a => SendPort a -> a -> Process ()
- receiveChan :: Serializable a => ReceivePort a -> Process a
- mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- data Match b
- receiveWait :: [Match b] -> Process b
- receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
- match :: forall a b. Serializable a => (a -> Process b) -> Match b
- matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
- matchUnknown :: Process b -> Match b
- data AbstractMessage = AbstractMessage {
- forward :: ProcessId -> Process ()
- maybeHandleMessage :: forall a b. Serializable a => (a -> Process b) -> Process (Maybe b)
- matchAny :: forall b. (AbstractMessage -> Process b) -> Match b
- matchAnyIf :: forall a b. Serializable a => (a -> Bool) -> (AbstractMessage -> Process b) -> Match b
- matchChan :: ReceivePort a -> (a -> Process b) -> Match b
- terminate :: Process a
- data ProcessTerminationException = ProcessTerminationException
- die :: Serializable a => a -> Process b
- kill :: ProcessId -> String -> Process ()
- exit :: Serializable a => ProcessId -> a -> Process ()
- catchExit :: forall a b. (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b
- catchesExit :: Process b -> [ProcessId -> AbstractMessage -> Process (Maybe b)] -> Process b
- data ProcessExitException
- getSelfPid :: Process ProcessId
- getSelfNode :: Process NodeId
- data ProcessInfo = ProcessInfo {
- infoNode :: NodeId
- infoRegisteredNames :: [String]
- infoMessageQueueLength :: Maybe Int
- infoMonitors :: [(ProcessId, MonitorRef)]
- infoLinks :: [ProcessId]
- getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
- link :: ProcessId -> Process ()
- unlink :: ProcessId -> Process ()
- monitor :: ProcessId -> Process MonitorRef
- unmonitor :: MonitorRef -> Process ()
- withMonitor :: ProcessId -> Process a -> Process a
- say :: String -> Process ()
- register :: String -> ProcessId -> Process ()
- reregister :: String -> ProcessId -> Process ()
- unregister :: String -> Process ()
- whereis :: String -> Process (Maybe ProcessId)
- nsend :: Serializable a => String -> a -> Process ()
- registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
- reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
- unregisterRemoteAsync :: NodeId -> String -> Process ()
- whereisRemoteAsync :: NodeId -> String -> Process ()
- nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
- unClosure :: Typeable a => Closure a -> Process a
- unStatic :: Typeable a => Static a -> Process a
- catch :: Exception e => Process a -> (e -> Process a) -> Process a
- data Handler a = forall e . Exception e => Handler (e -> Process a)
- catches :: Process a -> [Handler a] -> Process a
- try :: Exception e => Process a -> Process (Either e a)
- mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
- onException :: Process a -> Process b -> Process a
- bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
- bracket_ :: Process a -> Process b -> Process c -> Process c
- finally :: Process a -> Process b -> Process a
- expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
- receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
- spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
- linkNode :: NodeId -> Process ()
- linkPort :: SendPort a -> Process ()
- unlinkNode :: NodeId -> Process ()
- unlinkPort :: SendPort a -> Process ()
- monitorNode :: NodeId -> Process MonitorRef
- monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
- reconnect :: ProcessId -> Process ()
- reconnectPort :: SendPort a -> Process ()
- trace :: String -> Process ()
Basic messaging
send :: Serializable a => ProcessId -> a -> Process ()Source
Send a message
expect :: forall a. Serializable a => Process aSource
Wait for a message of a specific type
Channels
newChan :: Serializable a => Process (SendPort a, ReceivePort a)Source
Create a new typed channel
sendChan :: Serializable a => SendPort a -> a -> Process ()Source
Send a message on a typed channel
receiveChan :: Serializable a => ReceivePort a -> Process aSource
Wait for a message on a typed channel
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source
Merge a list of typed channels.
The result port is left-biased: if there are messages available on more than one port, the first available message is returned.
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)Source
Like mergePortsBiased
, but with a round-robin scheduler (rather than
left-biased)
Advanced messaging
Opaque type used in receiveWait
and receiveTimeout
receiveWait :: [Match b] -> Process bSource
Test the matches in order against each message in the queue
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)Source
Like receiveWait
but with a timeout.
If the timeout is zero do a non-blocking check for matching messages. A non-zero timeout is applied only when waiting for incoming messages (that is, after we have checked the messages that are already in the mailbox).
match :: forall a b. Serializable a => (a -> Process b) -> Match bSource
Match against any message of the right type
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match bSource
Match against any message of the right type that satisfies a predicate
matchUnknown :: Process b -> Match bSource
Remove any message from the queue
data AbstractMessage Source
Represents a received message and provides two basic operations on it.
AbstractMessage | |
|
matchAny :: forall b. (AbstractMessage -> Process b) -> Match bSource
Match against an arbitrary message. matchAny
removes the first available
message from the process mailbox, and via the AbstractMessage
type,
supports forwarding or handling the message if it is of the correct
type. If not of the right type, then the AbstractMessage
maybeHandleMessage
function will not evaluate the supplied expression,
but the message will still have been removed from the process mailbox!
matchAnyIf :: forall a b. Serializable a => (a -> Bool) -> (AbstractMessage -> Process b) -> Match bSource
Match against an arbitrary message. matchAnyIf
will only remove the
message from the process mailbox, if the supplied condition matches. The
success (or failure) of runtime type checks in maybeHandleMessage
does not
count here, i.e., if the condition evaluates to True
then the message will
be removed from the process mailbox and decoded, but that does not
guarantee that an expression passed to maybeHandleMessage
will pass the
runtime type checks and therefore be evaluated. If the types do not match
up, then maybeHandleMessage
returns Nothing
.
matchChan :: ReceivePort a -> (a -> Process b) -> Match bSource
Process management
data ProcessTerminationException Source
Thrown by terminate
die :: Serializable a => a -> Process bSource
Die immediately - throws a ProcessExitException
with the given reason
.
exit :: Serializable a => ProcessId -> a -> Process ()Source
Graceful request to exit a process. Throws ProcessExitException
with the
supplied reason
encoded as a message. Any exit signal raised in this
manner can be handled using the catchExit
family of functions.
catchExit :: forall a b. (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process bSource
Catches ProcessExitException
. The handler will not be applied unless its
type matches the encoded data stored in the exception (see the reason
argument given to the exit
primitive). If the handler cannot be applied,
the exception will be re-thrown.
To handle ProcessExitException
without regard for reason, see catch
.
To handle multiple reasons of differing types, see catchesExit
.
catchesExit :: Process b -> [ProcessId -> AbstractMessage -> Process (Maybe b)] -> Process bSource
Lift catches
(almost).
As ProcessExitException
stores the exit reason
as a typed, encoded
message, a handler must accept an input of the expected type. In order to
handle a list of potentially different handlers (and therefore input types),
a handler passed to catchesExit
must accept AbstractMessage
and return
Maybe
(i.e., Just p
if it handled the exit reason, otherwise Nothing
).
See maybeHandleMessage
and AbstractMessage
for more details.
data ProcessExitException Source
Internal exception thrown indirectly by exit
getSelfPid :: Process ProcessIdSource
Our own process ID
getSelfNode :: Process NodeIdSource
Get the node ID of our local node
data ProcessInfo Source
Provide information about a running process
ProcessInfo | |
|
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)Source
Get information about the specified process
Monitoring and linking
link :: ProcessId -> Process ()Source
Link to a remote process (asynchronous)
When process A links to process B (that is, process A calls
link pidB
) then an asynchronous exception will be thrown to process A
when process B terminates (normally or abnormally), or when process A gets
disconnected from process B. Although it is technically possible to catch
these exceptions, chances are if you find yourself trying to do so you should
probably be using monitor
rather than link
. In particular, code such as
link pidB   -- Link to process B
expect      -- Wait for a message from process B
unlink pidB -- Unlink again
doesn't quite do what one might expect: if process B sends a message to
process A, and subsequently terminates, then process A might or might not
be terminated too, depending on whether the exception is thrown before or
after the unlink
(i.e., this code has a race condition).
Linking is all-or-nothing: A is either linked to B, or it's not. A second
call to link
has no effect.
Note that link
provides unidirectional linking (see spawnSupervised
).
Linking makes no distinction between normal and abnormal termination of
the remote process.
unlink :: ProcessId -> Process ()Source
Remove a link
This is synchronous in the sense that once it returns you are guaranteed that no exception will be raised if the remote process dies. However, it is asynchronous in the sense that we do not wait for a response from the remote node.
monitor :: ProcessId -> Process MonitorRefSource
Monitor another process (asynchronous)
When process A monitors process B (that is, process A calls
monitor pidB
) then process A will receive a ProcessMonitorNotification
when process B terminates (normally or abnormally), or when process A gets
disconnected from process B. You receive this message like any other (using
expect
); the notification includes a reason (DiedNormal
, DiedException
,
DiedDisconnect
, etc.).
Every call to monitor
returns a new monitor reference MonitorRef
; if
multiple monitors are set up, multiple notifications will be delivered
and monitors can be disabled individually using unmonitor
.
unmonitor :: MonitorRef -> Process ()Source
Remove a monitor
This has the same synchronous/asynchronous nature as unlink
.
withMonitor :: ProcessId -> Process a -> Process aSource
Establishes temporary monitoring of another process.
withMonitor pid code
sets up monitoring of pid
for the duration
of code
. Note: although monitoring is no longer active when
withMonitor
returns, there might still be unreceived monitor
messages in the queue.
Logging
say :: String -> Process ()Source
Log a string
say message
sends a message (time, pid of the current process, message)
to the process registered as logger
. By default, this process simply
sends the string to stderr
. Individual Cloud Haskell backends might
replace this with a different logger process, however.
Registry
register :: String -> ProcessId -> Process ()Source
Register a process with the local registry (asynchronous).
This version will wait until a response is received from the
management process. The name must not already be registered.
The process need not be on this node.
A bad registration will result in a ProcessRegistrationException
The process to be registered does not have to be local itself.
reregister :: String -> ProcessId -> Process ()Source
Like register
, but will replace an existing registration.
The name must already be registered.
unregister :: String -> Process ()Source
Remove a process from the local registry (asynchronous). This version will wait until a response is received from the management process. The name must already be registered.
nsend :: Serializable a => String -> a -> Process ()Source
Named send to a process in the local registry (asynchronous)
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()Source
Register a process with a remote registry (asynchronous).
The process to be registered does not have to live on the same remote node.
Reply will come in the form of a RegisterReply
message
See comments in whereisRemoteAsync
unregisterRemoteAsync :: NodeId -> String -> Process ()Source
Remove a process from a remote registry (asynchronous).
Reply will come in the form of a RegisterReply
message
See comments in whereisRemoteAsync
whereisRemoteAsync :: NodeId -> String -> Process ()Source
Query a remote process registry (asynchronous)
Reply will come in the form of a WhereIsReply
message.
There is currently no synchronous version of whereisRemoteAsync
: if
you implement one yourself, be sure to take into account that the remote
node might die or get disconnected before it can respond (i.e., you should
use monitorNode
and take appropriate action when you receive a
NodeMonitorNotification
).
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()Source
Named send to a process in a remote registry (asynchronous)
Closures
Exception handling
You need this when using catches
onException :: Process a -> Process b -> Process aSource
Lift onException
Auxiliary API
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)Source
Like expect
but with a timeout
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)Source
Like receiveChan
but with a timeout. If the timeout is 0, do a
non-blocking check for a message.
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRefSource
Asynchronous version of spawn
(spawn
is defined in terms of spawnAsync
and expect
)
unlinkNode :: NodeId -> Process ()Source
Remove a node link
This has the same synchronous/asynchronous nature as unlink
.
unlinkPort :: SendPort a -> Process ()Source
Remove a channel (send port) link
This has the same synchronous/asynchronous nature as unlink
.
monitorNode :: NodeId -> Process MonitorRefSource
Monitor a node (asynchronous)
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRefSource
Monitor a typed channel (asynchronous)
Reconnecting
reconnect :: ProcessId -> Process ()Source
Cloud Haskell provides the illusion of connection-less, reliable, ordered
message passing. However, when network connections get disrupted this
illusion cannot always be maintained. Once a network connection breaks (even
temporarily) no further communication on that connection will be possible.
For example, if process A sends a message to process B, and A is then
notified (by monitor notification) that it got disconnected from B, A will
not be able to send any further messages to B, unless A explicitly
indicates that it is acceptable to attempt to reconnect to B using the
Cloud Haskell reconnect
primitive.
Importantly, when A calls reconnect
it acknowledges that some messages to
B might have been lost. For instance, if A sends messages m1 and m2 to B,
then receives a monitor notification that its connection to B has been lost,
calls reconnect
and then sends m3, it is possible that B will receive m1
and m3 but not m2.
Note that reconnect
does not mean "reconnect now" but rather "it is okay
to attempt to reconnect on the next send". In particular, if no further
communication attempts are made to B then A can use reconnect to clean up
its connection to B.
reconnectPort :: SendPort a -> Process ()Source
Reconnect to a sendport. See reconnect
for more information.
Tracing/Debugging
trace :: String -> Process ()Source
Send a message to the internal (system) trace facility. If tracing is enabled, this will create a custom trace event. Note that several Cloud Haskell sub-systems also generate trace events for informational/debugging purposes, thus traces generated this way will not be the only output seen.
Just as with the Debug.Trace module, this is a debugging/tracing facility
for use in development, and should not be used in a production setting -
which is why the default behaviour is to trace to the GHC eventlog. For a
general purpose logging facility, you should consider say
.
Trace events can be written to the GHC event log, a text file, or to the
standard system logger process (see say
). The default behaviour for writing
to the eventlog requires specific intervention to work, without which traces
are silently dropped/ignored and no output will be generated.
The GHC eventlog documentation provides information about enabling, viewing
and working with event traces: http://hackage.haskell.org/trac/ghc/wiki/EventLog.
When a new local node is started, the contents of the environment variable
DISTRIBUTED_PROCESS_TRACE_FILE
are checked for a valid file path. If this
exists and the file can be opened for writing, all trace output will be directed
thence. If the environment variable is empty, the path invalid, or the file
unavailable for writing - e.g., because another node has already started
tracing to it - then the DISTRIBUTED_PROCESS_TRACE_CONSOLE
environment
variable is checked for any non-empty value. If this is set, then all trace
output will be directed to the system logger process. If neither environment
variable provides a valid trace configuration, all internal traces are written
to Debug.Trace.traceEventIO, which writes to the GHC eventlog.
Users of the simplelocalnet Cloud Haskell backend should also note that
because the trace file option only supports trace output from a single node
(so as to avoid interleaving), a file trace configured for the master node will
prevent slaves from tracing to the file and they will fall back to using the
console/say
or eventlog instead.