distributed-process-0.7.5: Cloud Haskell: Erlang-style concurrency in Haskell
Safe HaskellNone
LanguageHaskell2010

Control.Distributed.Process

Description

Cloud Haskell

This is an implementation of Cloud Haskell, as described in Towards Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon Peyton Jones (see http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), although some of the details are different. The precise message passing semantics are based on A unified semantics for future Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle.

For a detailed description of the package and other reference materials, please see the distributed-process wiki page on github: https://github.com/haskell-distributed/distributed-process/wiki.

Synopsis

Basic types

data ProcessId Source #

Process identifier

Instances

Instances details
Data ProcessId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

gfoldl :: (forall d b. Data d => c (d -> b) -> d -> c b) -> (forall g. g -> c g) -> ProcessId -> c ProcessId #

gunfold :: (forall b r. Data b => c (b -> r) -> c r) -> (forall r. r -> c r) -> Constr -> c ProcessId #

toConstr :: ProcessId -> Constr #

dataTypeOf :: ProcessId -> DataType #

dataCast1 :: Typeable t => (forall d. Data d => c (t d)) -> Maybe (c ProcessId) #

dataCast2 :: Typeable t => (forall d e. (Data d, Data e) => c (t d e)) -> Maybe (c ProcessId) #

gmapT :: (forall b. Data b => b -> b) -> ProcessId -> ProcessId #

gmapQl :: (r -> r' -> r) -> r -> (forall d. Data d => d -> r') -> ProcessId -> r #

gmapQr :: forall r r'. (r' -> r -> r) -> r -> (forall d. Data d => d -> r') -> ProcessId -> r #

gmapQ :: (forall d. Data d => d -> u) -> ProcessId -> [u] #

gmapQi :: Int -> (forall d. Data d => d -> u) -> ProcessId -> u #

gmapM :: Monad m => (forall d. Data d => d -> m d) -> ProcessId -> m ProcessId #

gmapMp :: MonadPlus m => (forall d. Data d => d -> m d) -> ProcessId -> m ProcessId #

gmapMo :: MonadPlus m => (forall d. Data d => d -> m d) -> ProcessId -> m ProcessId #

Generic ProcessId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Associated Types

type Rep ProcessId 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep ProcessId = D1 ('MetaData "ProcessId" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'False) (C1 ('MetaCons "ProcessId" 'PrefixI 'True) (S1 ('MetaSel ('Just "processNodeId") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 NodeId) :*: S1 ('MetaSel ('Just "processLocalId") 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 LocalProcessId)))
Show ProcessId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Binary ProcessId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

NFData ProcessId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

rnf :: ProcessId -> () #

Eq ProcessId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Ord ProcessId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Hashable ProcessId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep ProcessId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep ProcessId = D1 ('MetaData "ProcessId" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'False) (C1 ('MetaCons "ProcessId" 'PrefixI 'True) (S1 ('MetaSel ('Just "processNodeId") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 NodeId) :*: S1 ('MetaSel ('Just "processLocalId") 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 LocalProcessId)))

newtype NodeId Source #

Node identifier

Constructors

NodeId 

Instances

Instances details
Data NodeId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

gfoldl :: (forall d b. Data d => c (d -> b) -> d -> c b) -> (forall g. g -> c g) -> NodeId -> c NodeId #

gunfold :: (forall b r. Data b => c (b -> r) -> c r) -> (forall r. r -> c r) -> Constr -> c NodeId #

toConstr :: NodeId -> Constr #

dataTypeOf :: NodeId -> DataType #

dataCast1 :: Typeable t => (forall d. Data d => c (t d)) -> Maybe (c NodeId) #

dataCast2 :: Typeable t => (forall d e. (Data d, Data e) => c (t d e)) -> Maybe (c NodeId) #

gmapT :: (forall b. Data b => b -> b) -> NodeId -> NodeId #

gmapQl :: (r -> r' -> r) -> r -> (forall d. Data d => d -> r') -> NodeId -> r #

gmapQr :: forall r r'. (r' -> r -> r) -> r -> (forall d. Data d => d -> r') -> NodeId -> r #

gmapQ :: (forall d. Data d => d -> u) -> NodeId -> [u] #

gmapQi :: Int -> (forall d. Data d => d -> u) -> NodeId -> u #

gmapM :: Monad m => (forall d. Data d => d -> m d) -> NodeId -> m NodeId #

gmapMp :: MonadPlus m => (forall d. Data d => d -> m d) -> NodeId -> m NodeId #

gmapMo :: MonadPlus m => (forall d. Data d => d -> m d) -> NodeId -> m NodeId #

Generic NodeId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Associated Types

type Rep NodeId 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep NodeId = D1 ('MetaData "NodeId" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'True) (C1 ('MetaCons "NodeId" 'PrefixI 'True) (S1 ('MetaSel ('Just "nodeAddress") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 EndPointAddress)))

Methods

from :: NodeId -> Rep NodeId x #

to :: Rep NodeId x -> NodeId #

Show NodeId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Binary NodeId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

put :: NodeId -> Put #

get :: Get NodeId #

putList :: [NodeId] -> Put #

NFData NodeId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

rnf :: NodeId -> () #

Eq NodeId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

(==) :: NodeId -> NodeId -> Bool #

(/=) :: NodeId -> NodeId -> Bool #

Ord NodeId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Hashable NodeId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

hashWithSalt :: Int -> NodeId -> Int #

hash :: NodeId -> Int #

type Rep NodeId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep NodeId = D1 ('MetaData "NodeId" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'True) (C1 ('MetaCons "NodeId" 'PrefixI 'True) (S1 ('MetaSel ('Just "nodeAddress") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 EndPointAddress)))

data Process a Source #

The Cloud Haskell Process type

Instances

Instances details
MonadFail Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

fail :: String -> Process a #

MonadFix Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

mfix :: (a -> Process a) -> Process a #

MonadIO Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

liftIO :: IO a -> Process a #

Applicative Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

pure :: a -> Process a #

(<*>) :: Process (a -> b) -> Process a -> Process b #

liftA2 :: (a -> b -> c) -> Process a -> Process b -> Process c #

(*>) :: Process a -> Process b -> Process b #

(<*) :: Process a -> Process b -> Process a #

Functor Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

fmap :: (a -> b) -> Process a -> Process b #

(<$) :: a -> Process b -> Process a #

Monad Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

(>>=) :: Process a -> (a -> Process b) -> Process b #

(>>) :: Process a -> Process b -> Process b #

return :: a -> Process a #

MonadCatch Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

catch :: (HasCallStack, Exception e) => Process a -> (e -> Process a) -> Process a #

MonadMask Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

mask :: HasCallStack => ((forall a. Process a -> Process a) -> Process b) -> Process b #

uninterruptibleMask :: HasCallStack => ((forall a. Process a -> Process a) -> Process b) -> Process b #

generalBracket :: HasCallStack => Process a -> (a -> ExitCase b -> Process c) -> (a -> Process b) -> Process (b, c) #

MonadThrow Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

throwM :: (HasCallStack, Exception e) => e -> Process a #

MonadReader LocalProcess Process Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Serializable b => MkTDict (Process b) Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Closure.Explicit

data SendPortId Source #

A send port is identified by a SendPortId.

You cannot send directly to a SendPortId; instead, use newChan to create a SendPort.

Instances

Instances details
Generic SendPortId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Associated Types

type Rep SendPortId 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep SendPortId = D1 ('MetaData "SendPortId" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'False) (C1 ('MetaCons "SendPortId" 'PrefixI 'True) (S1 ('MetaSel ('Just "sendPortProcessId") 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 ProcessId) :*: S1 ('MetaSel ('Just "sendPortLocalId") 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 LocalSendPortId)))
Show SendPortId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Binary SendPortId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

NFData SendPortId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

rnf :: SendPortId -> () #

Eq SendPortId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Ord SendPortId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Hashable SendPortId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep SendPortId Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep SendPortId = D1 ('MetaData "SendPortId" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'False) (C1 ('MetaCons "SendPortId" 'PrefixI 'True) (S1 ('MetaSel ('Just "sendPortProcessId") 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 ProcessId) :*: S1 ('MetaSel ('Just "sendPortLocalId") 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 LocalSendPortId)))

processNodeId :: ProcessId -> NodeId Source #

The ID of the node the process is running on

sendPortProcessId :: SendPortId -> ProcessId Source #

The ID of the process that will receive messages sent on this port

liftIO :: MonadIO m => IO a -> m a #

Lift a computation from the IO monad. This allows us to run IO computations in any monadic stack, so long as it supports these kinds of operations (i.e. IO is the base monad for the stack).

Example

Expand
import Control.Monad.Trans.State -- from the "transformers" library

printState :: Show s => StateT s IO ()
printState = do
  state <- get
  liftIO $ print state

Had we omitted liftIO, we would have ended up with this error:

• Couldn't match type ‘IO’ with ‘StateT s IO’
 Expected type: StateT s IO ()
   Actual type: IO ()

The important part here is the mismatch between StateT s IO () and IO ().

Luckily, we know of a function that takes an IO a and returns an (m a): liftIO, enabling us to run the program and see the expected results:

> evalStateT printState "hello"
"hello"

> evalStateT printState 3
3

Basic messaging

send :: Serializable a => ProcessId -> a -> Process () Source #

Send a message

usend :: Serializable a => ProcessId -> a -> Process () Source #

Send a message unreliably.

Unlike send, this function is insensitive to reconnect. It will try to send the message regardless of the history of connection failures between the nodes.

Message passing with usend is ordered for a given sender and receiver if the messages arrive at all.

expect :: Serializable a => Process a Source #

Wait for a message of a specific type

expectTimeout :: Serializable a => Int -> Process (Maybe a) Source #

Like expect but with a timeout

Channels

data ReceivePort a Source #

The receive end of a typed channel (not serializable)

Note that ReceivePort implements Functor, Applicative, Alternative and Monad. This is especially useful when merging receive ports.

data SendPort a Source #

The send send of a typed channel (serializable)

Instances

Instances details
Generic (SendPort a) Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Associated Types

type Rep (SendPort a) 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep (SendPort a) = D1 ('MetaData "SendPort" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'True) (C1 ('MetaCons "SendPort" 'PrefixI 'True) (S1 ('MetaSel ('Just "sendPortId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 SendPortId)))

Methods

from :: SendPort a -> Rep (SendPort a) x #

to :: Rep (SendPort a) x -> SendPort a #

Show (SendPort a) Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

showsPrec :: Int -> SendPort a -> ShowS #

show :: SendPort a -> String #

showList :: [SendPort a] -> ShowS #

Serializable a => Binary (SendPort a) Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

put :: SendPort a -> Put #

get :: Get (SendPort a) #

putList :: [SendPort a] -> Put #

NFData a => NFData (SendPort a) Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

rnf :: SendPort a -> () #

Eq (SendPort a) Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

(==) :: SendPort a -> SendPort a -> Bool #

(/=) :: SendPort a -> SendPort a -> Bool #

Ord (SendPort a) Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

compare :: SendPort a -> SendPort a -> Ordering #

(<) :: SendPort a -> SendPort a -> Bool #

(<=) :: SendPort a -> SendPort a -> Bool #

(>) :: SendPort a -> SendPort a -> Bool #

(>=) :: SendPort a -> SendPort a -> Bool #

max :: SendPort a -> SendPort a -> SendPort a #

min :: SendPort a -> SendPort a -> SendPort a #

Hashable a => Hashable (SendPort a) Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

hashWithSalt :: Int -> SendPort a -> Int #

hash :: SendPort a -> Int #

type Rep (SendPort a) Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep (SendPort a) = D1 ('MetaData "SendPort" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'True) (C1 ('MetaCons "SendPort" 'PrefixI 'True) (S1 ('MetaSel ('Just "sendPortId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 SendPortId)))

sendPortId :: SendPort a -> SendPortId Source #

The (unique) ID of this send port

newChan :: Serializable a => Process (SendPort a, ReceivePort a) Source #

Create a new typed channel, bound to the calling Process

Note that the channel is bound to the lifecycle of the process that evaluates this function, such that when it dies/exits, the channel will no longer function, but will remain accessible. Thus reading from the ReceivePort will fail silently thereafter, blocking indefinitely (unless a timeout is used).

sendChan :: Serializable a => SendPort a -> a -> Process () Source #

Send a message on a typed channel

receiveChan :: Serializable a => ReceivePort a -> Process a Source #

Wait for a message on a typed channel

receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a) Source #

Like receiveChan but with a timeout. If the timeout is 0, do a non-blocking check for a message.

mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) Source #

Merge a list of typed channels.

The result port is left-biased: if there are messages available on more than one port, the first available message is returned.

mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) Source #

Like mergePortsBiased, but with a round-robin scheduler (rather than left-biased)

Unsafe messaging variants

unsafeSend :: Serializable a => ProcessId -> a -> Process () Source #

Unsafe variant of send. This function makes no attempt to serialize and (in the case when the destination process resides on the same local node) therefore ensure that the payload is fully evaluated before it is delivered.

unsafeUSend :: Serializable a => ProcessId -> a -> Process () Source #

Unsafe variant of usend. This function makes no attempt to serialize the message when the destination process resides on the same local node. Therefore, a local receiver would need to be prepared to cope with any errors resulting from evaluation of the message.

unsafeSendChan :: Serializable a => SendPort a -> a -> Process () Source #

Send a message on a typed channel. This function makes no attempt to serialize and (in the case when the ReceivePort resides on the same local node) therefore ensure that the payload is fully evaluated before it is delivered.

unsafeNSend :: Serializable a => String -> a -> Process () Source #

Named send to a process in the local registry (asynchronous). This function makes no attempt to serialize and (in the case when the destination process resides on the same local node) therefore ensure that the payload is fully evaluated before it is delivered.

unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process () Source #

Named send to a process in a remote registry (asynchronous) This function makes no attempt to serialize and (in the case when the destination process resides on the same local node) therefore ensure that the payload is fully evaluated before it is delivered.

unsafeWrapMessage :: Serializable a => a -> Message Source #

This is the unsafe variant of wrapMessage. See Control.Distributed.Process.UnsafePrimitives for details.

Advanced messaging

data Match b Source #

Opaque type used in receiveWait and receiveTimeout

Instances

Instances details
Functor Match Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Primitives

Methods

fmap :: (a -> b) -> Match a -> Match b #

(<$) :: a -> Match b -> Match a #

receiveWait :: [Match b] -> Process b Source #

Test the matches in order against each message in the queue

receiveTimeout :: Int -> [Match b] -> Process (Maybe b) Source #

Like receiveWait but with a timeout.

If the timeout is zero do a non-blocking check for matching messages. A non-zero timeout is applied only when waiting for incoming messages (that is, after we have checked the messages that are already in the mailbox).

match :: Serializable a => (a -> Process b) -> Match b Source #

Match against any message of the right type

matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b Source #

Match against any message of the right type that satisfies a predicate

matchUnknown :: Process b -> Match b Source #

Remove any message from the queue

matchAny :: (Message -> Process b) -> Match b Source #

Match against an arbitrary message. matchAny removes the first available message from the process mailbox. To handle arbitrary raw messages once removed from the mailbox, see handleMessage and unwrapMessage.

matchAnyIf :: Serializable a => (a -> Bool) -> (Message -> Process b) -> Match b Source #

Match against an arbitrary message. Intended for use with handleMessage and unwrapMessage, this function only removes a message from the process mailbox, if the supplied condition matches. The success (or failure) of runtime type checks deferred to handleMessage and friends is irrelevant here, i.e., if the condition evaluates to True then the message will be removed from the process mailbox and decoded, but that does not guarantee that an expression passed to handleMessage will pass the runtime type checks and therefore be evaluated.

matchChan :: ReceivePort a -> (a -> Process b) -> Match b Source #

Match on a typed channel

matchSTM :: STM a -> (a -> Process b) -> Match b Source #

Match on an arbitrary STM action.

This rather unusaul match primitive allows us to compose arbitrary STM actions with checks against our process' mailbox and/or any typed channel ReceivePorts we may hold.

This allows us to process multiple input streams along with our mailbox, in just the same way that matchChan supports checking both the mailbox and an arbitrary set of typed channels in one atomic transaction.

Note there are no ordering guarnatees with respect to these disparate input sources.

data Message Source #

Messages consist of their typeRep fingerprint and their encoding

Instances

Instances details
Show Message Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Binary Message Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

put :: Message -> Put #

get :: Get Message #

putList :: [Message] -> Put #

NFData Message Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

rnf :: Message -> () #

matchMessage :: (Message -> Process Message) -> Match Message Source #

Match against any message, regardless of the underlying (contained) type

matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message Source #

Match against any message (regardless of underlying type) that satisfies a predicate

isEncoded :: Message -> Bool Source #

internal use only.

wrapMessage :: Serializable a => a -> Message Source #

Wrap a Serializable value in a Message. Note that Messages are Serializable - like the datum they contain - but also note, deserialising such a Message will yield a Message, not the type within it! To obtain the wrapped datum, use unwrapMessage or handleMessage with a specific type.

do
   self <- getSelfPid
   send self (wrapMessage "blah")
   Nothing  <- expectTimeout 1000000 :: Process (Maybe String)
   (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
   (Just "blah") <- unwrapMessage m :: Process (Maybe String)

unwrapMessage :: (Monad m, Serializable a) => Message -> m (Maybe a) Source #

Attempt to unwrap a raw Message. If the type of the decoded message payload matches the expected type, the value will be returned with Just, otherwise Nothing indicates the types do not match.

This expression, for example, will evaluate to Nothing > unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int)

Whereas this expression, will yield Just "foo" > unwrapMessage (wrapMessage "foo") :: Process (Maybe String)

handleMessage :: (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b) Source #

Attempt to handle a raw Message. If the type of the message matches the type of the first argument to the supplied expression, then the message will be decoded and the expression evaluated against its value. If this runtime type checking fails however, Nothing will be returned to indicate the fact. If the check succeeds and evaluation proceeds, the resulting value with be wrapped with Just.

Intended for use in catchesExit and matchAny primitives.

handleMessageIf :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b) Source #

Conditionally handle a raw Message. If the predicate (a -> Bool) evaluates to True, invokes the supplied handler, other returns Nothing to indicate failure. See handleMessage for further information about runtime type checking.

handleMessage_ :: (Monad m, Serializable a) => Message -> (a -> m ()) -> m () Source #

As handleMessage but ignores result, which is useful if you don't care whether or not the handler succeeded.

handleMessageIf_ :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m () Source #

Conditional version of handleMessage_.

forward :: Message -> ProcessId -> Process () Source #

Forward a raw Message to the given ProcessId.

uforward :: Message -> ProcessId -> Process () Source #

Forward a raw Message to the given ProcessId.

Unlike forward, this function is insensitive to reconnect. It will try to send the message regardless of the history of connection failures between the nodes.

delegate :: ProcessId -> (Message -> Bool) -> Process () Source #

Receives messages and forwards them to pid if p msg == True.

relay :: ProcessId -> Process () Source #

A straight relay that forwards all messages to the supplied pid.

proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process () Source #

Proxies pid and forwards messages whenever proc evaluates to True. Unlike delegate the predicate proc runs in the Process monad, allowing for richer proxy behaviour. If proc returns False or the runtime type check fails, no action is taken and the proxy process will continue running.

Process management

spawn :: NodeId -> Closure (Process ()) -> Process ProcessId Source #

Spawn a process

For more information about Closure, see Control.Distributed.Process.Closure.

See also call.

call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a Source #

Run a process remotely and wait for it to reply

We monitor the remote process: if it dies before it can send a reply, we die too.

For more information about Static, SerializableDict, and Closure, see Control.Distributed.Process.Closure.

See also spawn.

terminate :: Process a Source #

Terminate immediately (throws a ProcessTerminationException)

die :: Serializable a => a -> Process b Source #

Die immediately - throws a ProcessExitException with the given reason.

kill :: ProcessId -> String -> Process () Source #

Forceful request to kill a process. Where exit provides an exception that can be caught and handled, kill throws an unexposed exception type which cannot be handled explicitly (by type).

exit :: Serializable a => ProcessId -> a -> Process () Source #

Graceful request to exit a process. Throws ProcessExitException with the supplied reason encoded as a message. Any exit signal raised in this manner can be handled using the catchExit family of functions.

catchExit :: (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b Source #

Catches ProcessExitException. The handler will not be applied unless its type matches the encoded data stored in the exception (see the reason argument given to the exit primitive). If the handler cannot be applied, the exception will be re-thrown.

To handle ProcessExitException without regard for reason, see catch. To handle multiple reasons of differing types, see catchesExit.

catchesExit :: Process b -> [ProcessId -> Message -> Process (Maybe b)] -> Process b Source #

Lift catches (almost).

As ProcessExitException stores the exit reason as a typed, encoded message, a handler must accept inputs of the expected type. In order to handle a list of potentially different handlers (and therefore input types), a handler passed to catchesExit must accept Message and return Maybe (i.e., Just p if it handled the exit reason, otherwise Nothing).

See maybeHandleMessage and Message for more details.

data ProcessRegistrationException Source #

Exception thrown when a process attempts to register a process under an already-registered name or to unregister a name that hasn't been registered. Returns the name and the identifier of the process that owns it, if any.

data SpawnRef Source #

SpawnRef are used to return pids of spawned processes

getSelfPid :: Process ProcessId Source #

Our own process ID

getSelfNode :: Process NodeId Source #

Get the node ID of our local node

getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo) Source #

Get information about the specified process

getNodeStats :: NodeId -> Process (Either DiedReason NodeStats) Source #

Get statistics about the specified node

getLocalNodeStats :: Process NodeStats Source #

Get statistics about our local node

Monitoring and linking

link :: ProcessId -> Process () Source #

Link to a remote process (asynchronous)

When process A links to process B (that is, process A calls link pidB) then an asynchronous exception will be thrown to process A when process B terminates (normally or abnormally), or when process A gets disconnected from process B. Although it is technically possible to catch these exceptions, chances are if you find yourself trying to do so you should probably be using monitor rather than link. In particular, code such as

link pidB   -- Link to process B
expect      -- Wait for a message from process B
unlink pidB -- Unlink again

doesn't quite do what one might expect: if process B sends a message to process A, and subsequently terminates, then process A might or might not be terminated too, depending on whether the exception is thrown before or after the unlink (i.e., this code has a race condition).

Linking is all-or-nothing: A is either linked to B, or it's not. A second call to link has no effect.

Note that link provides unidirectional linking (see spawnSupervised). Linking makes no distinction between normal and abnormal termination of the remote process.

linkNode :: NodeId -> Process () Source #

Link to a node (asynchronous)

linkPort :: SendPort a -> Process () Source #

Link to a channel (asynchronous)

unlink :: ProcessId -> Process () Source #

Remove a link

This is synchronous in the sense that once it returns you are guaranteed that no exception will be raised if the remote process dies. However, it is asynchronous in the sense that we do not wait for a response from the remote node.

unlinkNode :: NodeId -> Process () Source #

Remove a node link

This has the same synchronous/asynchronous nature as unlink.

unlinkPort :: SendPort a -> Process () Source #

Remove a channel (send port) link

This has the same synchronous/asynchronous nature as unlink.

monitor :: ProcessId -> Process MonitorRef Source #

Monitor another process (asynchronous)

When process A monitors process B (that is, process A calls monitor pidB) then process A will receive a ProcessMonitorNotification when process B terminates (normally or abnormally), or when process A gets disconnected from process B. You receive this message like any other (using expect); the notification includes a reason (DiedNormal, DiedException, DiedDisconnect, etc.).

Every call to monitor returns a new monitor reference MonitorRef; if multiple monitors are set up, multiple notifications will be delivered and monitors can be disabled individually using unmonitor.

monitorNode :: NodeId -> Process MonitorRef Source #

Monitor a node (asynchronous)

monitorPort :: Serializable a => SendPort a -> Process MonitorRef Source #

Monitor a typed channel (asynchronous)

unmonitor :: MonitorRef -> Process () Source #

Remove a monitor

This has the same synchronous/asynchronous nature as unlink.

ProcessMonitorNotification messages for the given MonitorRef are removed from the mailbox.

withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a Source #

Establishes temporary monitoring of another process.

withMonitor pid code sets up monitoring of pid for the duration of code. Note: although monitoring is no longer active when withMonitor returns, there might still be unreceived monitor messages in the queue.

withMonitor_ :: ProcessId -> Process a -> Process a Source #

Establishes temporary monitoring of another process.

withMonitor_ pid code sets up monitoring of pid for the duration of code. Note: although monitoring is no longer active when withMonitor_ returns, there might still be unreceived monitor messages in the queue.

Since 0.6.1

data MonitorRef Source #

MonitorRef is opaque for regular Cloud Haskell processes

Instances

Instances details
Generic MonitorRef Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Associated Types

type Rep MonitorRef 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep MonitorRef = D1 ('MetaData "MonitorRef" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'False) (C1 ('MetaCons "MonitorRef" 'PrefixI 'True) (S1 ('MetaSel ('Just "monitorRefIdent") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Identifier) :*: S1 ('MetaSel ('Just "monitorRefCounter") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Int32)))
Show MonitorRef Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Binary MonitorRef Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

NFData MonitorRef Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

rnf :: MonitorRef -> () #

Eq MonitorRef Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Ord MonitorRef Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Hashable MonitorRef Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep MonitorRef Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

type Rep MonitorRef = D1 ('MetaData "MonitorRef" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'False) (C1 ('MetaCons "MonitorRef" 'PrefixI 'True) (S1 ('MetaSel ('Just "monitorRefIdent") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Identifier) :*: S1 ('MetaSel ('Just "monitorRefCounter") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Int32)))

data DiedReason Source #

Why did a process die?

Constructors

DiedNormal

Normal termination

DiedException !String

The process exited with an exception (provided as String because Exception does not implement Binary)

DiedDisconnect

We got disconnected from the process node

DiedNodeDown

The process node died

DiedUnknownId

Invalid (processnodechannel) identifier

Closures

data Closure a #

A closure is a static value and an encoded environment

Instances

Instances details
Show (Closure a) 
Instance details

Defined in Control.Distributed.Static

Methods

showsPrec :: Int -> Closure a -> ShowS #

show :: Closure a -> String #

showList :: [Closure a] -> ShowS #

Binary (Closure a) 
Instance details

Defined in Control.Distributed.Static

Methods

put :: Closure a -> Put #

get :: Get (Closure a) #

putList :: [Closure a] -> Put #

NFData (Closure a) 
Instance details

Defined in Control.Distributed.Static

Methods

rnf :: Closure a -> () #

Eq (Closure a) 
Instance details

Defined in Control.Distributed.Static

Methods

(==) :: Closure a -> Closure a -> Bool #

(/=) :: Closure a -> Closure a -> Bool #

Ord (Closure a) 
Instance details

Defined in Control.Distributed.Static

Methods

compare :: Closure a -> Closure a -> Ordering #

(<) :: Closure a -> Closure a -> Bool #

(<=) :: Closure a -> Closure a -> Bool #

(>) :: Closure a -> Closure a -> Bool #

(>=) :: Closure a -> Closure a -> Bool #

max :: Closure a -> Closure a -> Closure a #

min :: Closure a -> Closure a -> Closure a #

closure #

Arguments

:: Static (ByteString -> a)

Decoder

-> ByteString

Encoded closure environment

-> Closure a 

data Static a #

A static value. Static is opaque; see staticLabel and staticApply.

Instances

Instances details
Show (Static a) 
Instance details

Defined in Control.Distributed.Static

Methods

showsPrec :: Int -> Static a -> ShowS #

show :: Static a -> String #

showList :: [Static a] -> ShowS #

Binary (Static a) 
Instance details

Defined in Control.Distributed.Static

Methods

put :: Static a -> Put #

get :: Get (Static a) #

putList :: [Static a] -> Put #

NFData (Static a) 
Instance details

Defined in Control.Distributed.Static

Methods

rnf :: Static a -> () #

Eq (Static a) 
Instance details

Defined in Control.Distributed.Static

Methods

(==) :: Static a -> Static a -> Bool #

(/=) :: Static a -> Static a -> Bool #

Ord (Static a) 
Instance details

Defined in Control.Distributed.Static

Methods

compare :: Static a -> Static a -> Ordering #

(<) :: Static a -> Static a -> Bool #

(<=) :: Static a -> Static a -> Bool #

(>) :: Static a -> Static a -> Bool #

(>=) :: Static a -> Static a -> Bool #

max :: Static a -> Static a -> Static a #

min :: Static a -> Static a -> Static a #

unStatic :: Typeable a => Static a -> Process a Source #

Resolve a static value

unClosure :: Typeable a => Closure a -> Process a Source #

Resolve a closure

data RemoteTable #

Runtime dictionary for unstatic lookups

Logging

say :: String -> Process () Source #

Log a string

say message sends a message of type SayMessage with the current time and ProcessId of the current process to the process registered as logger. By default, this process simply sends the string to stderr. Individual Cloud Haskell backends might replace this with a different logger process, however.

Registry

register :: String -> ProcessId -> Process () Source #

Register a process with the local registry (synchronous). The name must not already be registered. The process need not be on this node. A bad registration will result in a ProcessRegistrationException

The process to be registered does not have to be local itself.

reregister :: String -> ProcessId -> Process () Source #

Like register, but will replace an existing registration. The name must already be registered.

unregister :: String -> Process () Source #

Remove a process from the local registry (asynchronous). This version will wait until a response is gotten from the management process. The name must already be registered.

whereis :: String -> Process (Maybe ProcessId) Source #

Query the local process registry

nsend :: Serializable a => String -> a -> Process () Source #

Named send to a process in the local registry (asynchronous)

registerRemoteAsync :: NodeId -> String -> ProcessId -> Process () Source #

Register a process with a remote registry (asynchronous).

The process to be registered does not have to live on the same remote node. Reply wil come in the form of a RegisterReply message

See comments in whereisRemoteAsync

unregisterRemoteAsync :: NodeId -> String -> Process () Source #

Remove a process from a remote registry (asynchronous).

Reply wil come in the form of a RegisterReply message

See comments in whereisRemoteAsync

whereisRemoteAsync :: NodeId -> String -> Process () Source #

Query a remote process registry (asynchronous)

Reply will come in the form of a WhereIsReply message.

There is currently no synchronous version of whereisRemoteAsync: if you implement one yourself, be sure to take into account that the remote node might die or get disconnect before it can respond (i.e. you should use monitorNode and take appropriate action when you receive a NodeMonitorNotification).

nsendRemote :: Serializable a => NodeId -> String -> a -> Process () Source #

Named send to a process in a remote registry (asynchronous)

data WhereIsReply Source #

(Asynchronous) reply from whereis

data RegisterReply Source #

(Asynchronous) reply from register and unregister

Exception handling

catch :: Exception e => Process a -> (e -> Process a) -> Process a Source #

Deprecated: Use Control.Monad.Catch.catch instead

Lift catch

data Handler a Source #

You need this when using catches

Constructors

Exception e => Handler (e -> Process a) 

Instances

Instances details
Functor Handler Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Primitives

Methods

fmap :: (a -> b) -> Handler a -> Handler b #

(<$) :: a -> Handler b -> Handler a #

try :: Exception e => Process a -> Process (Either e a) Source #

Deprecated: Use Control.Monad.Catch.try instead

Lift try

mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b Source #

Deprecated: Use Control.Monad.Catch.mask_ instead

Lift mask

mask_ :: Process a -> Process a Source #

Deprecated: Use Control.Monad.Catch.mask_ instead

Lift mask_

onException :: Process a -> Process b -> Process a Source #

Deprecated: Use Control.Monad.Catch.onException instead

Lift onException

bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c Source #

Deprecated: Use Control.Monad.Catch.bracket instead

Lift bracket

bracket_ :: Process a -> Process b -> Process c -> Process c Source #

Deprecated: Use Control.Monad.Catch.bracket_ instead

Lift bracket_

finally :: Process a -> Process b -> Process a Source #

Deprecated: Use Control.Monad.Catch.finally instead

Lift finally

Auxiliary API

spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef Source #

Asynchronous version of spawn

(spawn is defined in terms of spawnAsync and expect)

spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) Source #

Spawn a child process, have the child link to the parent and the parent monitor the child

spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId Source #

Spawn a process and link to it

Note that this is just the sequential composition of spawn and link. (The Unified semantics that underlies Cloud Haskell does not even support a synchronous link operation)

spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) Source #

Like spawnLink, but monitor the spawned process

spawnChannel :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a) Source #

Spawn a new process, supplying it with a new ReceivePort and return the corresponding SendPort.

data DidSpawn Source #

(Asynchronius) reply from spawn

Instances

Instances details
Show DidSpawn Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Binary DidSpawn Source # 
Instance details

Defined in Control.Distributed.Process.Internal.Types

Methods

put :: DidSpawn -> Put #

get :: Get DidSpawn #

putList :: [DidSpawn] -> Put #

Local versions of spawn

spawnLocal :: Process () -> Process ProcessId Source #

Spawn a process on the local node

spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a) Source #

Create a new typed channel, spawn a process on the local node, passing it the receive port, and return the send port

callLocal :: Process a -> Process a Source #

Local version of call. Running a process in this way isolates it from messages sent to the caller process, and also allows silently dropping late or duplicate messages sent to the isolated process after it exits. Silently dropping messages may not always be the best approach.

Reconnecting

reconnect :: ProcessId -> Process () Source #

Cloud Haskell provides the illusion of connection-less, reliable, ordered message passing. However, when network connections get disrupted this illusion cannot always be maintained. Once a network connection breaks (even temporarily) no further communication on that connection will be possible. For example, if process A sends a message to process B, and A is then notified (by monitor notification) that it got disconnected from B, A will not be able to send any further messages to B, unless A explicitly indicates that it is acceptable to attempt to reconnect to B using the Cloud Haskell reconnect primitive.

Importantly, when A calls reconnect it acknowledges that some messages to B might have been lost. For instance, if A sends messages m1 and m2 to B, then receives a monitor notification that its connection to B has been lost, calls reconnect and then sends m3, it is possible that B will receive m1 and m3 but not m2.

Note that reconnect does not mean reconnect now but rather /it is okay to attempt to reconnect on the next send/. In particular, if no further communication attempts are made to B then A can use reconnect to clean up its connection to B.

reconnectPort :: SendPort a -> Process () Source #

Reconnect to a sendport. See reconnect for more information.