-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Cloud Haskell: Erlang-style concurrency in Haskell -- @package distributed-process @version 0.5.5 module Control.Distributed.Process.Internal.StrictContainerAccessors mapMaybe :: Ord key => key -> Accessor (Map key elem) (Maybe elem) mapDefault :: Ord key => elem -> key -> Accessor (Map key elem) elem -- | Clone of Control.Concurrent.STM.TQueue with support for mkWeakTQueue -- -- Not all functionality from the original module is available: -- unGetTQueue, peekTQueue and tryPeekTQueue are missing. In order to -- implement these we'd need to be able to touch# the write end of the -- queue inside unGetTQueue, but that means we need a version of touch# -- that works within the STM monad. module Control.Distributed.Process.Internal.WeakTQueue -- | TQueue is an abstract type representing an unbounded FIFO -- channel. data TQueue a -- | Builds and returns a new instance of TQueue newTQueue :: STM (TQueue a) -- | IO version of newTQueue. This is useful for creating -- top-level TQueues using unsafePerformIO, because using -- atomically inside unsafePerformIO isn't possible. newTQueueIO :: IO (TQueue a) -- | Read the next value from the TQueue. readTQueue :: TQueue a -> STM a -- | A version of readTQueue which does not retry. Instead it -- returns Nothing if no value is available. tryReadTQueue :: TQueue a -> STM (Maybe a) -- | Write a value to a TQueue. writeTQueue :: TQueue a -> a -> STM () -- | Returns True if the supplied TQueue is empty. 
isEmptyTQueue :: TQueue a -> STM Bool mkWeakTQueue :: TQueue a -> IO () -> IO (Weak (TQueue a)) instance Typeable TQueue instance Eq (TQueue a) -- | Spine and element strict list module Control.Distributed.Process.Internal.StrictList -- | Strict list data StrictList a Cons :: !a -> !(StrictList a) -> StrictList a Nil :: StrictList a Snoc :: !(StrictList a) -> !a -> StrictList a Append :: !(StrictList a) -> !(StrictList a) -> StrictList a append :: StrictList a -> StrictList a -> StrictList a foldr :: (a -> b -> b) -> b -> StrictList a -> b -- | Like Control.Concurrent.MVar.Strict but reduce to HNF, not NF module Control.Distributed.Process.Internal.StrictMVar newtype StrictMVar a StrictMVar :: (MVar a) -> StrictMVar a newEmptyMVar :: IO (StrictMVar a) newMVar :: a -> IO (StrictMVar a) takeMVar :: StrictMVar a -> IO a putMVar :: StrictMVar a -> a -> IO () readMVar :: StrictMVar a -> IO a withMVar :: StrictMVar a -> (a -> IO b) -> IO b modifyMVar_ :: StrictMVar a -> (a -> IO a) -> IO () modifyMVar :: StrictMVar a -> (a -> IO (a, b)) -> IO b modifyMVarMasked :: StrictMVar a -> (a -> IO (a, b)) -> IO b mkWeakMVar :: StrictMVar a -> IO () -> IO (Weak (StrictMVar a)) -- | Concurrent queue for single reader, single writer module Control.Distributed.Process.Internal.CQueue data CQueue a data BlockSpec NonBlocking :: BlockSpec Blocking :: BlockSpec Timeout :: Int -> BlockSpec data MatchOn m a MatchMsg :: (m -> Maybe a) -> MatchOn m a MatchChan :: (STM a) -> MatchOn m a newCQueue :: IO (CQueue a) -- | Enqueue an element -- -- Enqueue is strict. enqueue :: CQueue a -> a -> IO () -- | Variant of enqueue for use in the STM monad. 
enqueueSTM :: CQueue a -> a -> STM () -- | Dequeue an element -- -- The timeout (if any) is applied only to waiting for incoming messages, -- not to checking messages that have already arrived dequeue :: CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a) -- | Weak reference to a CQueue mkWeakCQueue :: CQueue a -> IO () -> IO (Weak (CQueue a)) module Control.Distributed.Process.Serializable -- | Objects that can be sent across the network class (Binary a, Typeable a) => Serializable a -- | Encode type representation as a bytestring encodeFingerprint :: Fingerprint -> ByteString -- | Decode a bytestring into a fingerprint. Throws an IO exception on -- failure decodeFingerprint :: ByteString -> Fingerprint -- | The fingerprint of the typeRep of the argument fingerprint :: Typeable a => a -> Fingerprint -- | Size of a fingerprint sizeOfFingerprint :: Int data Fingerprint :: * -- | Show fingerprint (for debugging purposes) showFingerprint :: Fingerprint -> ShowS -- | Reification of Serializable (see -- Control.Distributed.Process.Closure) data SerializableDict a SerializableDict :: SerializableDict a -- | Reification of Typeable. data TypeableDict a TypeableDict :: TypeableDict a instance Typeable TypeableDict instance Typeable SerializableDict instance (Binary a, Typeable a) => Serializable a -- | Types used throughout the Cloud Haskell framework -- -- We collect all types used internally in a single module because many -- of these data types are mutually recursive and cannot be split across -- modules. 
module Control.Distributed.Process.Internal.Types -- | Node identifier newtype NodeId NodeId :: EndPointAddress -> NodeId nodeAddress :: NodeId -> EndPointAddress -- | A local process ID consists of a seed which distinguishes processes -- from different instances of the same local node and a counter data LocalProcessId LocalProcessId :: {-# UNPACK #-} !Int32 -> {-# UNPACK #-} !Int32 -> LocalProcessId lpidUnique :: LocalProcessId -> {-# UNPACK #-} !Int32 lpidCounter :: LocalProcessId -> {-# UNPACK #-} !Int32 -- | Process identifier data ProcessId ProcessId :: !NodeId -> {-# UNPACK #-} !LocalProcessId -> ProcessId -- | The ID of the node the process is running on processNodeId :: ProcessId -> !NodeId -- | Node-local identifier for the process processLocalId :: ProcessId -> {-# UNPACK #-} !LocalProcessId -- | Union of all kinds of identifiers data Identifier NodeIdentifier :: !NodeId -> Identifier ProcessIdentifier :: !ProcessId -> Identifier SendPortIdentifier :: !SendPortId -> Identifier nodeOf :: Identifier -> NodeId firstNonReservedProcessId :: Int32 nullProcessId :: NodeId -> ProcessId -- | Local nodes data LocalNode LocalNode :: !NodeId -> !EndPoint -> !(StrictMVar LocalNodeState) -> !(Chan NCMsg) -> !MxEventBus -> !RemoteTable -> LocalNode -- | NodeId of the node localNodeId :: LocalNode -> !NodeId -- | The network endpoint associated with this node localEndPoint :: LocalNode -> !EndPoint -- | Local node state localState :: LocalNode -> !(StrictMVar LocalNodeState) -- | Channel for the node controller localCtrlChan :: LocalNode -> !(Chan NCMsg) -- | Internal management event bus localEventBus :: LocalNode -> !MxEventBus -- | Runtime lookup table for supporting closures TODO: this should be part -- of the CH state, not the local endpoint state remoteTable :: LocalNode -> !RemoteTable -- | Provides access to the trace controller data Tracer Tracer :: !ProcessId -> !(Weak (CQueue Message)) -> Tracer -- | Process id for the currently active trace handler tracerPid 
:: Tracer -> !ProcessId -- | Weak reference to the tracer controller's mailbox weakQ :: Tracer -> !(Weak (CQueue Message)) -- | Local system management event bus state data MxEventBus MxEventBusInitialising :: MxEventBus MxEventBus :: !ProcessId -> !Tracer -> !(Weak (CQueue Message)) -> !(((TChan Message, TChan Message) -> Process ()) -> IO ProcessId) -> MxEventBus -- | Process id of the management agent controller process agent :: MxEventBus -> !ProcessId -- | Configuration for the local trace controller tracer :: MxEventBus -> !Tracer -- | Weak reference to the management agent controller's mailbox evbuss :: MxEventBus -> !(Weak (CQueue Message)) -- | API for adding management agents to a running node mxNew :: MxEventBus -> !(((TChan Message, TChan Message) -> Process ()) -> IO ProcessId) -- | Local node state data LocalNodeState LocalNodeState :: !(Map LocalProcessId LocalProcess) -> !Int32 -> !Int32 -> !(Map (Identifier, Identifier) (Connection, ImplicitReconnect)) -> LocalNodeState -- | Processes running on this node _localProcesses :: LocalNodeState -> !(Map LocalProcessId LocalProcess) -- | Counter to assign PIDs _localPidCounter :: LocalNodeState -> !Int32 -- | The unique value used to create PIDs (so that processes on -- restarted nodes have new PIDs) _localPidUnique :: LocalNodeState -> !Int32 -- | Outgoing connections _localConnections :: LocalNodeState -> !(Map (Identifier, Identifier) (Connection, ImplicitReconnect)) -- | Processes running on our local node data LocalProcess LocalProcess :: !(CQueue Message) -> !(Weak (CQueue Message)) -> !ProcessId -> !(StrictMVar LocalProcessState) -> !ThreadId -> !LocalNode -> LocalProcess processQueue :: LocalProcess -> !(CQueue Message) processWeakQ :: LocalProcess -> !(Weak (CQueue Message)) processId :: LocalProcess -> !ProcessId processState :: LocalProcess -> !(StrictMVar LocalProcessState) processThread :: LocalProcess -> !ThreadId processNode :: LocalProcess -> !LocalNode -- | Local process state data 
LocalProcessState LocalProcessState :: !Int32 -> !Int32 -> !Int32 -> !(Map LocalSendPortId TypedChannel) -> LocalProcessState _monitorCounter :: LocalProcessState -> !Int32 _spawnCounter :: LocalProcessState -> !Int32 _channelCounter :: LocalProcessState -> !Int32 _typedChannels :: LocalProcessState -> !(Map LocalSendPortId TypedChannel) -- | The Cloud Haskell Process type newtype Process a Process :: ReaderT LocalProcess IO a -> Process a unProcess :: Process a -> ReaderT LocalProcess IO a -- | Deconstructor for Process (not exported to the public API) runLocalProcess :: LocalProcess -> Process a -> IO a data ImplicitReconnect WithImplicitReconnect :: ImplicitReconnect NoImplicitReconnect :: ImplicitReconnect type LocalSendPortId = Int32 -- | A send port is identified by a SendPortId. -- -- You cannot send directly to a SendPortId; instead, use -- newChan to create a SendPort. data SendPortId SendPortId :: {-# UNPACK #-} !ProcessId -> {-# UNPACK #-} !LocalSendPortId -> SendPortId -- | The ID of the process that will receive messages sent on this port sendPortProcessId :: SendPortId -> {-# UNPACK #-} !ProcessId -- | Process-local ID of the channel sendPortLocalId :: SendPortId -> {-# UNPACK #-} !LocalSendPortId data TypedChannel TypedChannel :: (Weak (TQueue a)) -> TypedChannel -- | The send end of a typed channel (serializable) newtype SendPort a SendPort :: SendPortId -> SendPort a -- | The (unique) ID of this send port sendPortId :: SendPort a -> SendPortId -- | The receive end of a typed channel (not serializable) -- -- Note that ReceivePort implements Functor, -- Applicative, Alternative and Monad. This is -- especially useful when merging receive ports. 
newtype ReceivePort a ReceivePort :: STM a -> ReceivePort a receiveSTM :: ReceivePort a -> STM a -- | Messages consist of their typeRep fingerprint and their encoding data Message EncodedMessage :: !Fingerprint -> !ByteString -> Message messageFingerprint :: Message -> !Fingerprint messageEncoding :: Message -> !ByteString UnencodedMessage :: !Fingerprint -> !a -> Message messageFingerprint :: Message -> !Fingerprint messagePayload :: Message -> !a -- | internal use only. isEncoded :: Message -> Bool -- | Turn any serializable term into a message createMessage :: Serializable a => a -> Message -- | Turn any serializable term into an unencoded/local message createUnencodedMessage :: Serializable a => a -> Message -- | Turn any serializable term into an unencoded/local message, without -- evaluating it! This is a dangerous business. unsafeCreateUnencodedMessage :: Serializable a => a -> Message -- | Serialize a message messageToPayload :: Message -> [ByteString] -- | Deserialize a message payloadToMessage :: [ByteString] -> Message -- | MonitorRef is opaque for regular Cloud Haskell processes data MonitorRef MonitorRef :: !Identifier -> !Int32 -> MonitorRef -- | ID of the entity to be monitored monitorRefIdent :: MonitorRef -> !Identifier -- | Unique to distinguish multiple monitor requests by the same process monitorRefCounter :: MonitorRef -> !Int32 -- | Message sent by process monitors data ProcessMonitorNotification ProcessMonitorNotification :: !MonitorRef -> !ProcessId -> !DiedReason -> ProcessMonitorNotification -- | Message sent by node monitors data NodeMonitorNotification NodeMonitorNotification :: !MonitorRef -> !NodeId -> !DiedReason -> NodeMonitorNotification -- | Message sent by channel (port) monitors data PortMonitorNotification PortMonitorNotification :: !MonitorRef -> !SendPortId -> !DiedReason -> PortMonitorNotification -- | Internal exception thrown indirectly by exit data ProcessExitException ProcessExitException :: !ProcessId -> !Message -> 
ProcessExitException -- | Exceptions thrown when a linked process dies data ProcessLinkException ProcessLinkException :: !ProcessId -> !DiedReason -> ProcessLinkException -- | Exception thrown when a linked node dies data NodeLinkException NodeLinkException :: !NodeId -> !DiedReason -> NodeLinkException -- | Exception thrown when a linked channel (port) dies data PortLinkException PortLinkException :: !SendPortId -> !DiedReason -> PortLinkException -- | Exception thrown when a process attempts to register a process under -- an already-registered name or to unregister a name that hasn't been -- registered data ProcessRegistrationException ProcessRegistrationException :: !String -> ProcessRegistrationException -- | Why did a process die? data DiedReason -- | Normal termination DiedNormal :: DiedReason -- | The process exited with an exception (provided as String -- because Exception does not implement Binary) DiedException :: !String -> DiedReason -- | We got disconnected from the process node DiedDisconnect :: DiedReason -- | The process node died DiedNodeDown :: DiedReason -- | Invalid (process/node/channel) identifier DiedUnknownId :: DiedReason -- | (Asynchronous) reply from unmonitor newtype DidUnmonitor DidUnmonitor :: MonitorRef -> DidUnmonitor -- | (Asynchronous) reply from unlink newtype DidUnlinkProcess DidUnlinkProcess :: ProcessId -> DidUnlinkProcess -- | (Asynchronous) reply from unlinkNode newtype DidUnlinkNode DidUnlinkNode :: NodeId -> DidUnlinkNode -- | (Asynchronous) reply from unlinkPort newtype DidUnlinkPort DidUnlinkPort :: SendPortId -> DidUnlinkPort -- | SpawnRef are used to return pids of spawned processes newtype SpawnRef SpawnRef :: Int32 -> SpawnRef -- | (Asynchronous) reply from spawn data DidSpawn DidSpawn :: SpawnRef -> ProcessId -> DidSpawn -- | (Asynchronous) reply from whereis data WhereIsReply WhereIsReply :: String -> (Maybe ProcessId) -> WhereIsReply -- | (Asynchronous) reply from register and unregister data RegisterReply 
RegisterReply :: String -> Bool -> RegisterReply -- | Provide information about a running process data ProcessInfo ProcessInfo :: NodeId -> [String] -> Maybe Int -> [(ProcessId, MonitorRef)] -> [ProcessId] -> ProcessInfo infoNode :: ProcessInfo -> NodeId infoRegisteredNames :: ProcessInfo -> [String] infoMessageQueueLength :: ProcessInfo -> Maybe Int infoMonitors :: ProcessInfo -> [(ProcessId, MonitorRef)] infoLinks :: ProcessInfo -> [ProcessId] data ProcessInfoNone ProcessInfoNone :: DiedReason -> ProcessInfoNone data NodeStats NodeStats :: NodeId -> Int -> Int -> Int -> Int -> NodeStats nodeStatsNode :: NodeStats -> NodeId nodeStatsRegisteredNames :: NodeStats -> Int nodeStatsMonitors :: NodeStats -> Int nodeStatsLinks :: NodeStats -> Int nodeStatsProcesses :: NodeStats -> Int -- | Messages to the node controller data NCMsg NCMsg :: !Identifier -> !ProcessSignal -> NCMsg ctrlMsgSender :: NCMsg -> !Identifier ctrlMsgSignal :: NCMsg -> !ProcessSignal -- | Signals to the node controller (see NCMsg) data ProcessSignal Link :: !Identifier -> ProcessSignal Unlink :: !Identifier -> ProcessSignal Monitor :: !MonitorRef -> ProcessSignal Unmonitor :: !MonitorRef -> ProcessSignal Died :: Identifier -> !DiedReason -> ProcessSignal Spawn :: !(Closure (Process ())) -> !SpawnRef -> ProcessSignal WhereIs :: !String -> ProcessSignal Register :: !String -> !NodeId -> !(Maybe ProcessId) -> !Bool -> ProcessSignal NamedSend :: !String -> !Message -> ProcessSignal LocalSend :: !ProcessId -> !Message -> ProcessSignal LocalPortSend :: !SendPortId -> !Message -> ProcessSignal Kill :: !ProcessId -> !String -> ProcessSignal Exit :: !ProcessId -> !Message -> ProcessSignal GetInfo :: !ProcessId -> ProcessSignal SigShutdown :: ProcessSignal GetNodeStats :: !NodeId -> ProcessSignal localProcesses :: Accessor LocalNodeState (Map LocalProcessId LocalProcess) localPidCounter :: Accessor LocalNodeState Int32 localPidUnique :: Accessor LocalNodeState Int32 localConnections :: Accessor 
LocalNodeState (Map (Identifier, Identifier) (Connection, ImplicitReconnect)) localProcessWithId :: LocalProcessId -> Accessor LocalNodeState (Maybe LocalProcess) localConnectionBetween :: Identifier -> Identifier -> Accessor LocalNodeState (Maybe (Connection, ImplicitReconnect)) monitorCounter :: Accessor LocalProcessState Int32 spawnCounter :: Accessor LocalProcessState Int32 channelCounter :: Accessor LocalProcessState LocalSendPortId typedChannels :: Accessor LocalProcessState (Map LocalSendPortId TypedChannel) typedChannelWithId :: LocalSendPortId -> Accessor LocalProcessState (Maybe TypedChannel) forever' :: Monad m => m a -> m b instance Typeable NodeId instance Typeable LocalProcessId instance Typeable ProcessId instance Typeable SendPortId instance Typeable SendPort instance Typeable ReceivePort instance Typeable Message instance Typeable MonitorRef instance Typeable ProcessRegistrationException instance Typeable ProcessExitException instance Typeable PortLinkException instance Typeable NodeLinkException instance Typeable ProcessLinkException instance Typeable PortMonitorNotification instance Typeable NodeMonitorNotification instance Typeable ProcessMonitorNotification instance Typeable DidUnmonitor instance Typeable DidUnlinkProcess instance Typeable DidUnlinkNode instance Typeable DidUnlinkPort instance Typeable SpawnRef instance Typeable DidSpawn instance Typeable WhereIsReply instance Typeable RegisterReply instance Typeable NodeStats instance Typeable ProcessInfo instance Typeable ProcessInfoNone instance Typeable Process instance Eq NodeId instance Ord NodeId instance Data NodeId instance Generic NodeId instance Eq LocalProcessId instance Ord LocalProcessId instance Data LocalProcessId instance Generic LocalProcessId instance Show LocalProcessId instance Eq ProcessId instance Ord ProcessId instance Data ProcessId instance Generic ProcessId instance Eq ImplicitReconnect instance Show ImplicitReconnect instance Eq SendPortId instance Ord SendPortId 
instance Generic SendPortId instance Eq Identifier instance Ord Identifier instance Generic Identifier instance Generic (SendPort a) instance Show (SendPort a) instance Eq (SendPort a) instance Ord (SendPort a) instance Functor ReceivePort instance Applicative ReceivePort instance Alternative ReceivePort instance Monad ReceivePort instance Eq MonitorRef instance Ord MonitorRef instance Show MonitorRef instance Generic MonitorRef instance Show ProcessRegistrationException instance Show DiedReason instance Eq DiedReason instance Show PortLinkException instance Show NodeLinkException instance Show ProcessLinkException instance Show PortMonitorNotification instance Show NodeMonitorNotification instance Show ProcessMonitorNotification instance Binary DidUnmonitor instance Binary DidUnlinkProcess instance Binary DidUnlinkNode instance Binary DidUnlinkPort instance Show SpawnRef instance Binary SpawnRef instance Eq SpawnRef instance Show DidSpawn instance Show WhereIsReply instance Show RegisterReply instance Show NodeStats instance Eq NodeStats instance Show ProcessInfo instance Eq ProcessInfo instance Show ProcessInfoNone instance Show ProcessSignal instance Functor Process instance Monad Process instance MonadIO Process instance MonadReader LocalProcess Process instance Applicative Process instance Show NCMsg instance Datatype D1NodeId instance Constructor C1_0NodeId instance Selector S1_0_0NodeId instance Datatype D1LocalProcessId instance Constructor C1_0LocalProcessId instance Selector S1_0_0LocalProcessId instance Selector S1_0_1LocalProcessId instance Datatype D1ProcessId instance Constructor C1_0ProcessId instance Selector S1_0_0ProcessId instance Selector S1_0_1ProcessId instance Datatype D1SendPortId instance Constructor C1_0SendPortId instance Selector S1_0_0SendPortId instance Selector S1_0_1SendPortId instance Datatype D1Identifier instance Constructor C1_0Identifier instance Constructor C1_1Identifier instance Constructor C1_2Identifier instance Datatype 
D1SendPort instance Constructor C1_0SendPort instance Selector S1_0_0SendPort instance Datatype D1MonitorRef instance Constructor C1_0MonitorRef instance Selector S1_0_0MonitorRef instance Selector S1_0_1MonitorRef instance Binary ProcessInfoNone instance Binary NodeStats instance Binary ProcessInfo instance Binary RegisterReply instance Binary WhereIsReply instance Binary Identifier instance Binary SendPortId instance Binary DidSpawn instance Binary DiedReason instance Binary ProcessSignal instance Binary MonitorRef instance Binary NCMsg instance Binary PortMonitorNotification instance Binary NodeMonitorNotification instance Binary ProcessMonitorNotification instance Binary LocalProcessId instance Binary Message instance NFData DiedReason instance Exception ProcessRegistrationException instance Exception PortLinkException instance Exception NodeLinkException instance Exception ProcessLinkException instance Show ProcessExitException instance Exception ProcessExitException instance NFData MonitorRef instance Hashable MonitorRef instance Show Message instance NFData Message instance NFData a => NFData (SendPort a) instance Hashable a => Hashable (SendPort a) instance Serializable a => Binary (SendPort a) instance NFData SendPortId instance Show SendPortId instance Hashable SendPortId instance Show Identifier instance NFData Identifier instance Hashable Identifier instance Show ProcessId instance Hashable ProcessId instance NFData ProcessId instance Binary ProcessId instance Hashable LocalProcessId instance Show NodeId instance Hashable NodeId instance NFData NodeId instance Binary NodeId module Control.Distributed.Process.Management.Internal.Types -- | A newtype wrapper for an agent id (which is a string). 
newtype MxAgentId MxAgentId :: String -> MxAgentId agentId :: MxAgentId -> String data MxTableId MxForAgent :: !MxAgentId -> MxTableId MxForPid :: !ProcessId -> MxTableId data MxAgentState s MxAgentState :: !MxAgentId -> !(TChan Message) -> !ProcessId -> !s -> MxAgentState s mxAgentId :: MxAgentState s -> !MxAgentId mxBus :: MxAgentState s -> !(TChan Message) mxSharedTable :: MxAgentState s -> !ProcessId mxLocalState :: MxAgentState s -> !s -- | Monad for management agents. newtype MxAgent s a MxAgent :: StateT (MxAgentState s) Process a -> MxAgent s a unAgent :: MxAgent s a -> StateT (MxAgentState s) Process a -- | Represents the actions a management agent can take when evaluating an -- event sink. data MxAction MxAgentDeactivate :: !String -> MxAction MxAgentPrioritise :: !ChannelSelector -> MxAction MxAgentReady :: MxAction MxAgentSkip :: MxAction data ChannelSelector InputChan :: ChannelSelector Mailbox :: ChannelSelector data MxAgentStart MxAgentStart :: SendPort ProcessId -> MxAgentId -> MxAgentStart mxAgentTableChan :: MxAgentStart -> SendPort ProcessId mxAgentIdStart :: MxAgentStart -> MxAgentId -- | Gross though it is, this synonym represents a function used for forking -- new processes, which has to be passed as a HOF when calling -- mxAgentController, since there's no other way to avoid a circular -- dependency with Node.hs type Fork = Process () -> IO ProcessId -- | Type of a management agent's event sink. type MxSink s = Message -> MxAgent s (Maybe MxAction) -- | This is the default management event, fired for various -- internal events around the NT connection and Process lifecycle. All -- published events that conform to this type, are eligible for tracing - -- i.e., they will be delivered to the trace controller. 
data MxEvent -- | fired whenever a local process is spawned MxSpawned :: ProcessId -> MxEvent -- | fired whenever a process/name is registered (locally) MxRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process/name is unregistered (locally) MxUnRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process dies MxProcessDied :: ProcessId -> DiedReason -> MxEvent -- | fired whenever a node dies (i.e., the connection is -- broken/disconnected) MxNodeDied :: NodeId -> DiedReason -> MxEvent -- | fired whenever a message is sent from a local process MxSent :: ProcessId -> ProcessId -> Message -> MxEvent -- | fired whenever a message is received by a local process MxReceived :: ProcessId -> Message -> MxEvent -- | fired when a network-transport connection is first established MxConnected :: ConnectionId -> EndPointAddress -> MxEvent -- | fired when a network-transport connection is broken/disconnected MxDisconnected :: ConnectionId -> EndPointAddress -> MxEvent -- | a user defined trace event MxUser :: Message -> MxEvent -- | a logging event - used for debugging purposes only MxLog :: String -> MxEvent -- | notifies a trace listener that all subsequent traces will be sent to -- pid MxTraceTakeover :: ProcessId -> MxEvent -- | notifies a trace listener that it has been disabled/removed MxTraceDisable :: MxEvent -- | The class of things that we might be able to resolve to a -- ProcessId (or not). 
class Addressable a resolveToPid :: Addressable a => a -> Maybe ProcessId instance Typeable MxEvent instance Typeable MxAgentId instance Typeable MxTableId instance Typeable MxAgent instance Typeable MxAgentStart instance Generic MxEvent instance Show MxEvent instance Binary MxAgentId instance Eq MxAgentId instance Ord MxAgentId instance Generic MxTableId instance Functor (MxAgent s) instance Monad (MxAgent s) instance MonadIO (MxAgent s) instance MonadState (MxAgentState s) (MxAgent s) instance Applicative (MxAgent s) instance Generic MxAgentStart instance Datatype D1MxEvent instance Constructor C1_0MxEvent instance Constructor C1_1MxEvent instance Constructor C1_2MxEvent instance Constructor C1_3MxEvent instance Constructor C1_4MxEvent instance Constructor C1_5MxEvent instance Constructor C1_6MxEvent instance Constructor C1_7MxEvent instance Constructor C1_8MxEvent instance Constructor C1_9MxEvent instance Constructor C1_10MxEvent instance Constructor C1_11MxEvent instance Constructor C1_12MxEvent instance Datatype D1MxTableId instance Constructor C1_0MxTableId instance Constructor C1_1MxTableId instance Datatype D1MxAgentStart instance Constructor C1_0MxAgentStart instance Selector S1_0_0MxAgentStart instance Selector S1_0_1MxAgentStart instance Binary MxAgentStart instance Binary MxTableId instance Addressable MxEvent instance Binary MxEvent module Control.Distributed.Process.Internal.Messaging sendPayload :: LocalNode -> Identifier -> Identifier -> ImplicitReconnect -> [ByteString] -> IO () sendBinary :: Binary a => LocalNode -> Identifier -> Identifier -> ImplicitReconnect -> a -> IO () sendMessage :: Serializable a => LocalNode -> Identifier -> Identifier -> ImplicitReconnect -> a -> IO () disconnect :: LocalNode -> Identifier -> Identifier -> IO () closeImplicitReconnections :: LocalNode -> Identifier -> IO () -- | a impliesDeathOf b is true if the death of a -- (for instance, a node) implies the death of b (for instance, -- a process on that node) 
impliesDeathOf :: Identifier -> Identifier -> Bool sendCtrlMsg :: Maybe NodeId -> ProcessSignal -> Process () -- | -- -- Cloud Haskell's semantics do not mention evaluation/strictness -- properties at all; Only the promise (or lack) of signal/message -- delivery between processes is considered. For practical purposes, -- Cloud Haskell optimises communication between (intra-node) local -- processes by skipping the network-transport layer. In order to ensure -- the same strictness semantics however, messages still undergo -- binary serialisation before (internal) transmission takes place. Thus -- we ensure that in both (the local and remote) cases, message payloads -- are fully evaluated. Whilst this provides the user with -- unsurprising behaviour, the resulting performance overhead is -- quite severe. Communicating data between Haskell threads -- without forcing binary serialisation reduces (intra-node, -- inter-process) communication overheads by several orders of magnitude. -- -- This module provides variants of Cloud Haskell's messaging primitives -- (send, sendChan, nsend and wrapMessage) -- which do not force binary serialisation in the local case, -- thereby offering superior intra-node messaging performance. The -- catch is that any evaluation problems lurking within the passed -- data structure (e.g., fields set to undefined and so on) will -- show up in the receiver rather than in the caller (as they would with -- the normal strategy). -- -- Use of the functions in this module can potentially change the runtime -- behaviour of your application. You have been warned! -- -- This module is exported so that you can replace the use of Cloud -- Haskell's safe messaging primitives. If you want to use both -- variants, then you can take advantage of qualified imports, however -- Control.Distributed.Process also re-exports these functions -- under different names, using the unsafe prefix. 
module Control.Distributed.Process.UnsafePrimitives -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Create an unencoded Message for any Serializable -- type. wrapMessage :: Serializable a => a -> Message -- | Interface to the management event bus. module Control.Distributed.Process.Management.Internal.Bus publishEvent :: MxEventBus -> Message -> IO () -- | Tracing/Debugging support - Types module Control.Distributed.Process.Management.Internal.Trace.Types data SetTrace TraceEnable :: !ProcessId -> SetTrace TraceDisable :: SetTrace -- | Defines which processes will be traced by a given TraceFlag, -- either by name, or ProcessId. Choosing TraceAll is -- by far the most efficient approach, as the tracer process -- therefore avoids deciding whether or not a trace event is viable. data TraceSubject TraceAll :: TraceSubject TraceProcs :: !(Set ProcessId) -> TraceSubject TraceNames :: !(Set String) -> TraceSubject -- | Defines what will be traced. Flags that control tracing of -- Process events, take a TraceSubject controlling which -- processes should generate trace events in the target process. 
data TraceFlags TraceFlags :: !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !Bool -> !Bool -> TraceFlags traceSpawned :: TraceFlags -> !(Maybe TraceSubject) traceDied :: TraceFlags -> !(Maybe TraceSubject) traceRegistered :: TraceFlags -> !(Maybe TraceSubject) traceUnregistered :: TraceFlags -> !(Maybe TraceSubject) traceSend :: TraceFlags -> !(Maybe TraceSubject) traceRecv :: TraceFlags -> !(Maybe TraceSubject) traceNodes :: TraceFlags -> !Bool traceConnections :: TraceFlags -> !Bool data TraceArg TraceStr :: String -> TraceArg Trace :: a -> TraceArg -- | A generic ok response from the trace coordinator. data TraceOk TraceOk :: TraceOk traceLog :: MxEventBus -> String -> IO () traceLogFmt :: MxEventBus -> String -> [TraceArg] -> IO () traceEvent :: MxEventBus -> MxEvent -> IO () traceMessage :: Serializable m => MxEventBus -> m -> IO () defaultTraceFlags :: TraceFlags enableTrace :: MxEventBus -> ProcessId -> IO () enableTraceSync :: MxEventBus -> SendPort TraceOk -> ProcessId -> IO () disableTrace :: MxEventBus -> IO () disableTraceSync :: MxEventBus -> SendPort TraceOk -> IO () getTraceFlags :: MxEventBus -> SendPort TraceFlags -> IO () setTraceFlags :: MxEventBus -> TraceFlags -> IO () setTraceFlagsSync :: MxEventBus -> SendPort TraceOk -> TraceFlags -> IO () getCurrentTraceClient :: MxEventBus -> SendPort (Maybe ProcessId) -> IO () instance Typeable SetTrace instance Typeable TraceSubject instance Typeable TraceFlags instance Typeable TraceOk instance Generic SetTrace instance Eq SetTrace instance Show SetTrace instance Generic TraceSubject instance Show TraceSubject instance Generic TraceFlags instance Show TraceFlags instance Generic TraceOk instance Datatype D1SetTrace instance Constructor C1_0SetTrace instance Constructor C1_1SetTrace instance Datatype D1TraceSubject instance Constructor C1_0TraceSubject instance Constructor C1_1TraceSubject instance 
Constructor C1_2TraceSubject instance Datatype D1TraceFlags instance Constructor C1_0TraceFlags instance Selector S1_0_0TraceFlags instance Selector S1_0_1TraceFlags instance Selector S1_0_2TraceFlags instance Selector S1_0_3TraceFlags instance Selector S1_0_4TraceFlags instance Selector S1_0_5TraceFlags instance Selector S1_0_6TraceFlags instance Selector S1_0_7TraceFlags instance Datatype D1TraceOk instance Constructor C1_0TraceOk instance Traceable String instance Traceable ProcessId instance Binary TraceOk instance Binary TraceFlags instance Binary TraceSubject instance Binary SetTrace -- | Cloud Haskell primitives -- -- We define these in a separate module so that we don't have to rely on -- the closure combinators module Control.Distributed.Process.Internal.Primitives -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- | Wait for a message of a specific type expect :: Serializable a => Process a -- | Create a new typed channel newChan :: Serializable a => Process (SendPort a, ReceivePort a) -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () -- | Wait for a message on a typed channel receiveChan :: Serializable a => ReceivePort a -> Process a -- | Merge a list of typed channels. -- -- The result port is left-biased: if there are messages available on -- more than one port, the first available message is returned. mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Like mergePortsBiased, but with a round-robin scheduler (rather -- than left-biased) mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Unsafe variant of send. This function makes no -- attempt to serialize and (in the case when the destination process -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSend :: Serializable a => ProcessId -> a -> Process () -- | Send a message on a typed channel. 
This function makes no -- attempt to serialize and (in the case when the ReceivePort -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSendChan :: Serializable a => SendPort a -> a -> Process () -- | Named send to a process in the local registry (asynchronous). This -- function makes no attempt to serialize and (in the case when -- the destination process resides on the same local node) therefore -- ensure that the payload is fully evaluated before it is delivered. unsafeNSend :: Serializable a => String -> a -> Process () -- | Opaque type used in receiveWait and receiveTimeout data Match b -- | Test the matches in order against each message in the queue receiveWait :: [Match b] -> Process b -- | Like receiveWait but with a timeout. -- -- If the timeout is zero do a non-blocking check for matching messages. -- A non-zero timeout is applied only when waiting for incoming messages -- (that is, after we have checked the messages that are already -- in the mailbox). receiveTimeout :: Int -> [Match b] -> Process (Maybe b) -- | Match against any message of the right type match :: Serializable a => (a -> Process b) -> Match b -- | Match against any message of the right type that satisfies a predicate matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b -- | Remove any message from the queue matchUnknown :: Process b -> Match b -- | Match against an arbitrary message. matchAny removes the first -- available message from the process mailbox. To handle arbitrary -- raw messages once removed from the mailbox, see -- handleMessage and unwrapMessage. matchAny :: (Message -> Process b) -> Match b -- | Match against an arbitrary message. Intended for use with -- handleMessage and unwrapMessage, this function -- only removes a message from the process mailbox, if the -- supplied condition matches. 
The success (or failure) of runtime type -- checks deferred to handleMessage and friends is irrelevant -- here, i.e., if the condition evaluates to True then the -- message will be removed from the process mailbox and decoded, but that -- does not guarantee that an expression passed to -- handleMessage will pass the runtime type checks and therefore -- be evaluated. matchAnyIf :: Serializable a => (a -> Bool) -> (Message -> Process b) -> Match b -- | Match on a typed channel matchChan :: ReceivePort a -> (a -> Process b) -> Match b -- | Match on an arbitrary STM action. -- -- This rather unusual match primitive allows us to compose -- arbitrary STM actions with checks against our process' mailbox and/or -- any typed channel ReceivePorts we may hold. -- -- This allows us to process multiple input streams along with our -- mailbox, in just the same way that matchChan supports -- checking both the mailbox and an arbitrary set of typed -- channels in one atomic transaction. -- -- Note there are no ordering guarantees with respect to these disparate -- input sources. matchSTM :: STM a -> (a -> Process b) -> Match b -- | Match against any message, regardless of the underlying (contained) -- type matchMessage :: (Message -> Process Message) -> Match Message -- | Match against any message (regardless of underlying type) that -- satisfies a predicate matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message -- | internal use only. isEncoded :: Message -> Bool -- | Wrap a Serializable value in a Message. Note that -- Messages are Serializable - like the datum they contain -- - but also note, deserialising such a Message will yield a -- Message, not the type within it! To obtain the wrapped datum, -- use unwrapMessage or handleMessage with a specific type. -- --
--   do
--      self <- getSelfPid
--      send self (wrapMessage "blah")
--      Nothing  <- expectTimeout 1000000 :: Process (Maybe String)
--      (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
--      (Just "blah") <- unwrapMessage m :: Process (Maybe String)
--   
wrapMessage :: Serializable a => a -> Message -- | This is the unsafe variant of wrapMessage. See -- Control.Distributed.Process.UnsafePrimitives for details. unsafeWrapMessage :: Serializable a => a -> Message -- | Attempt to unwrap a raw Message. If the type of the decoded -- message payload matches the expected type, the value will be returned -- with Just, otherwise Nothing indicates the types do -- not match. -- -- This expression, for example, will evaluate to Nothing > -- unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int) -- -- Whereas this expression, will yield Just "foo" > -- unwrapMessage (wrapMessage "foo") :: Process (Maybe String) unwrapMessage :: (Monad m, Serializable a) => Message -> m (Maybe a) -- | Attempt to handle a raw Message. If the type of the message -- matches the type of the first argument to the supplied expression, -- then the message will be decoded and the expression evaluated against -- its value. If this runtime type checking fails however, -- Nothing will be returned to indicate the fact. If the check -- succeeds and evaluation proceeds, the resulting value will be wrapped -- with Just. -- -- Intended for use in catchesExit and matchAny primitives. handleMessage :: (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b) -- | Conditionally handle a raw Message. If the predicate (a -- -> Bool) evaluates to True, invokes the supplied -- handler, otherwise returns Nothing to indicate failure. See -- handleMessage for further information about runtime type -- checking. handleMessageIf :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b) -- | As handleMessage but ignores result, which is useful if you -- don't care whether or not the handler succeeded. handleMessage_ :: (Monad m, Serializable a) => Message -> (a -> m ()) -> m () -- | Conditional version of handleMessage_.
handleMessageIf_ :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m () -- | Forward a raw Message to the given ProcessId. forward :: Message -> ProcessId -> Process () -- | Receives messages and forwards them to pid if p msg == -- True. delegate :: ProcessId -> (Message -> Bool) -> Process () -- | A straight relay that forwards all messages to the supplied pid. relay :: ProcessId -> Process () -- | Proxies pid and forwards messages whenever proc -- evaluates to True. Unlike delegate the predicate -- proc runs in the Process monad, allowing for richer -- proxy behaviour. If proc returns False or the -- runtime type check fails, no action is taken and the proxy -- process will continue running. proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process () -- | Terminate immediately (throws a ProcessTerminationException) terminate :: Process a -- | Thrown by terminate data ProcessTerminationException ProcessTerminationException :: ProcessTerminationException -- | Die immediately - throws a ProcessExitException with the given -- reason. die :: Serializable a => a -> Process b -- | Forceful request to kill a process. Where exit provides an -- exception that can be caught and handled, kill throws an -- unexposed exception type which cannot be handled explicitly (by type). kill :: ProcessId -> String -> Process () -- | Graceful request to exit a process. Throws ProcessExitException -- with the supplied reason encoded as a message. Any exit -- signal raised in this manner can be handled using the -- catchExit family of functions. exit :: Serializable a => ProcessId -> a -> Process () -- | Catches ProcessExitException. The handler will not be applied -- unless its type matches the encoded data stored in the exception (see -- the reason argument given to the exit primitive). If the -- handler cannot be applied, the exception will be re-thrown. -- -- To handle ProcessExitException without regard for -- reason, see catch. 
To handle multiple reasons of -- differing types, see catchesExit. catchExit :: (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b -- | Lift catches (almost). -- -- As ProcessExitException stores the exit reason as a -- typed, encoded message, a handler must accept inputs of the expected -- type. In order to handle a list of potentially different handlers (and -- therefore input types), a handler passed to catchesExit must -- accept Message and return Maybe (i.e., Just p -- if it handled the exit reason, otherwise Nothing). -- -- See maybeHandleMessage and Message for more details. catchesExit :: Process b -> [(ProcessId -> Message -> (Process (Maybe b)))] -> Process b -- | Internal exception thrown indirectly by exit data ProcessExitException -- | Our own process ID getSelfPid :: Process ProcessId -- | Get the node ID of our local node getSelfNode :: Process NodeId -- | Provide information about a running process data ProcessInfo ProcessInfo :: NodeId -> [String] -> Maybe Int -> [(ProcessId, MonitorRef)] -> [ProcessId] -> ProcessInfo infoNode :: ProcessInfo -> NodeId infoRegisteredNames :: ProcessInfo -> [String] infoMessageQueueLength :: ProcessInfo -> Maybe Int infoMonitors :: ProcessInfo -> [(ProcessId, MonitorRef)] infoLinks :: ProcessInfo -> [ProcessId] -- | Get information about the specified process getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo) data NodeStats NodeStats :: NodeId -> Int -> Int -> Int -> Int -> NodeStats nodeStatsNode :: NodeStats -> NodeId nodeStatsRegisteredNames :: NodeStats -> Int nodeStatsMonitors :: NodeStats -> Int nodeStatsLinks :: NodeStats -> Int nodeStatsProcesses :: NodeStats -> Int -- | Get statistics about the specified node getNodeStats :: NodeId -> Process (Either DiedReason NodeStats) -- | Get statistics about our local node getLocalNodeStats :: Process NodeStats -- | Link to a remote process (asynchronous) -- -- When process A links to process B (that is, process A calls link -- 
pidB) then an asynchronous exception will be thrown to process A -- when process B terminates (normally or abnormally), or when process A -- gets disconnected from process B. Although it is technically -- possible to catch these exceptions, chances are if you find yourself -- trying to do so you should probably be using monitor rather -- than link. In particular, code such as -- --
--   link pidB   -- Link to process B
--   expect      -- Wait for a message from process B
--   unlink pidB -- Unlink again
--   
-- -- doesn't quite do what one might expect: if process B sends a message -- to process A, and subsequently terminates, then process A might -- or might not be terminated too, depending on whether the exception is -- thrown before or after the unlink (i.e., this code has a race -- condition). -- -- Linking is all-or-nothing: A is either linked to B, or it's not. A -- second call to link has no effect. -- -- Note that link provides unidirectional linking (see -- spawnSupervised). Linking makes no distinction between normal -- and abnormal termination of the remote process. link :: ProcessId -> Process () -- | Remove a link -- -- This is synchronous in the sense that once it returns you are -- guaranteed that no exception will be raised if the remote process -- dies. However, it is asynchronous in the sense that we do not wait for -- a response from the remote node. unlink :: ProcessId -> Process () -- | Monitor another process (asynchronous) -- -- When process A monitors process B (that is, process A calls -- monitor pidB) then process A will receive a -- ProcessMonitorNotification when process B terminates -- (normally or abnormally), or when process A gets disconnected from -- process B. You receive this message like any other (using -- expect); the notification includes a reason (DiedNormal, -- DiedException, DiedDisconnect, etc.). -- -- Every call to monitor returns a new monitor reference -- MonitorRef; if multiple monitors are set up, multiple -- notifications will be delivered and monitors can be disabled -- individually using unmonitor. monitor :: ProcessId -> Process MonitorRef -- | Remove a monitor -- -- This has the same synchronous/asynchronous nature as unlink. unmonitor :: MonitorRef -> Process () -- | Establishes temporary monitoring of another process. -- -- withMonitor pid code sets up monitoring of pid for -- the duration of code. 
Note: although monitoring is no longer -- active when withMonitor returns, there might still be -- unreceived monitor messages in the queue. withMonitor :: ProcessId -> Process a -> Process a -- | Log a string -- -- say message sends a message (time, pid of the current -- process, message) to the process registered as logger. By -- default, this process simply sends the string to stderr. -- Individual Cloud Haskell backends might replace this with a different -- logger process, however. say :: String -> Process () -- | Register a process with the local registry (synchronous). The name -- must not already be registered. The process need not be on this node. -- A bad registration will result in a -- ProcessRegistrationException -- -- The process to be registered does not have to be local itself. register :: String -> ProcessId -> Process () -- | Like register, but will replace an existing registration. The -- name must already be registered. reregister :: String -> ProcessId -> Process () -- | Remove a process from the local registry (asynchronous). This version -- will wait until a response is gotten from the management process. The -- name must already be registered. unregister :: String -> Process () -- | Query the local process registry whereis :: String -> Process (Maybe ProcessId) -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Register a process with a remote registry (asynchronous). -- -- The process to be registered does not have to live on the same remote -- node. Reply will come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync registerRemoteAsync :: NodeId -> String -> ProcessId -> Process () reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process () -- | Remove a process from a remote registry (asynchronous).
-- -- Reply will come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync unregisterRemoteAsync :: NodeId -> String -> Process () -- | Query a remote process registry (asynchronous) -- -- Reply will come in the form of a WhereIsReply message. -- -- There is currently no synchronous version of -- whereisRemoteAsync: if you implement one yourself, be sure to -- take into account that the remote node might die or get disconnected -- before it can respond (i.e. you should use monitorNode and take -- appropriate action when you receive a NodeMonitorNotification). whereisRemoteAsync :: NodeId -> String -> Process () -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | Resolve a closure unClosure :: Typeable a => Closure a -> Process a -- | Resolve a static value unStatic :: Typeable a => Static a -> Process a -- | Lift catch catch :: Exception e => Process a -> (e -> Process a) -> Process a -- | You need this when using catches data Handler a Handler :: (e -> Process a) -> Handler a -- | Lift catches catches :: Process a -> [Handler a] -> Process a -- | Lift try try :: Exception e => Process a -> Process (Either e a) -- | Lift mask mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b -- | Lift mask_ mask_ :: Process a -> Process a -- | Lift onException onException :: Process a -> Process b -> Process a -- | Lift bracket bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c -- | Lift bracket_ bracket_ :: Process a -> Process b -> Process c -> Process c -- | Lift finally finally :: Process a -> Process b -> Process a -- | Like expect but with a timeout expectTimeout :: Serializable a => Int -> Process (Maybe a) -- | Like receiveChan but with a timeout. If the timeout is 0, do a -- non-blocking check for a message.
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a) -- | Asynchronous version of spawn -- -- (spawn is defined in terms of spawnAsync and -- expect) spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef -- | Link to a node (asynchronous) linkNode :: NodeId -> Process () -- | Link to a channel (asynchronous) linkPort :: SendPort a -> Process () -- | Remove a node link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkNode :: NodeId -> Process () -- | Remove a channel (send port) link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkPort :: SendPort a -> Process () -- | Monitor a node (asynchronous) monitorNode :: NodeId -> Process MonitorRef -- | Monitor a typed channel (asynchronous) monitorPort :: Serializable a => SendPort a -> Process MonitorRef -- | Cloud Haskell provides the illusion of connection-less, reliable, -- ordered message passing. However, when network connections get -- disrupted this illusion cannot always be maintained. Once a network -- connection breaks (even temporarily) no further communication on that -- connection will be possible. For example, if process A sends a message -- to process B, and A is then notified (by monitor notification) that it -- got disconnected from B, A will not be able to send any further -- messages to B, unless A explicitly indicates that it is -- acceptable to attempt to reconnect to B using the Cloud Haskell -- reconnect primitive. -- -- Importantly, when A calls reconnect it acknowledges that some -- messages to B might have been lost. For instance, if A sends messages -- m1 and m2 to B, then receives a monitor notification that its -- connection to B has been lost, calls reconnect and then sends -- m3, it is possible that B will receive m1 and m3 but not m2. -- -- Note that reconnect does not mean reconnect now but -- rather /it is okay to attempt to reconnect on the next send/. 
In -- particular, if no further communication attempts are made to B then A -- can use reconnect to clean up its connection to B. reconnect :: ProcessId -> Process () -- | Reconnect to a sendport. See reconnect for more information. reconnectPort :: SendPort a -> Process () sendCtrlMsg :: Maybe NodeId -> ProcessSignal -> Process () instance Typeable ProcessTerminationException instance Show ProcessTerminationException instance Functor Handler instance Exception ProcessTerminationException module Control.Distributed.Process.Internal.Closure.BuiltIn remoteTable :: RemoteTable -> RemoteTable -- | Static decoder, given a static serialization dictionary. -- -- See module documentation of Control.Distributed.Process.Closure -- for an example. staticDecode :: Typeable a => Static (SerializableDict a) -> Static (ByteString -> a) -- | Serialization dictionary for '()' sdictUnit :: Static (SerializableDict ()) -- | Serialization dictionary for ProcessId sdictProcessId :: Static (SerializableDict ProcessId) -- | Serialization dictionary for SendPort sdictSendPort :: Typeable a => Static (SerializableDict a) -> Static (SerializableDict (SendPort a)) -- | Serialization dictionary for Static. sdictStatic :: Typeable a => Static (TypeableDict a) -> Static (SerializableDict (Static a)) -- | Serialization dictionary for Closure. 
sdictClosure :: Typeable a => Static (TypeableDict a) -> Static (SerializableDict (Closure a)) sndStatic :: Static ((a, b) -> b) -- | CP a b is a process with input of type a and output -- of type b type CP a b = Closure (a -> Process b) -- | CP version of id idCP :: Typeable a => CP a a -- | CP version of (***) splitCP :: (Typeable a, Typeable b, Typeable c, Typeable d) => CP a c -> CP b d -> CP (a, b) (c, d) -- | CP version of return returnCP :: Serializable a => Static (SerializableDict a) -> a -> Closure (Process a) -- | (Not quite the) CP version of (>>=) bindCP :: (Typeable a, Typeable b) => Closure (Process a) -> CP a b -> Closure (Process b) -- | CP version of (>>) seqCP :: (Typeable a, Typeable b) => Closure (Process a) -> Closure (Process b) -> Closure (Process b) decodeProcessIdStatic :: Static (ByteString -> ProcessId) -- | CP version of link cpLink :: ProcessId -> Closure (Process ()) -- | CP version of unlink cpUnlink :: ProcessId -> Closure (Process ()) -- | CP version of relay cpRelay :: ProcessId -> Closure (Process ()) -- | CP version of send cpSend :: Typeable a => Static (SerializableDict a) -> ProcessId -> CP a () -- | CP version of expect cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a) -- | CP version of newChan cpNewChan :: Typeable a => Static (SerializableDict a) -> Closure (Process (SendPort a, ReceivePort a)) -- | CP version of delay cpDelayed :: ProcessId -> Closure (Process ()) -> Closure (Process ()) cpEnableTraceRemote :: ProcessId -> Closure (Process ()) module Control.Distributed.Process.Internal.Spawn -- | Spawn a process -- -- For more information about Closure, see -- Control.Distributed.Process.Closure. -- -- See also call. spawn :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Spawn a process and link to it -- -- Note that this is just the sequential composition of spawn and -- link. 
(The Unified semantics that underlies Cloud -- Haskell does not even support a synchronous link operation) spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Like spawnLink, but monitor the spawned process spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Run a process remotely and wait for it to reply -- -- We monitor the remote process: if it dies before it can send a reply, -- we die too. -- -- For more information about Static, SerializableDict, and -- Closure, see Control.Distributed.Process.Closure. -- -- See also spawn. call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a -- | Spawn a child process, have the child link to the parent and the -- parent monitor the child spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a new process, supplying it with a new ReceivePort and -- return the corresponding SendPort. spawnChannel :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a) -- | Template Haskell support module Control.Distributed.Process.Internal.Closure.TH -- | Create the closure, decoder, and metadata definitions for the given -- list of functions remotable :: [Name] -> Q [Dec] -- | Like remotable, but parameterized by the declaration of a -- function instead of the function name. So where for remotable -- you'd do -- --
--   f :: T1 -> T2
--   f = ...
--   
--   remotable ['f]
--   
-- -- with remotableDecl you would instead do -- --
--   remotableDecl [
--      [d| f :: T1 -> T2 ;
--          f = ...
--        |]
--    ]
--   
-- -- remotableDecl creates the function specified as well as the -- various dictionaries and static versions that remotable also -- creates. remotableDecl is sometimes necessary when you want to -- refer to, say, $(mkClosure 'f) within the definition of -- f itself. -- -- NOTE: remotableDecl creates __remoteTableDecl instead -- of __remoteTable so that you can use both remotable -- and remotableDecl within the same module. remotableDecl :: [Q [Dec]] -> Q [Dec] -- | Construct a static value. -- -- If f : forall a1 .. an. T then $(mkStatic 'f) :: forall -- a1 .. an. Static T. Be sure to pass f to -- remotable. mkStatic :: Name -> Q Exp -- | If f : T1 -> T2 is a monomorphic function then -- $(functionSDict 'f) :: Static (SerializableDict T1). -- -- Be sure to pass f to remotable. functionSDict :: Name -> Q Exp -- | If f : T1 -> Process T2 is a monomorphic function then -- $(functionTDict 'f) :: Static (SerializableDict T2). -- -- Be sure to pass f to remotable. functionTDict :: Name -> Q Exp -- | If f : T1 -> T2 then $(mkClosure 'f) :: T1 -> -- Closure T2. -- -- TODO: The current version of mkClosure is too polymorphic (@forall a. -- Binary a => a -> Closure T2). mkClosure :: Name -> Q Exp -- | Make a Closure from a static function. This is useful for -- making a closure for a top-level Process () function, because -- using mkClosure would require adding a dummy () -- argument. mkStaticClosure :: Name -> Q Exp -- | Keeps the tracing API calls separate from the Tracer implementation, -- which allows us to avoid a nasty import cycle between tracing and the -- messaging primitives that rely on it, and also between the node -- controller (which requires access to the tracing related elements of -- our RemoteTable) and the Debug module, which requires -- forkProcess. This module is also used by the management -- agent, which relies on the tracing infrastructure's messaging fabric. 
module Control.Distributed.Process.Management.Internal.Trace.Primitives -- | Send a log message to the internal tracing facility. If tracing is -- enabled, this will create a custom trace log event. traceLog :: String -> Process () -- | Send a log message to the internal tracing facility, using the given -- list of printable TraceArgs interspersed with the preceding -- delimiter. traceLogFmt :: String -> [TraceArg] -> Process () -- | Send an arbitrary Message to the tracer process. traceMessage :: Serializable m => m -> Process () defaultTraceFlags :: TraceFlags -- | Enable tracing to the supplied process and wait for a TraceOk -- response from the trace coordinator process. enableTrace :: ProcessId -> Process () -- | Enable tracing to the supplied process. enableTraceAsync :: ProcessId -> Process () -- | Disable the currently configured trace and wait for a TraceOk -- response from the trace coordinator process. disableTrace :: Process () -- | Disable the currently configured trace. disableTraceAsync :: Process () getTraceFlags :: Process TraceFlags -- | Set the given flags for the current tracer and wait for a -- TraceOk response from the trace coordinator process. setTraceFlags :: TraceFlags -> Process () -- | Set the given flags for the current tracer. setTraceFlagsAsync :: TraceFlags -> Process () -- | Turn tracing on for a subset of trace targets. traceOnly :: Traceable a => [a] -> Maybe TraceSubject -- | Trace all targets. traceOn :: Maybe TraceSubject -- | Trace no targets. 
traceOff :: Maybe TraceSubject withLocalTracer :: (MxEventBus -> Process ()) -> Process () withRegisteredTracer :: (ProcessId -> Process a) -> Process a instance Traceable String instance Traceable ProcessId -- | Tracing/Debugging support - Trace Implementation module Control.Distributed.Process.Management.Internal.Trace.Tracer traceController :: MVar ((Weak (CQueue Message))) -> Process () defaultTracer :: Process () systemLoggerTracer :: Process () logfileTracer :: FilePath -> Process () eventLogTracer :: Process () module Control.Distributed.Process.Management.Internal.Trace.Remote -- | Set the given flags for a remote node (asynchronous). setTraceFlagsRemote :: TraceFlags -> NodeId -> Process () -- | Starts a trace relay process on the remote node, which forwards -- all trace events to the registered tracer on this (the calling -- process') node. startTraceRelay :: NodeId -> Process ProcessId -- | Remote Table. remoteTable :: RemoteTable -> RemoteTable module Control.Distributed.Process.Management.Internal.Table data MxTableRequest Delete :: MxTableRequest Purge :: MxTableRequest Clear :: !String -> MxTableRequest Set :: !String -> !Message -> MxTableRequest Get :: !String -> !(SendPort (Maybe Message)) -> MxTableRequest data MxTableId MxForAgent :: !MxAgentId -> MxTableId MxForPid :: !ProcessId -> MxTableId mxTableCoordinator :: String startTableCoordinator :: Fork -> Process () delete :: MxTableId -> Process () purge :: MxTableId -> Process () clear :: String -> MxTableId -> Process () set :: String -> Message -> MxTableId -> Process () get :: Serializable a => ProcessId -> String -> Process (Maybe a) fetch :: Serializable a => MxTableId -> String -> Process (Maybe a) instance Typeable MxTableRequest instance Generic MxTableRequest instance Datatype D1MxTableRequest instance Constructor C1_0MxTableRequest instance Constructor C1_1MxTableRequest instance Constructor C1_2MxTableRequest instance Constructor C1_3MxTableRequest instance Constructor 
C1_4MxTableRequest instance Binary MxTableRequest -- | -- -- This module presents an API for creating Management Agents: -- special processes that are capable of receiving and responding to a -- node's internal system events. These system events are -- delivered by the management event bus: An internal subsystem -- maintained for each running node, to which all agents are -- automatically subscribed. -- -- Agents are defined in terms of event sinks, taking a -- particular Serializable type and evaluating to an action in -- the MxAgent monad in response. Each MxSink evaluates to -- an MxAction that specifies whether the agent should continue -- processing its inputs or stop. If the type of a message cannot be -- matched to any of the agent's sinks, it will be discarded. A sink can -- also deliberately skip processing a message, deferring to the -- remaining handlers. This is the only way that more than one -- event sink can handle the same data type, since otherwise the first -- type match will win every time a message arrives. See -- mxSkip for details. -- -- Various events are published to the management event bus -- automatically, the full list of which can be found in the definition -- of the MxEvent data type. Additionally, clients of the -- Management API can publish arbitrary Serializable data -- to the event bus using mxNotify. All running agents receive all -- events (from the primary event bus to which they're subscribed). -- -- Agent processes are automatically registered on the local node, and -- can receive messages via their mailbox just like ordinary processes. -- Unlike ordinary Process code however, it is unnecessary -- (though possible) for agents to use the base expect and -- receiveX primitives to do this, since the management -- infrastructure will continuously read from both the primary event bus -- and the process' own mailbox. 
Messages are transparently passed -- to the agent's event sinks from both sources, so an agent need only -- concern itself with how to respond to its inputs. -- -- Some agents may wish to prioritise messages from their mailbox over -- traffic on the management event bus, or vice versa. The -- mxReceive and mxReceiveChan API calls do this for the -- mailbox and event bus, respectively. The prioritisation these -- APIs offer is simply that the chosen data stream will be checked -- first. No blocking will occur if the chosen (prioritised) source is -- devoid of input messages, instead the agent handling code will revert -- to switching between the alternatives in round-robin as usual. -- If messages exist in one or more channels, they will be consumed as -- soon as they're available, priority is effectively a hint about which -- channel to consume from, should messages be available in both. -- -- Prioritisation then, is a hint about the preference of data -- source from which the next input should be chosen. No guarantee can be -- made that the chosen source will in fact be selected at runtime. -- -- -- -- The management API provides no guarantees whatsoever, viz: -- -- -- -- -- -- Both management agents and clients of the API have access to a variety -- of data storage capabilities, to facilitate publishing and consuming -- useful system information. Agents maintain their own internal state -- privately (via a state transformer - see mxGetLocal et al), -- however it is possible for agents to share additional data with each -- other (and the outside world) using data tables. -- -- Each agent is assigned its own data table, which acts as a shared map, -- where the keys are Strings and the values are -- Serializable datum of whatever type the agent or its clients -- stores. -- -- Because an agent's data table stores its values in raw -- Message format, it works effectively as an un-typed -- dictionary, into which data of varying types can be fed and later -- retrieved. 
The upside of this is that different keys can be mapped to -- various types without any additional work on the part of the -- developer. The downside is that the code reading these values must -- know in advance what type(s) to expect, and the API provides no -- additional support for handling that. -- -- Publishing is accomplished using the mxPublish and mxSet -- APIs, whilst querying and deletion are handled by mxGet, -- mxClear, mxPurgeTable and mxDropTable -- respectively. -- -- When a management agent terminates, its tables are left in memory -- despite termination, such that an agent may resume its role (by -- restarting) or have its MxAgentId taken over by another -- subsequent agent, leaving the data originally captured in place. -- -- -- -- New agents are defined with mxAgent and require a unique -- MxAgentId, an initial state - MxAgent runs in a state -- transformer - and a list of the agent's event sinks. Each -- MxSink is defined in terms of a specific Serializable -- type, via the mxSink function, binding the event handler -- expression to inputs of only that type. -- -- Apart from modifying its own local state, an agent can execute -- arbitrary Process a code via lifting (see liftMX) and -- even publish its own messages back to the primary event bus (see -- mxBroadcast). -- -- Since messages are delivered to agents from both the management event -- bus and the agent process's mailbox, agents (i.e., event sinks) will -- generally have no idea as to their origin. An agent can, however, -- choose to prioritise the choice of input (source) each time one of its -- event sinks runs. The standard way for an event sink to -- indicate that the agent is ready for its next input is to evaluate -- mxReady. When this happens, the management infrastructure will -- obtain data from the event bus and process' mailbox in a round-robin -- fashion, i.e., one after the other, changing each time. 
-- -- -- -- What follows is a grossly over-simplified example of a management -- agent that provides a basic name monitoring facility. Whenever a -- process name is registered or unregistered, clients are informed of -- the fact. -- --
--   -- simple notification data type
--   
--   data Registration = Reg { added  :: Bool
--                           , procId :: ProcessId
--                           , name   :: String
--                           }
--   
--   -- start a /name monitoring agent/
--   nameMonitorAgent = do
--     mxAgent (MxAgentId "name-monitor") Set.empty [
--           (mxSink $ \(pid :: ProcessId) -> do
--              mxUpdateLocal $ Set.insert pid
--              mxReady)
--         , (mxSink $ \(ev :: MxEvent) -> do
--               let act =
--                     case ev of
--                       (MxRegistered   p n) -> notify True  n p
--                       (MxUnRegistered p n) -> notify False n p
--                       _                    -> return ()
--               act >> mxReady)
--       ]
--     where
--       notify a n p = do
--         Foldable.mapM_ (liftMX . deliver (Reg a p n)) =<< mxGetLocal
--   
-- -- The client interface (for sending their pid) can take one of two -- forms: -- --
--   monitorNames = getSelfPid >>= nsend "name-monitor"
--   monitorNames2 = getSelfPid >>= mxNotify
--   
-- -- For some real-world examples, see the distributed-process-platform -- package. -- -- -- -- Management Agents offer numerous advantages over regular -- processes: broadcast communication with them can have a lower latency, -- they offer simplified message (i.e., input type) handling and they -- have access to internal system information that would be otherwise -- unobtainable. -- -- Do not be tempted to implement everything (e.g., the kitchen sink) -- using the management API though. There are overheads associated with -- management agents which is why they're presented as tools for -- consuming low level system information, instead of as application -- level development tools. -- -- Agents that rely heavily on a busy mailbox can cause the management -- event bus to backlog un-GC'ed data, leading to increased heap space. -- Producers that do not take care to avoid passing unevaluated thunks to -- the API can crash all the agents in the system. Agents are not -- monitored or managed in any way, and those that crash will not be -- restarted. -- -- The management event bus can receive a great deal of traffic. Every -- time a message is sent and/or received, an event is passed to the -- agent controller and broadcast to all agents (plus the trace -- controller, if tracing is enabled for the node). This is already a -- significant overhead - though profiling and benchmarks have -- demonstrated that it does not adversely affect performance if few -- agents are installed. Agents will typically use more cycles than plain -- processes, since they perform additional work: selecting input data -- from both the event bus and their own mailboxes, plus searching -- through the set of event sinks (for each agent) to determine the right -- handler for the event. -- -- Each management agent requires not only its own Process (in -- which the agent code is run), but also a peer process that provides -- its data table. 
These data tables also have to be coordinated -- and managed on each agent's behalf. -- -- -- -- The architecture of the management event bus is internal and subject -- to change without prior notice. The description that follows is -- provided for informational purposes only. -- -- When a node initially starts, two special, internal system processes -- are started to support the management infrastructure. The first, known -- as the trace controller, is responsible for consuming -- MxEvents and forwarding them to the configured tracer - see -- Control.Distributed.Process.Debug for further details. The -- second is the management agent controller, and is the primary -- worker process underpinning the management infrastructure. All -- published management events are routed to this process, which places -- them onto a system wide event bus and additionally passes them -- directly to the trace controller. -- -- There are several reasons for segregating the tracing and management -- control planes in this fashion. Tracing can be enabled or disabled by -- clients, whilst the management event bus cannot, since in addition to -- providing runtime instrumentation, its intended use-cases include node -- monitoring, peer discovery (via topology providing backends) and other -- essential system services that require knowledge of otherwise hidden -- system internals. Tracing is also subject to trace flags that -- limit the specific MxEvents delivered to trace clients - an -- overhead/complexity not shared by management agents. Finally, tracing -- and management agents are implemented using completely different -- signalling techniques - more on this later - which would introduce -- considerable complexity if they shared the same event loop. -- -- The management control plane is driven by a shared broadcast channel, -- which is written to by the agent controller and subscribed to by all -- agent processes. 
Agents are spawned as regular processes, whose -- primary implementation (i.e., server loop) is responsible for -- consuming messages from both the broadcast channel and their own -- mailbox. Once consumed, messages are applied to the agent's event -- sinks until one matches the input, at which point it is applied -- and the loop continues. The implementation chooses from the event bus -- and the mailbox in a round-robin fashion, until a message is received. -- This polling activity would lead to management agents consuming -- considerable system resources if left unchecked, therefore the -- implementation will poll for a limited number of retries, after which -- it will perform a blocking read on the event bus. module Control.Distributed.Process.Management -- | This is the default management event, fired for various -- internal events around the NT connection and Process lifecycle. All -- published events that conform to this type, are eligible for tracing - -- i.e., they will be delivered to the trace controller. 
data MxEvent -- | fired whenever a local process is spawned MxSpawned :: ProcessId -> MxEvent -- | fired whenever a process/name is registered (locally) MxRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process/name is unregistered (locally) MxUnRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process dies MxProcessDied :: ProcessId -> DiedReason -> MxEvent -- | fired whenever a node dies (i.e., the connection is -- broken/disconnected) MxNodeDied :: NodeId -> DiedReason -> MxEvent -- | fired whenever a message is sent from a local process MxSent :: ProcessId -> ProcessId -> Message -> MxEvent -- | fired whenever a message is received by a local process MxReceived :: ProcessId -> Message -> MxEvent -- | fired when a network-transport connection is first established MxConnected :: ConnectionId -> EndPointAddress -> MxEvent -- | fired when a network-transport connection is broken/disconnected MxDisconnected :: ConnectionId -> EndPointAddress -> MxEvent -- | a user defined trace event MxUser :: Message -> MxEvent -- | a logging event - used for debugging purposes only MxLog :: String -> MxEvent -- | notifies a trace listener that all subsequent traces will be sent to -- pid MxTraceTakeover :: ProcessId -> MxEvent -- | notifies a trace listener that it has been disabled/removed MxTraceDisable :: MxEvent -- | Publishes an arbitrary Serializable message to the management -- event bus. Note that no attempt is made to force the argument, -- therefore it is very important that you do not pass unevaluated thunks -- that might crash the receiving process via this API, since all -- registered agents will gain access to the data structure once it is -- broadcast by the agent controller. mxNotify :: Serializable a => a -> Process () -- | Represents the actions a management agent can take when evaluating an -- event sink. data MxAction -- | A newtype wrapper for an agent id (which is a string). 
newtype MxAgentId MxAgentId :: String -> MxAgentId agentId :: MxAgentId -> String -- | Monad for management agents. data MxAgent s a -- | Activates a new agent. mxAgent :: MxAgentId -> s -> [MxSink s] -> Process ProcessId -- | Activates a new agent. This variant takes a finalizer -- expression, that is run once the agent shuts down (even in case of -- failure/exceptions). The finalizer expression runs in the mx -- monad - MxAgent s () - such that the agent's internal state -- remains accessible to the shutdown/cleanup code. mxAgentWithFinalize :: MxAgentId -> s -> [MxSink s] -> MxAgent s () -> Process ProcessId -- | Type of a management agent's event sink. type MxSink s = Message -> MxAgent s (Maybe MxAction) -- | Create an MxSink from an expression taking a -- Serializable type m, that yields an MxAction -- in the MxAgent monad. mxSink :: Serializable m => (m -> MxAgent s MxAction) -> MxSink s -- | Return the MxAgentId for the currently executing agent. mxGetId :: MxAgent s MxAgentId -- | Gracefully terminate an agent. mxDeactivate :: String -> MxAgent s MxAction -- | Continue executing (i.e., receiving and processing messages). mxReady :: MxAgent s MxAction -- | Causes the currently executing event sink to be skipped. The -- remaining declared event sinks will be evaluated to find a matching -- handler. Can be used to allow multiple event sinks to process data of -- the same type. mxSkip :: MxAgent s MxAction -- | Continue executing, prioritising inputs from the process' own -- mailbox ahead of data from the management event bus. mxReceive :: MxAgent s MxAction -- | Continue executing, prioritising inputs from the management event bus -- over the process' own mailbox. mxReceiveChan :: MxAgent s MxAction -- | The MxAgent version of mxNotify. mxBroadcast :: Serializable m => m -> MxAgent s () -- | Set the agent's local state. mxSetLocal :: s -> MxAgent s () -- | Fetch the agent's local state. mxGetLocal :: MxAgent s s -- | Update the agent's local state. 
mxUpdateLocal :: (s -> s) -> MxAgent s () -- | Lift a Process action. liftMX :: Process a -> MxAgent s a -- | Publish an arbitrary Message as a property in the management -- database. -- -- For publishing Serializable data, use mxSet instead. mxPublish :: MxAgentId -> String -> Message -> Process () -- | Sets an arbitrary Serializable datum against a key in the -- management database. Note that no attempt is made to force the -- argument, therefore it is very important that you do not pass -- unevaluated thunks that might crash some other, arbitrary process (or -- management agent!) that obtains and attempts to force the value later -- on. mxSet :: Serializable a => MxAgentId -> String -> a -> Process () -- | Fetches a property from the management database for the given key. If -- the property is not set, or does not match the expected type when -- typechecked (at runtime), returns Nothing. mxGet :: Serializable a => MxAgentId -> String -> Process (Maybe a) -- | Clears a property from the management database using the given key. If -- the key does not exist in the database, this is a noop. mxClear :: MxAgentId -> String -> Process () -- | Purges a table in the management database of all its stored -- properties. mxPurgeTable :: MxAgentId -> Process () -- | Deletes a table from the management database. mxDropTable :: MxAgentId -> Process () module Control.Distributed.Process.Management.Internal.Agent -- | A triple containing a configured tracer, weak pointer to the agent -- controller's mailbox (CQueue) and an expression used to instantiate -- new agents on the current node. type AgentConfig = (Tracer, Weak (CQueue Message), ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId) -- | Starts a management agent for the current node. The agent process must -- not crash or be killed, so we generally avoid publishing its -- ProcessId where possible. 
-- -- Our process is also responsible for forwarding messages to the trace -- controller, since having two special processes handled via the -- LocalNode would be inelegant. We forward messages directly to -- the trace controller's message queue, just as the MxEventBus -- that's set up on the LocalNode forwards messages directly to -- us. This optimises the code path for tracing and avoids overloading -- the node controller's internal control plane with additional -- routing, at the cost of a little more complexity and two cases where -- we break encapsulation. mxAgentController :: Fork -> MVar AgentConfig -> Process () -- | Forks a new process in which an mxAgent is run. mxStartAgent :: Fork -> TChan Message -> ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId -- | Start the tracer controller. startTracing :: Fork -> IO Tracer -- | Start a dead letter (agent) queue. -- -- If no agents are registered on the system, the management event bus -- will fill up and its data won't be GC'ed until someone comes along and -- reads from the broadcast channel (via dupTChan of course). This is -- effectively a leak, so to mitigate it, we start a dead letter -- queue that drains the event bus continuously, thus ensuring if -- there are no other consumers that we won't use up heap space -- unnecessarily. startDeadLetterQueue :: TChan Message -> IO () -- | Local nodes module Control.Distributed.Process.Node -- | Local nodes data LocalNode -- | Initialize a new local node. 
newLocalNode :: Transport -> RemoteTable -> IO LocalNode -- | Force-close a local node -- -- TODO: for now we just close the associated endpoint closeLocalNode :: LocalNode -> IO () -- | Spawn a new process on a local node forkProcess :: LocalNode -> Process () -> IO ProcessId -- | Run a process on a local node and wait for it to finish runProcess :: LocalNode -> Process () -> IO () initRemoteTable :: RemoteTable -- | NodeId of the node localNodeId :: LocalNode -> NodeId instance Typeable ProcessKillException instance Applicative NC instance Functor NC instance Monad NC instance MonadIO NC instance MonadState NCState NC instance MonadReader LocalNode NC instance Show ProcessKillException instance Exception ProcessKillException -- | -- -- Cloud Haskell provides a general purpose tracing mechanism, allowing a -- user supplied tracer process to receive messages when certain -- classes of system events occur. It's possible to use this facility to -- aid in debugging and/or perform other diagnostic tasks to a program at -- runtime. -- -- -- -- Throughout the lifecycle of a local node, the distributed-process -- runtime generates trace events, describing internal runtime -- activities such as the spawning and death of processes, message -- sending, delivery and so on. See the MxEvent type's -- documentation for a list of all the published event types, which -- correspond directly to the types of management events. Users -- can additionally publish custom trace events in the form of -- MxLog log messages or pass custom (i.e., completely user -- defined) event data using the traceMessage function. -- -- All published traces are forwarded to a tracer process, which -- can be specified (and changed) at runtime using enableTrace. -- Some pre-defined tracer processes are provided for conveniently -- printing to stderr, a log file or the GHC eventlog. -- -- If a tracer process crashes, no attempt is made to restart it. 
-- -- -- -- The tracing facility only ever writes to a single tracer process. This -- invariant insulates the tracer controller and ensures a fast path for -- handling all trace events. This module provides facilities for -- layering trace handlers using Cloud Haskell's built-in delegation -- primitives. -- -- The startTracer function wraps the registered tracer -- process with the supplied handler and also forwards trace events to -- the original tracer. The corresponding stopTracer function -- terminates tracer processes in reverse of the order in which they were -- started, and re-registers the previous tracer process. -- -- -- -- The built in tracers provide a simple logging facility that -- writes trace events out to either a log file, stderr or the -- GHC eventlog. These tracers can be configured using environment -- variables, or specified manually using the enableTrace -- function. -- -- When a new local node is started, the contents of several environment -- variables are checked to determine which default tracer process is -- selected. If none of these variables is set, a no-op tracer process is -- installed, which effectively ignores all trace messages. Note that in -- this case, trace events are still generated and passed through the -- system. Only one default tracer will be chosen - the first that -- contains a (valid) value. These environment variables, in the order -- they're examined, are: -- --
--   
--   1. DISTRIBUTED_PROCESS_TRACE_FILE This is checked for a -- valid file path. If it exists and the file can be opened for writing, -- all trace output will be directed thence. If the supplied path is -- invalid, or the file is unavailable for writing, this tracer will not -- be selected.
--   2. DISTRIBUTED_PROCESS_TRACE_CONSOLE This is checked for -- any non-empty value. If set, then all trace output will be -- directed to the system logger process.
--   3. DISTRIBUTED_PROCESS_TRACE_EVENTLOG This is checked for -- any non-empty value. If set, all internal traces are written to -- the GHC eventlog.
--   
-- -- By default, the built in tracers will ignore all trace events! In -- order to enable tracing the incoming MxEvent stream, the -- DISTRIBUTED_PROCESS_TRACE_FLAGS environment variable accepts -- the following flags, which enable tracing specific event types: -- -- p = trace the spawning of new processes d = trace the death of -- processes n = trace registration of names (i.e., named processes) u = -- trace un-registration of names (i.e., named processes) s = trace the -- sending of messages to other processes r = trace the receipt of -- messages from other processes l = trace node up/down events -- -- Users of the simplelocalnet Cloud Haskell backend should also -- note that because the trace file option only supports trace output -- from a single node (so as to avoid interleaving), a file trace -- configured for the master node will prevent slaves from tracing to the -- file. They will need to fall back to the console or eventlog tracers -- instead, which can be accomplished by setting one of these environment -- variables as well, since the latter will only be selected on -- slaves (when the file tracer selection fails). -- -- Support for writing to the eventlog requires specific intervention to -- work, without which, written traces are silently dropped/ignored and -- no output will be generated. The GHC eventlog documentation provides -- information about enabling, viewing and working with event traces at -- http://hackage.haskell.org/trac/ghc/wiki/EventLog. module Control.Distributed.Process.Debug data TraceArg TraceStr :: String -> TraceArg Trace :: a -> TraceArg -- | Defines what will be traced. Flags that control tracing of -- Process events, take a TraceSubject controlling which -- processes should generate trace events in the target process. 
data TraceFlags TraceFlags :: !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !Bool -> !Bool -> TraceFlags traceSpawned :: TraceFlags -> !(Maybe TraceSubject) traceDied :: TraceFlags -> !(Maybe TraceSubject) traceRegistered :: TraceFlags -> !(Maybe TraceSubject) traceUnregistered :: TraceFlags -> !(Maybe TraceSubject) traceSend :: TraceFlags -> !(Maybe TraceSubject) traceRecv :: TraceFlags -> !(Maybe TraceSubject) traceNodes :: TraceFlags -> !Bool traceConnections :: TraceFlags -> !Bool -- | Defines which processes will be traced by a given TraceFlag, -- either by name, or ProcessId. Choosing TraceAll is -- by far the most efficient approach, as the tracer process -- therefore avoids deciding whether or not a trace event is viable. data TraceSubject TraceAll :: TraceSubject TraceProcs :: !(Set ProcessId) -> TraceSubject TraceNames :: !(Set String) -> TraceSubject -- | Enable tracing to the supplied process and wait for a TraceOk -- response from the trace coordinator process. enableTrace :: ProcessId -> Process () -- | Enable tracing to the supplied process. enableTraceAsync :: ProcessId -> Process () -- | Disable the currently configured trace and wait for a TraceOk -- response from the trace coordinator process. disableTrace :: Process () -- | Evaluate proc with tracing enabled via handler, and -- immediately disable tracing thereafter, before giving the result (or -- exception in case of failure). withTracer :: (MxEvent -> Process ()) -> Process a -> Process (Either SomeException a) -- | Evaluate proc with the supplied flags enabled. Any previously -- set trace flags are restored immediately afterwards. withFlags :: TraceFlags -> Process a -> Process (Either SomeException a) getTraceFlags :: Process TraceFlags -- | Set the given flags for the current tracer and wait for a -- TraceOk response from the trace coordinator process. 
setTraceFlags :: TraceFlags -> Process () -- | Set the given flags for the current tracer. setTraceFlagsAsync :: TraceFlags -> Process () defaultTraceFlags :: TraceFlags -- | Trace all targets. traceOn :: Maybe TraceSubject -- | Turn tracing on for a subset of trace targets. traceOnly :: Traceable a => [a] -> Maybe TraceSubject -- | Trace no targets. traceOff :: Maybe TraceSubject -- | Starts a new tracer, using the supplied trace function. Only one -- tracer can be registered at a time, however this function -- overlays the registered tracer with the supplied handler, allowing the -- user to layer multiple tracers on top of one another, with trace -- events forwarded down through all the layers in turn. Once the top -- layer is stopped, the user is responsible for re-registering the -- original (prior) tracer pid before terminating. See withTracer -- for a mechanism that handles that. startTracer :: (MxEvent -> Process ()) -> Process ProcessId -- | Stops a user supplied tracer started with startTracer. Note -- that only one tracer process can be active at any given time. This -- process will stop the last process started with startTracer. If -- startTracer is called multiple times, successive calls to this -- function will stop the tracers in the reverse order which they were -- started. -- -- This function will never stop the system tracer (i.e., the tracer -- initially started when the node is created), therefore once all user -- supplied tracers (i.e., processes started via startTracer) have -- exited, subsequent calls to this function will have no effect. -- -- If the last tracer to have been registered was not started with -- startTracer then the behaviour of this function is -- undefined. stopTracer :: Process () -- | Send a log message to the internal tracing facility. If tracing is -- enabled, this will create a custom trace log event. 
traceLog :: String -> Process () -- | Send a log message to the internal tracing facility, using the given -- list of printable TraceArgs interspersed with the preceding -- delimiter. traceLogFmt :: String -> [TraceArg] -> Process () -- | Send an arbitrary Message to the tracer process. traceMessage :: Serializable m => m -> Process () -- | Remote Table. remoteTable :: RemoteTable -> RemoteTable -- | Starts a trace relay process on the remote node, which forwards -- all trace events to the registered tracer on this (the calling -- process') node. startTraceRelay :: NodeId -> Process ProcessId -- | Set the given flags for a remote node (asynchronous). setTraceFlagsRemote :: TraceFlags -> NodeId -> Process () systemLoggerTracer :: Process () logfileTracer :: FilePath -> Process () eventLogTracer :: Process () -- | -- -- This is an implementation of Cloud Haskell, as described in Towards -- Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon -- Peyton Jones (see -- http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), -- although some of the details are different. The precise message -- passing semantics are based on A unified semantics for future -- Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle. -- -- For a detailed description of the package and other reference -- materials, please see the distributed-process wiki page on github: -- https://github.com/haskell-distributed/distributed-process/wiki. module Control.Distributed.Process -- | Process identifier data ProcessId -- | Node identifier newtype NodeId NodeId :: EndPointAddress -> NodeId nodeAddress :: NodeId -> EndPointAddress -- | The Cloud Haskell Process type data Process a -- | A send port is identified by a SendPortId. -- -- You cannot send directly to a SendPortId; instead, use -- newChan to create a SendPort. 
data SendPortId -- | The ID of the node the process is running on processNodeId :: ProcessId -> NodeId -- | The ID of the process that will receive messages sent on this port sendPortProcessId :: SendPortId -> ProcessId -- | Lift a computation from the IO monad. liftIO :: MonadIO m => forall a. IO a -> m a -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- | Wait for a message of a specific type expect :: Serializable a => Process a -- | Like expect but with a timeout expectTimeout :: Serializable a => Int -> Process (Maybe a) -- | The receive end of a typed channel (not serializable) -- -- Note that ReceivePort implements Functor, -- Applicative, Alternative and Monad. This is -- especially useful when merging receive ports. data ReceivePort a -- | The send end of a typed channel (serializable) data SendPort a -- | The (unique) ID of this send port sendPortId :: SendPort a -> SendPortId -- | Create a new typed channel newChan :: Serializable a => Process (SendPort a, ReceivePort a) -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () -- | Wait for a message on a typed channel receiveChan :: Serializable a => ReceivePort a -> Process a -- | Like receiveChan but with a timeout. If the timeout is 0, do a -- non-blocking check for a message. receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a) -- | Merge a list of typed channels. -- -- The result port is left-biased: if there are messages available on -- more than one port, the first available message is returned. mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Like mergePortsBiased, but with a round-robin scheduler (rather -- than left-biased) mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Unsafe variant of send.
This function makes no -- attempt to serialize and (in the case when the destination process -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSend :: Serializable a => ProcessId -> a -> Process () -- | Send a message on a typed channel. This function makes no -- attempt to serialize and (in the case when the ReceivePort -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSendChan :: Serializable a => SendPort a -> a -> Process () -- | Named send to a process in the local registry (asynchronous). This -- function makes no attempt to serialize and (in the case when -- the destination process resides on the same local node) therefore -- ensure that the payload is fully evaluated before it is delivered. unsafeNSend :: Serializable a => String -> a -> Process () -- | This is the unsafe variant of wrapMessage. See -- Control.Distributed.Process.UnsafePrimitives for details. unsafeWrapMessage :: Serializable a => a -> Message -- | Opaque type used in receiveWait and receiveTimeout data Match b -- | Test the matches in order against each message in the queue receiveWait :: [Match b] -> Process b -- | Like receiveWait but with a timeout. -- -- If the timeout is zero do a non-blocking check for matching messages. -- A non-zero timeout is applied only when waiting for incoming messages -- (that is, after we have checked the messages that are already -- in the mailbox). receiveTimeout :: Int -> [Match b] -> Process (Maybe b) -- | Match against any message of the right type match :: Serializable a => (a -> Process b) -> Match b -- | Match against any message of the right type that satisfies a predicate matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b -- | Remove any message from the queue matchUnknown :: Process b -> Match b -- | Match against an arbitrary message. 
matchAny removes the first -- available message from the process mailbox. To handle arbitrary -- raw messages once removed from the mailbox, see -- handleMessage and unwrapMessage. matchAny :: (Message -> Process b) -> Match b -- | Match against an arbitrary message. Intended for use with -- handleMessage and unwrapMessage, this function -- only removes a message from the process mailbox, if the -- supplied condition matches. The success (or failure) of runtime type -- checks deferred to handleMessage and friends is irrelevant -- here, i.e., if the condition evaluates to True then the -- message will be removed from the process mailbox and decoded, but that -- does not guarantee that an expression passed to -- handleMessage will pass the runtime type checks and therefore -- be evaluated. matchAnyIf :: Serializable a => (a -> Bool) -> (Message -> Process b) -> Match b -- | Match on a typed channel matchChan :: ReceivePort a -> (a -> Process b) -> Match b -- | Match on an arbitrary STM action. -- -- This rather unusual match primitive allows us to compose -- arbitrary STM actions with checks against our process' mailbox and/or -- any typed channel ReceivePorts we may hold. -- -- This allows us to process multiple input streams along with our -- mailbox, in just the same way that matchChan supports -- checking both the mailbox and an arbitrary set of typed -- channels in one atomic transaction. -- -- Note there are no ordering guarantees with respect to these disparate -- input sources. matchSTM :: STM a -> (a -> Process b) -> Match b -- | Messages consist of their typeRep fingerprint and their encoding data Message -- | Match against any message, regardless of the underlying (contained) -- type matchMessage :: (Message -> Process Message) -> Match Message -- | Match against any message (regardless of underlying type) that -- satisfies a predicate matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message -- | internal use only.
isEncoded :: Message -> Bool -- | Wrap a Serializable value in a Message. Note that -- Messages are Serializable - like the datum they contain -- - but also note, deserialising such a Message will yield a -- Message, not the type within it! To obtain the wrapped datum, -- use unwrapMessage or handleMessage with a specific type. -- --
--   do
--      self <- getSelfPid
--      send self (wrapMessage "blah")
--      Nothing  <- expectTimeout 1000000 :: Process (Maybe String)
--      (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
--      (Just "blah") <- unwrapMessage m :: Process (Maybe String)
--   
wrapMessage :: Serializable a => a -> Message -- | Attempt to unwrap a raw Message. If the type of the decoded -- message payload matches the expected type, the value will be returned -- with Just, otherwise Nothing indicates the types do -- not match. -- -- This expression, for example, will evaluate to Nothing > -- unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int) -- -- Whereas this expression, will yield Just "foo" > -- unwrapMessage (wrapMessage "foo") :: Process (Maybe String) unwrapMessage :: (Monad m, Serializable a) => Message -> m (Maybe a) -- | Attempt to handle a raw Message. If the type of the message -- matches the type of the first argument to the supplied expression, -- then the message will be decoded and the expression evaluated against -- its value. If this runtime type checking fails however, -- Nothing will be returned to indicate the fact. If the check -- succeeds and evaluation proceeds, the resulting value will be wrapped -- with Just. -- -- Intended for use in catchesExit and matchAny primitives. handleMessage :: (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b) -- | Conditionally handle a raw Message. If the predicate (a -- -> Bool) evaluates to True, invokes the supplied -- handler, otherwise returns Nothing to indicate failure. See -- handleMessage for further information about runtime type -- checking. handleMessageIf :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b) -- | As handleMessage but ignores result, which is useful if you -- don't care whether or not the handler succeeded. handleMessage_ :: (Monad m, Serializable a) => Message -> (a -> m ()) -> m () -- | Conditional version of handleMessage_. handleMessageIf_ :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m () -- | Forward a raw Message to the given ProcessId. forward :: Message -> ProcessId -> Process () -- | Receives messages and forwards them to pid if p msg == -- True.
delegate :: ProcessId -> (Message -> Bool) -> Process () -- | A straight relay that forwards all messages to the supplied pid. relay :: ProcessId -> Process () -- | Proxies pid and forwards messages whenever proc -- evaluates to True. Unlike delegate the predicate -- proc runs in the Process monad, allowing for richer -- proxy behaviour. If proc returns False or the -- runtime type check fails, no action is taken and the proxy -- process will continue running. proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process () -- | Spawn a process -- -- For more information about Closure, see -- Control.Distributed.Process.Closure. -- -- See also call. spawn :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Run a process remotely and wait for it to reply -- -- We monitor the remote process: if it dies before it can send a reply, -- we die too. -- -- For more information about Static, SerializableDict, and -- Closure, see Control.Distributed.Process.Closure. -- -- See also spawn. call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a -- | Terminate immediately (throws a ProcessTerminationException) terminate :: Process a -- | Die immediately - throws a ProcessExitException with the given -- reason. die :: Serializable a => a -> Process b -- | Forceful request to kill a process. Where exit provides an -- exception that can be caught and handled, kill throws an -- unexposed exception type which cannot be handled explicitly (by type). kill :: ProcessId -> String -> Process () -- | Graceful request to exit a process. Throws ProcessExitException -- with the supplied reason encoded as a message. Any exit -- signal raised in this manner can be handled using the -- catchExit family of functions. exit :: Serializable a => ProcessId -> a -> Process () -- | Catches ProcessExitException. 
The handler will not be applied -- unless its type matches the encoded data stored in the exception (see -- the reason argument given to the exit primitive). If the -- handler cannot be applied, the exception will be re-thrown. -- -- To handle ProcessExitException without regard for -- reason, see catch. To handle multiple reasons of -- differing types, see catchesExit. catchExit :: (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b -- | Lift catches (almost). -- -- As ProcessExitException stores the exit reason as a -- typed, encoded message, a handler must accept inputs of the expected -- type. In order to handle a list of potentially different handlers (and -- therefore input types), a handler passed to catchesExit must -- accept Message and return Maybe (i.e., Just p -- if it handled the exit reason, otherwise Nothing). -- -- See maybeHandleMessage and Message for more details. catchesExit :: Process b -> [(ProcessId -> Message -> (Process (Maybe b)))] -> Process b -- | Thrown by terminate data ProcessTerminationException ProcessTerminationException :: ProcessTerminationException -- | Exception thrown when a process attempts to register a process under -- an already-registered name or to unregister a name that hasn't been -- registered data ProcessRegistrationException ProcessRegistrationException :: !String -> ProcessRegistrationException -- | SpawnRef are used to return pids of spawned processes data SpawnRef -- | Our own process ID getSelfPid :: Process ProcessId -- | Get the node ID of our local node getSelfNode :: Process NodeId -- | Provide information about a running process data ProcessInfo ProcessInfo :: NodeId -> [String] -> Maybe Int -> [(ProcessId, MonitorRef)] -> [ProcessId] -> ProcessInfo infoNode :: ProcessInfo -> NodeId infoRegisteredNames :: ProcessInfo -> [String] infoMessageQueueLength :: ProcessInfo -> Maybe Int infoMonitors :: ProcessInfo -> [(ProcessId, MonitorRef)] infoLinks :: ProcessInfo -> [ProcessId] 
-- | Get information about the specified process getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo) data NodeStats NodeStats :: NodeId -> Int -> Int -> Int -> Int -> NodeStats nodeStatsNode :: NodeStats -> NodeId nodeStatsRegisteredNames :: NodeStats -> Int nodeStatsMonitors :: NodeStats -> Int nodeStatsLinks :: NodeStats -> Int nodeStatsProcesses :: NodeStats -> Int -- | Get statistics about the specified node getNodeStats :: NodeId -> Process (Either DiedReason NodeStats) -- | Get statistics about our local node getLocalNodeStats :: Process NodeStats -- | Link to a remote process (asynchronous) -- -- When process A links to process B (that is, process A calls link -- pidB) then an asynchronous exception will be thrown to process A -- when process B terminates (normally or abnormally), or when process A -- gets disconnected from process B. Although it is technically -- possible to catch these exceptions, chances are if you find yourself -- trying to do so you should probably be using monitor rather -- than link. In particular, code such as -- --
--   link pidB   -- Link to process B
--   expect      -- Wait for a message from process B
--   unlink pidB -- Unlink again
--   
-- -- doesn't quite do what one might expect: if process B sends a message -- to process A, and subsequently terminates, then process A might -- or might not be terminated too, depending on whether the exception is -- thrown before or after the unlink (i.e., this code has a race -- condition). -- -- Linking is all-or-nothing: A is either linked to B, or it's not. A -- second call to link has no effect. -- -- Note that link provides unidirectional linking (see -- spawnSupervised). Linking makes no distinction between normal -- and abnormal termination of the remote process. link :: ProcessId -> Process () -- | Link to a node (asynchronous) linkNode :: NodeId -> Process () -- | Link to a channel (asynchronous) linkPort :: SendPort a -> Process () -- | Remove a link -- -- This is synchronous in the sense that once it returns you are -- guaranteed that no exception will be raised if the remote process -- dies. However, it is asynchronous in the sense that we do not wait for -- a response from the remote node. unlink :: ProcessId -> Process () -- | Remove a node link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkNode :: NodeId -> Process () -- | Remove a channel (send port) link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkPort :: SendPort a -> Process () -- | Monitor another process (asynchronous) -- -- When process A monitors process B (that is, process A calls -- monitor pidB) then process A will receive a -- ProcessMonitorNotification when process B terminates -- (normally or abnormally), or when process A gets disconnected from -- process B. You receive this message like any other (using -- expect); the notification includes a reason (DiedNormal, -- DiedException, DiedDisconnect, etc.). -- -- Every call to monitor returns a new monitor reference -- MonitorRef; if multiple monitors are set up, multiple -- notifications will be delivered and monitors can be disabled -- individually using unmonitor. 
monitor :: ProcessId -> Process MonitorRef -- | Monitor a node (asynchronous) monitorNode :: NodeId -> Process MonitorRef -- | Monitor a typed channel (asynchronous) monitorPort :: Serializable a => SendPort a -> Process MonitorRef -- | Remove a monitor -- -- This has the same synchronous/asynchronous nature as unlink. unmonitor :: MonitorRef -> Process () -- | Establishes temporary monitoring of another process. -- -- withMonitor pid code sets up monitoring of pid for -- the duration of code. Note: although monitoring is no longer -- active when withMonitor returns, there might still be -- unreceived monitor messages in the queue. withMonitor :: ProcessId -> Process a -> Process a -- | MonitorRef is opaque for regular Cloud Haskell processes data MonitorRef -- | Exceptions thrown when a linked process dies data ProcessLinkException ProcessLinkException :: !ProcessId -> !DiedReason -> ProcessLinkException -- | Exception thrown when a linked node dies data NodeLinkException NodeLinkException :: !NodeId -> !DiedReason -> NodeLinkException -- | Exception thrown when a linked channel (port) dies data PortLinkException PortLinkException :: !SendPortId -> !DiedReason -> PortLinkException -- | Message sent by process monitors data ProcessMonitorNotification ProcessMonitorNotification :: !MonitorRef -> !ProcessId -> !DiedReason -> ProcessMonitorNotification -- | Message sent by node monitors data NodeMonitorNotification NodeMonitorNotification :: !MonitorRef -> !NodeId -> !DiedReason -> NodeMonitorNotification -- | Message sent by channel (port) monitors data PortMonitorNotification PortMonitorNotification :: !MonitorRef -> !SendPortId -> !DiedReason -> PortMonitorNotification -- | Why did a process die? 
data DiedReason -- | Normal termination DiedNormal :: DiedReason -- | The process exited with an exception (provided as String -- because Exception does not implement Binary) DiedException :: !String -> DiedReason -- | We got disconnected from the process node DiedDisconnect :: DiedReason -- | The process node died DiedNodeDown :: DiedReason -- | Invalid (process/node/channel) identifier DiedUnknownId :: DiedReason -- | A closure is a static value and an encoded environment data Closure a :: * -> * closure :: Static (ByteString -> a) -> ByteString -> Closure a -- | A static value. Static is opaque; see staticLabel and -- staticApply. data Static a :: * -> * -- | Resolve a static value unStatic :: Typeable a => Static a -> Process a -- | Resolve a closure unClosure :: Typeable a => Closure a -> Process a -- | Runtime dictionary for unstatic lookups data RemoteTable :: * -- | Log a string -- -- say message sends a message (time, pid of the current -- process, message) to the process registered as logger. By -- default, this process simply sends the string to stderr. -- Individual Cloud Haskell backends might replace this with a different -- logger process, however. say :: String -> Process () -- | Register a process with the local registry (synchronous). The name -- must not already be registered. The process need not be on this node. -- A bad registration will result in a -- ProcessRegistrationException -- -- The process to be registered does not have to be local itself. register :: String -> ProcessId -> Process () -- | Like register, but will replace an existing registration. The -- name must already be registered. reregister :: String -> ProcessId -> Process () -- | Remove a process from the local registry (asynchronous). This version -- will wait until a response is gotten from the management process. The -- name must already be registered.
unregister :: String -> Process () -- | Query the local process registry whereis :: String -> Process (Maybe ProcessId) -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Register a process with a remote registry (asynchronous). -- -- The process to be registered does not have to live on the same remote -- node. Reply will come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync registerRemoteAsync :: NodeId -> String -> ProcessId -> Process () reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process () -- | Remove a process from a remote registry (asynchronous). -- -- Reply will come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync unregisterRemoteAsync :: NodeId -> String -> Process () -- | Query a remote process registry (asynchronous) -- -- Reply will come in the form of a WhereIsReply message. -- -- There is currently no synchronous version of -- whereisRemoteAsync: if you implement one yourself, be sure to -- take into account that the remote node might die or get disconnected -- before it can respond (i.e. you should use monitorNode and take -- appropriate action when you receive a NodeMonitorNotification).
whereisRemoteAsync :: NodeId -> String -> Process () -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | (Asynchronous) reply from whereis data WhereIsReply WhereIsReply :: String -> (Maybe ProcessId) -> WhereIsReply -- | (Asynchronous) reply from register and unregister data RegisterReply RegisterReply :: String -> Bool -> RegisterReply -- | Lift catch catch :: Exception e => Process a -> (e -> Process a) -> Process a -- | You need this when using catches data Handler a Handler :: (e -> Process a) -> Handler a -- | Lift catches catches :: Process a -> [Handler a] -> Process a -- | Lift try try :: Exception e => Process a -> Process (Either e a) -- | Lift mask mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b -- | Lift mask_ mask_ :: Process a -> Process a -- | Lift onException onException :: Process a -> Process b -> Process a -- | Lift bracket bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c -- | Lift bracket_ bracket_ :: Process a -> Process b -> Process c -> Process c -- | Lift finally finally :: Process a -> Process b -> Process a -- | Asynchronous version of spawn -- -- (spawn is defined in terms of spawnAsync and -- expect) spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef -- | Spawn a child process, have the child link to the parent and the -- parent monitor the child spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a process and link to it -- -- Note that this is just the sequential composition of spawn and -- link. 
(The Unified semantics that underlies Cloud -- Haskell does not even support a synchronous link operation) spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Like spawnLink, but monitor the spawned process spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a new process, supplying it with a new ReceivePort and -- return the corresponding SendPort. spawnChannel :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a) -- | (Asynchronous) reply from spawn data DidSpawn DidSpawn :: SpawnRef -> ProcessId -> DidSpawn -- | Spawn a process on the local node spawnLocal :: Process () -> Process ProcessId -- | Create a new typed channel, spawn a process on the local node, passing -- it the receive port, and return the send port spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a) -- | Cloud Haskell provides the illusion of connection-less, reliable, -- ordered message passing. However, when network connections get -- disrupted this illusion cannot always be maintained. Once a network -- connection breaks (even temporarily) no further communication on that -- connection will be possible. For example, if process A sends a message -- to process B, and A is then notified (by monitor notification) that it -- got disconnected from B, A will not be able to send any further -- messages to B, unless A explicitly indicates that it is -- acceptable to attempt to reconnect to B using the Cloud Haskell -- reconnect primitive. -- -- Importantly, when A calls reconnect it acknowledges that some -- messages to B might have been lost. For instance, if A sends messages -- m1 and m2 to B, then receives a monitor notification that its -- connection to B has been lost, calls reconnect and then sends -- m3, it is possible that B will receive m1 and m3 but not m2.
-- -- Note that reconnect does not mean reconnect now but -- rather /it is okay to attempt to reconnect on the next send/. In -- particular, if no further communication attempts are made to B then A -- can use reconnect to clean up its connection to B. reconnect :: ProcessId -> Process () -- | Reconnect to a sendport. See reconnect for more information. reconnectPort :: SendPort a -> Process () module Control.Distributed.Process.Internal.Closure.Explicit -- | A RemoteRegister is a transformer on a RemoteTable to register -- additional static values. type RemoteRegister = RemoteTable -> RemoteTable class MkTDict a mkTDict :: MkTDict a => String -> a -> RemoteRegister -- | This takes an explicit name and a value, and produces both a static -- reference to the name and a RemoteRegister for it. mkStaticVal :: Serializable a => String -> a -> (Static a, RemoteRegister) -- | This takes an explicit name, a function of arity one, and creates a -- function yielding a closure and a remote register for it. mkClosureValSingle :: (Serializable a, Typeable b, MkTDict b) => String -> (a -> b) -> (a -> Closure b, RemoteRegister) -- | This takes an explicit name, a function of any arity, and creates a -- function yielding a closure and a remote register for it. mkClosureVal :: (Curry (argTuple -> Closure result) closureFunction, MkTDict result, Uncurry HTrue argTuple func result, Typeable result, Serializable argTuple, IsFunction func HTrue) => String -> func -> (closureFunction, RemoteRegister) -- | Works just like standard call, but with a simpler signature.
call' :: Serializable a => NodeId -> Closure (Process a) -> Process a instance [overlap ok] Typeable EndOfTuple instance [overlap ok] b ~ HFalse => IsFunction a b instance [overlap ok] b ~ HTrue => IsFunction (a -> c) b instance [overlap ok] (IsFunction func b, Uncurry b args func result) => Uncurry'' args func result instance [overlap ok] Uncurry HFalse EndOfTuple a a instance [overlap ok] Uncurry'' rest f r => Uncurry HTrue (a, rest) (a -> f) r instance [overlap ok] Curry (b -> c) r => Curry ((a, b) -> c) (a -> r) instance [overlap ok] Curry ((a, EndOfTuple) -> b) (a -> b) instance [overlap ok] Binary EndOfTuple instance [overlap ok] MkTDict a instance [overlap ok] Serializable b => MkTDict (Process b) -- | Towards Haskell in the Cloud (Epstein et al., Haskell Symposium -- 2011) proposes a new type construct called static that -- characterizes values that are known statically. Cloud Haskell uses the -- Static implementation from Control.Distributed.Static. -- That module comes with its own extensive documentation, which you -- should read if you want to know the details. Here we explain the -- Template Haskell support only. -- -- -- -- Given a top-level (possibly polymorphic, but unqualified) definition -- --
--   f :: forall a1 .. an. T
--   f = ...
--   
-- -- you can use a Template Haskell splice to create a static version of -- f: -- --
--   $(mkStatic 'f) :: forall a1 .. an. (Typeable a1, .., Typeable an) => Static T
--   
-- -- Every module that you write that contains calls to mkStatic -- needs to have a call to remotable: -- --
--   remotable [ 'f, 'g, ... ]
--   
-- -- where you must pass every function (or other value) that you pass as -- an argument to mkStatic. The call to remotable will -- create a definition -- --
--   __remoteTable :: RemoteTable -> RemoteTable
--   
-- -- which can be used to construct the RemoteTable used to -- initialize Cloud Haskell. You should have (at most) one call to -- remotable per module, and compose all created functions when -- initializing Cloud Haskell: -- --
--   let rtable :: RemoteTable
--       rtable = M1.__remoteTable
--              . M2.__remoteTable
--              . ...
--              . Mn.__remoteTable
--              $ initRemoteTable
--   
-- -- NOTE: If you get a type error from ghc along these lines -- --
--   The exact Name `a_a30k' is not in scope
--        Probable cause: you used a unique name (NameU) in Template Haskell but did not bind it
--   
-- -- then you need to enable the ScopedTypeVariables language -- extension. -- -- -- -- Some Cloud Haskell primitives require static serialization -- dictionaries (**): -- --
--   call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a
--   
-- -- Given some serializable type T you can define -- --
--   sdictT :: SerializableDict T
--   sdictT = SerializableDict
--   
-- -- and then have -- --
--   $(mkStatic 'sdictT) :: Static (SerializableDict T)
--   
-- -- However, since these dictionaries are so frequently required Cloud -- Haskell provides special support for them. When you call -- remotable on a monomorphic function f :: T1 -> -- T2 -- --
--   remotable ['f]
--   
-- -- then a serialization dictionary is automatically created for you, -- which you can access with -- --
--   $(functionSDict 'f) :: Static (SerializableDict T1)
--   
-- -- In addition, if f :: T1 -> Process T2, then a second -- dictionary is created -- --
--   $(functionTDict 'f) :: Static (SerializableDict T2)
--   
-- -- -- -- Suppose you have a process -- --
--   isPrime :: Integer -> Process Bool
--   
-- -- Then -- --
--   $(mkClosure 'isPrime) :: Integer -> Closure (Process Bool)
--   
-- -- which you can then call, for example, to have a remote node -- check if a number is prime. -- -- In general, if you have a monomorphic function -- --
--   f :: T1 -> T2
--   
-- -- then -- --
--   $(mkClosure 'f) :: T1 -> Closure T2
--   
-- -- provided that T1 is serializable (*) (remember to pass -- f to remotable). -- -- (You can also create closures manually--see the documentation of -- Control.Distributed.Static for examples.) -- -- -- -- Here is a small self-contained example that uses closures and -- serialization dictionaries. It makes use of the -- Control.Distributed.Process.SimpleLocalnet Cloud Haskell backend. -- --
--   {-# LANGUAGE TemplateHaskell #-}
--   import System.Environment (getArgs)
--   import Control.Distributed.Process
--   import Control.Distributed.Process.Closure
--   import Control.Distributed.Process.Backend.SimpleLocalnet
--   import Control.Distributed.Process.Node (initRemoteTable)
--   
--   isPrime :: Integer -> Process Bool
--   isPrime n = return . (n `elem`) . takeWhile (<= n) . sieve $ [2..]
--     where
--       sieve :: [Integer] -> [Integer]
--       sieve (p : xs) = p : sieve [x | x <- xs, x `mod` p > 0]
--   
--   remotable ['isPrime]
--   
--   master :: [NodeId] -> Process ()
--   master [] = liftIO $ putStrLn "no slaves"
--   master (slave:_) = do
--     isPrime79 <- call $(functionTDict 'isPrime) slave ($(mkClosure 'isPrime) (79 :: Integer))
--     liftIO $ print isPrime79
--   
--   main :: IO ()
--   main = do
--     args <- getArgs
--     case args of
--       ["master", host, port] -> do
--         backend <- initializeBackend host port rtable
--         startMaster backend master
--       ["slave", host, port] -> do
--         backend <- initializeBackend host port rtable
--         startSlave backend
--     where
--       rtable :: RemoteTable
--       rtable = __remoteTable initRemoteTable
--   
-- -- -- -- (*) If T1 is not serializable you will get a type error in -- the generated code. Unfortunately, the Template Haskell infrastructure -- cannot check a priori if T1 is serializable or not due to a -- bug in the Template Haskell libraries -- (http://hackage.haskell.org/trac/ghc/ticket/7066) -- -- (**) Even though call is passed an explicit serialization -- dictionary, we still need the Serializable constraint because -- Static is not the true static. If it was, we could -- unstatic the dictionary and pattern match on it to bring the -- Typeable instance into scope, but unless proper -- static support is added to ghc we need both the type class -- argument and the explicit dictionary. module Control.Distributed.Process.Closure -- | Reification of Serializable (see -- Control.Distributed.Process.Closure) data SerializableDict a SerializableDict :: SerializableDict a -- | Static decoder, given a static serialization dictionary. -- -- See module documentation of Control.Distributed.Process.Closure -- for an example. staticDecode :: Typeable a => Static (SerializableDict a) -> Static (ByteString -> a) -- | Serialization dictionary for '()' sdictUnit :: Static (SerializableDict ()) -- | Serialization dictionary for ProcessId sdictProcessId :: Static (SerializableDict ProcessId) -- | Serialization dictionary for SendPort sdictSendPort :: Typeable a => Static (SerializableDict a) -> Static (SerializableDict (SendPort a)) -- | Serialization dictionary for Static. sdictStatic :: Typeable a => Static (TypeableDict a) -> Static (SerializableDict (Static a)) -- | Serialization dictionary for Closure. 
sdictClosure :: Typeable a => Static (TypeableDict a) -> Static (SerializableDict (Closure a)) -- | CP a b is a process with input of type a and output -- of type b type CP a b = Closure (a -> Process b) -- | CP version of id idCP :: Typeable a => CP a a -- | CP version of (***) splitCP :: (Typeable a, Typeable b, Typeable c, Typeable d) => CP a c -> CP b d -> CP (a, b) (c, d) -- | CP version of return returnCP :: Serializable a => Static (SerializableDict a) -> a -> Closure (Process a) -- | (Not quite the) CP version of (>>=) bindCP :: (Typeable a, Typeable b) => Closure (Process a) -> CP a b -> Closure (Process b) -- | CP version of (>>) seqCP :: (Typeable a, Typeable b) => Closure (Process a) -> Closure (Process b) -> Closure (Process b) -- | CP version of link cpLink :: ProcessId -> Closure (Process ()) -- | CP version of unlink cpUnlink :: ProcessId -> Closure (Process ()) -- | CP version of relay cpRelay :: ProcessId -> Closure (Process ()) -- | CP version of send cpSend :: Typeable a => Static (SerializableDict a) -> ProcessId -> CP a () -- | CP version of expect cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a) -- | CP version of newChan cpNewChan :: Typeable a => Static (SerializableDict a) -> Closure (Process (SendPort a, ReceivePort a)) -- | A RemoteRegister is a transformer on a RemoteTable to register -- additional static values. type RemoteRegister = RemoteTable -> RemoteTable class MkTDict a mkTDict :: MkTDict a => String -> a -> RemoteRegister -- | This takes an explicit name and a value, and produces both a static -- reference to the name and a RemoteRegister for it. mkStaticVal :: Serializable a => String -> a -> (Static a, RemoteRegister) -- | This takes an explicit name, a function of arity one, and creates a -- function yielding a closure and a remote register for it. 
mkClosureValSingle :: (Serializable a, Typeable b, MkTDict b) => String -> (a -> b) -> (a -> Closure b, RemoteRegister) -- | This takes an explicit name, a function of any arity, and creates a -- function yielding a closure and a remote register for it. mkClosureVal :: (Curry (argTuple -> Closure result) closureFunction, MkTDict result, Uncurry HTrue argTuple func result, Typeable result, Serializable argTuple, IsFunction func HTrue) => String -> func -> (closureFunction, RemoteRegister) -- | Works just like standard call, but with a simpler signature. call' :: Serializable a => NodeId -> Closure (Process a) -> Process a -- | Create the closure, decoder, and metadata definitions for the given -- list of functions remotable :: [Name] -> Q [Dec] -- | Like remotable, but parameterized by the declaration of a -- function instead of the function name. So where for remotable -- you'd do -- --
--   f :: T1 -> T2
--   f = ...
--   
--   remotable ['f]
--   
-- -- with remotableDecl you would instead do -- --
--   remotableDecl [
--      [d| f :: T1 -> T2 ;
--          f = ...
--        |]
--    ]
--   
-- -- remotableDecl creates the function specified as well as the -- various dictionaries and static versions that remotable also -- creates. remotableDecl is sometimes necessary when you want to -- refer to, say, $(mkClosure 'f) within the definition of -- f itself. -- -- NOTE: remotableDecl creates __remoteTableDecl instead -- of __remoteTable so that you can use both remotable -- and remotableDecl within the same module. remotableDecl :: [Q [Dec]] -> Q [Dec] -- | Construct a static value. -- -- If f : forall a1 .. an. T then $(mkStatic 'f) :: forall -- a1 .. an. Static T. Be sure to pass f to -- remotable. mkStatic :: Name -> Q Exp -- | If f : T1 -> T2 then $(mkClosure 'f) :: T1 -> -- Closure T2. -- -- TODO: The current version of mkClosure is too polymorphic (@forall a. -- Binary a => a -> Closure T2). mkClosure :: Name -> Q Exp -- | Make a Closure from a static function. This is useful for -- making a closure for a top-level Process () function, because -- using mkClosure would require adding a dummy () -- argument. mkStaticClosure :: Name -> Q Exp -- | If f : T1 -> T2 is a monomorphic function then -- $(functionSDict 'f) :: Static (SerializableDict T1). -- -- Be sure to pass f to remotable. functionSDict :: Name -> Q Exp -- | If f : T1 -> Process T2 is a monomorphic function then -- $(functionTDict 'f) :: Static (SerializableDict T2). -- -- Be sure to pass f to remotable. functionTDict :: Name -> Q Exp