-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Cloud Haskell: Erlang-style concurrency in Haskell -- -- This is an implementation of Cloud Haskell, as described in Towards -- Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon -- Peyton Jones -- (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), -- although some of the details are different. The precise message -- passing semantics are based on A unified semantics for future -- Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle. -- You will probably also want to install a Cloud Haskell backend such as -- distributed-process-simplelocalnet. @package distributed-process @version 0.7.2 module Control.Distributed.Process.Serializable -- | Objects that can be sent across the network class (Binary a, Typeable a) => Serializable a -- | Encode type representation as a bytestring encodeFingerprint :: Fingerprint -> ByteString -- | Decode a bytestring into a fingerprint. Throws an IO exception on -- failure decodeFingerprint :: ByteString -> Fingerprint -- | The fingerprint of the typeRep of the argument fingerprint :: Typeable a => a -> Fingerprint -- | Size of a fingerprint sizeOfFingerprint :: Int data Fingerprint :: * -- | Show fingerprint (for debugging purposes) showFingerprint :: Fingerprint -> ShowS -- | Reification of Serializable (see -- Control.Distributed.Process.Closure) data SerializableDict a [SerializableDict] :: Serializable a => SerializableDict a -- | Reification of Typeable. data TypeableDict a [TypeableDict] :: Typeable a => TypeableDict a instance (Data.Binary.Class.Binary a, Data.Typeable.Internal.Typeable a) => Control.Distributed.Process.Serializable.Serializable a -- | Clone of Control.Concurrent.STM.TQueue with support for mkWeakTQueue -- -- Not all functionality from the original module is available: -- unGetTQueue, peekTQueue and tryPeekTQueue are missing. In order to -- implement these we'd need to be able to touch# the write end of the -- queue inside unGetTQueue, but that means we need a version of touch# -- that works within the STM monad. module Control.Distributed.Process.Internal.WeakTQueue -- | TQueue is an abstract type representing an unbounded FIFO -- channel. data TQueue a -- | Builds and returns a new instance of TQueue newTQueue :: STM (TQueue a) -- | IO version of newTQueue. This is useful for creating -- top-level TQueues using unsafePerformIO, because using -- atomically inside unsafePerformIO isn't possible. newTQueueIO :: IO (TQueue a) -- | Read the next value from the TQueue. readTQueue :: TQueue a -> STM a -- | A version of readTQueue which does not retry. Instead it -- returns Nothing if no value is available. tryReadTQueue :: TQueue a -> STM (Maybe a) -- | Write a value to a TQueue. writeTQueue :: TQueue a -> a -> STM () -- | Returns True if the supplied TQueue is empty. 
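--
-- A minimal usage sketch combining these operations (atomically comes
-- from Control.Concurrent.STM; the finalizer action is illustrative):
--
--   example :: IO ()
--   example = do
--     q <- newTQueueIO
--     _ <- mkWeakTQueue q (putStrLn "queue collected")
--     atomically (writeTQueue q "hello")
--     print =<< atomically (isEmptyTQueue q)   -- False
--   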
isEmptyTQueue :: TQueue a -> STM Bool mkWeakTQueue :: TQueue a -> IO () -> IO (Weak (TQueue a)) instance GHC.Classes.Eq (Control.Distributed.Process.Internal.WeakTQueue.TQueue a) -- | Like Control.Concurrent.MVar.Strict but reduce to HNF, not NF module Control.Distributed.Process.Internal.StrictMVar newtype StrictMVar a StrictMVar :: (MVar a) -> StrictMVar a newEmptyMVar :: IO (StrictMVar a) newMVar :: a -> IO (StrictMVar a) takeMVar :: StrictMVar a -> IO a putMVar :: StrictMVar a -> a -> IO () readMVar :: StrictMVar a -> IO a withMVar :: StrictMVar a -> (a -> IO b) -> IO b modifyMVar_ :: StrictMVar a -> (a -> IO a) -> IO () modifyMVar :: StrictMVar a -> (a -> IO (a, b)) -> IO b modifyMVarMasked :: StrictMVar a -> (a -> IO (a, b)) -> IO b mkWeakMVar :: StrictMVar a -> IO () -> IO (Weak (StrictMVar a)) -- | Spine and element strict list module Control.Distributed.Process.Internal.StrictList -- | Strict list data StrictList a Cons :: !a -> !(StrictList a) -> StrictList a Nil :: StrictList a Snoc :: !(StrictList a) -> !a -> StrictList a Append :: !(StrictList a) -> !(StrictList a) -> StrictList a append :: StrictList a -> StrictList a -> StrictList a foldr :: (a -> b -> b) -> b -> StrictList a -> b module Control.Distributed.Process.Internal.StrictContainerAccessors mapMaybe :: Ord key => key -> Accessor (Map key elem) (Maybe elem) mapDefault :: Ord key => elem -> key -> Accessor (Map key elem) elem -- | Concurrent queue for single reader, single writer module Control.Distributed.Process.Internal.CQueue data CQueue a data BlockSpec NonBlocking :: BlockSpec Blocking :: BlockSpec Timeout :: Int -> BlockSpec data MatchOn m a MatchMsg :: (m -> Maybe a) -> MatchOn m a MatchChan :: (STM a) -> MatchOn m a newCQueue :: IO (CQueue a) -- | Enqueue an element -- -- Enqueue is strict. enqueue :: CQueue a -> a -> IO () -- | Variant of enqueue for use in the STM monad. enqueueSTM :: CQueue a -> a -> STM () -- | Dequeue an element -- -- The timeout (if any) is applied only to waiting for incoming messages, -- not to checking messages that have already arrived dequeue :: forall m a. CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a) -- | Weak reference to a CQueue mkWeakCQueue :: CQueue a -> IO () -> IO (Weak (CQueue a)) queueSize :: CQueue a -> IO Int instance GHC.Base.Functor (Control.Distributed.Process.Internal.CQueue.MatchOn m) -- | Types used throughout the Cloud Haskell framework -- -- We collect all types used internally in a single module because many -- of these data types are mutually recursive and cannot be split across -- modules. 
module Control.Distributed.Process.Internal.Types -- | Node identifier newtype NodeId NodeId :: EndPointAddress -> NodeId [nodeAddress] :: NodeId -> EndPointAddress -- | A local process ID consists of a seed which distinguishes processes -- from different instances of the same local node and a counter data LocalProcessId LocalProcessId :: {-# UNPACK #-} !Int32 -> {-# UNPACK #-} !Int32 -> LocalProcessId [lpidUnique] :: LocalProcessId -> {-# UNPACK #-} !Int32 [lpidCounter] :: LocalProcessId -> {-# UNPACK #-} !Int32 -- | Process identifier data ProcessId ProcessId :: !NodeId -> {-# UNPACK #-} !LocalProcessId -> ProcessId -- | The ID of the node the process is running on [processNodeId] :: ProcessId -> !NodeId -- | Node-local identifier for the process [processLocalId] :: ProcessId -> {-# UNPACK #-} !LocalProcessId -- | Union of all kinds of identifiers data Identifier NodeIdentifier :: !NodeId -> Identifier ProcessIdentifier :: !ProcessId -> Identifier SendPortIdentifier :: !SendPortId -> Identifier nodeOf :: Identifier -> NodeId firstNonReservedProcessId :: Int32 nullProcessId :: NodeId -> ProcessId -- | Local nodes data LocalNode LocalNode :: !NodeId -> !EndPoint -> !(StrictMVar LocalNodeState) -> !(Chan NCMsg) -> !MxEventBus -> !RemoteTable -> LocalNode -- | NodeId of the node [localNodeId] :: LocalNode -> !NodeId -- | The network endpoint associated with this node [localEndPoint] :: LocalNode -> !EndPoint -- | Local node state [localState] :: LocalNode -> !(StrictMVar LocalNodeState) -- | Channel for the node controller [localCtrlChan] :: LocalNode -> !(Chan NCMsg) -- | Internal management event bus [localEventBus] :: LocalNode -> !MxEventBus -- | Runtime lookup table for supporting closures TODO: this should be part -- of the CH state, not the local endpoint state [remoteTable] :: LocalNode -> !RemoteTable -- | Local node state data LocalNodeState LocalNodeValid :: {-# UNPACK #-} !ValidLocalNodeState -> LocalNodeState LocalNodeClosed :: LocalNodeState data ValidLocalNodeState ValidLocalNodeState :: !(Map LocalProcessId LocalProcess) -> !Int32 -> !Int32 -> !(Map (Identifier, Identifier) (Connection, ImplicitReconnect)) -> ValidLocalNodeState -- | Processes running on this node [_localProcesses] :: ValidLocalNodeState -> !(Map LocalProcessId LocalProcess) -- | Counter to assign PIDs [_localPidCounter] :: ValidLocalNodeState -> !Int32 -- | The unique value used to create PIDs (so that processes on -- restarted nodes have new PIDs) [_localPidUnique] :: ValidLocalNodeState -> !Int32 -- | Outgoing connections [_localConnections] :: ValidLocalNodeState -> !(Map (Identifier, Identifier) (Connection, ImplicitReconnect)) -- | Thrown by some primitives when they notice the node has been closed. data NodeClosedException NodeClosedException :: NodeId -> NodeClosedException -- | Wrapper around withMVar that checks that the local node is -- still in a valid state. withValidLocalState :: LocalNode -> (ValidLocalNodeState -> IO r) -> IO r -- | Wrapper around modifyMVar that checks that the local node is -- still in a valid state. modifyValidLocalState :: LocalNode -> (ValidLocalNodeState -> IO (ValidLocalNodeState, a)) -> IO (Maybe a) -- | Wrapper around modifyMVar_ that checks that the local node is -- still in a valid state. 
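--
-- For example, a minimal sketch that inspects the process table via the
-- read-only sibling withValidLocalState documented above (Map is assumed
-- to be Data.Map, imported qualified):
--
--   processCount :: LocalNode -> IO Int
--   processCount node =
--     withValidLocalState node (return . Map.size . _localProcesses)
--   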
modifyValidLocalState_ :: LocalNode -> (ValidLocalNodeState -> IO ValidLocalNodeState) -> IO () -- | Provides access to the trace controller data Tracer Tracer :: !ProcessId -> !(Weak (CQueue Message)) -> Tracer -- | Process id for the currently active trace handler [tracerPid] :: Tracer -> !ProcessId -- | Weak reference to the tracer controller's mailbox [weakQ] :: Tracer -> !(Weak (CQueue Message)) -- | Local system management event bus state data MxEventBus MxEventBusInitialising :: MxEventBus MxEventBus :: !ProcessId -> !Tracer -> !(Weak (CQueue Message)) -> !(((TChan Message, TChan Message) -> Process ()) -> IO ProcessId) -> MxEventBus -- | Process id of the management agent controller process [agent] :: MxEventBus -> !ProcessId -- | Configuration for the local trace controller [tracer] :: MxEventBus -> !Tracer -- | Weak reference to the management agent controller's mailbox [evbuss] :: MxEventBus -> !(Weak (CQueue Message)) -- | API for adding management agents to a running node [mxNew] :: MxEventBus -> !(((TChan Message, TChan Message) -> Process ()) -> IO ProcessId) -- | Processes running on our local node data LocalProcess LocalProcess :: !(CQueue Message) -> !(Weak (CQueue Message)) -> !ProcessId -> !(StrictMVar LocalProcessState) -> !ThreadId -> !LocalNode -> LocalProcess [processQueue] :: LocalProcess -> !(CQueue Message) [processWeakQ] :: LocalProcess -> !(Weak (CQueue Message)) [processId] :: LocalProcess -> !ProcessId [processState] :: LocalProcess -> !(StrictMVar LocalProcessState) [processThread] :: LocalProcess -> !ThreadId [processNode] :: LocalProcess -> !LocalNode -- | Local process state data LocalProcessState LocalProcessState :: !Int32 -> !Int32 -> !Int32 -> !(Map LocalSendPortId TypedChannel) -> LocalProcessState [_monitorCounter] :: LocalProcessState -> !Int32 [_spawnCounter] :: LocalProcessState -> !Int32 [_channelCounter] :: LocalProcessState -> !Int32 [_typedChannels] :: LocalProcessState -> !(Map LocalSendPortId TypedChannel) -- | The Cloud Haskell Process type newtype Process a Process :: ReaderT LocalProcess IO a -> Process a [unProcess] :: Process a -> ReaderT LocalProcess IO a -- | Deconstructor for Process (not exported to the public API) runLocalProcess :: LocalProcess -> Process a -> IO a data ImplicitReconnect WithImplicitReconnect :: ImplicitReconnect NoImplicitReconnect :: ImplicitReconnect type LocalSendPortId = Int32 -- | A send port is identified by a SendPortId. -- -- You cannot send directly to a SendPortId; instead, use -- newChan to create a SendPort. data SendPortId SendPortId :: {-# UNPACK #-} !ProcessId -> {-# UNPACK #-} !LocalSendPortId -> SendPortId -- | The ID of the process that will receive messages sent on this port [sendPortProcessId] :: SendPortId -> {-# UNPACK #-} !ProcessId -- | Process-local ID of the channel [sendPortLocalId] :: SendPortId -> {-# UNPACK #-} !LocalSendPortId data TypedChannel TypedChannel :: (Weak (TQueue a)) -> TypedChannel -- | The send end of a typed channel (serializable) newtype SendPort a SendPort :: SendPortId -> SendPort a -- | The (unique) ID of this send port [sendPortId] :: SendPort a -> SendPortId -- | The receive end of a typed channel (not serializable) -- -- Note that ReceivePort implements Functor, -- Applicative, Alternative and Monad. This is -- especially useful when merging receive ports. 
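--
-- For instance, two ports carrying the same type can be merged with the
-- Alternative instance (a minimal sketch; <|> comes from
-- Control.Applicative and, as with STM's orElse, tries the left port
-- first):
--
--   mergeTwo :: ReceivePort a -> ReceivePort a -> ReceivePort a
--   mergeTwo p q = p <|> q
--   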
newtype ReceivePort a ReceivePort :: STM a -> ReceivePort a [receiveSTM] :: ReceivePort a -> STM a -- | Messages consist of their typeRep fingerprint and their encoding data Message EncodedMessage :: !Fingerprint -> !ByteString -> Message [messageFingerprint] :: Message -> !Fingerprint [messageEncoding] :: Message -> !ByteString UnencodedMessage :: !Fingerprint -> !a -> Message [messageFingerprint] :: Message -> !Fingerprint [messagePayload] :: Message -> !a -- | internal use only. isEncoded :: Message -> Bool -- | Turn any serializable term into a message createMessage :: Serializable a => a -> Message -- | Turn any serializable term into an unencoded/local message createUnencodedMessage :: Serializable a => a -> Message -- | Turn any serializable term into an unencoded/local message, without -- evaluating it! This is a dangerous business. unsafeCreateUnencodedMessage :: Serializable a => a -> Message -- | Serialize a message messageToPayload :: Message -> [ByteString] -- | Deserialize a message payloadToMessage :: [ByteString] -> Message -- | MonitorRef is opaque for regular Cloud Haskell processes data MonitorRef MonitorRef :: !Identifier -> !Int32 -> MonitorRef -- | ID of the entity to be monitored [monitorRefIdent] :: MonitorRef -> !Identifier -- | Unique to distinguish multiple monitor requests by the same process [monitorRefCounter] :: MonitorRef -> !Int32 -- | Message sent by process monitors data ProcessMonitorNotification ProcessMonitorNotification :: !MonitorRef -> !ProcessId -> !DiedReason -> ProcessMonitorNotification -- | Message sent by node monitors data NodeMonitorNotification NodeMonitorNotification :: !MonitorRef -> !NodeId -> !DiedReason -> NodeMonitorNotification -- | Message sent by channel (port) monitors data PortMonitorNotification PortMonitorNotification :: !MonitorRef -> !SendPortId -> !DiedReason -> PortMonitorNotification -- | Internal exception thrown indirectly by exit data ProcessExitException ProcessExitException :: !ProcessId -> !Message -> ProcessExitException -- | Exceptions thrown when a linked process dies data ProcessLinkException ProcessLinkException :: !ProcessId -> !DiedReason -> ProcessLinkException -- | Exception thrown when a linked node dies data NodeLinkException NodeLinkException :: !NodeId -> !DiedReason -> NodeLinkException -- | Exception thrown when a linked channel (port) dies data PortLinkException PortLinkException :: !SendPortId -> !DiedReason -> PortLinkException -- | Exception thrown when a process attempts to register a process under -- an already-registered name or to unregister a name that hasn't been -- registered. Returns the name and the identifier of the process that -- owns it, if any. data ProcessRegistrationException ProcessRegistrationException :: !String -> !(Maybe ProcessId) -> ProcessRegistrationException -- | Why did a process die? 
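--
-- For example (a minimal sketch using monitor and expect from
-- Control.Distributed.Process.Internal.Primitives), the reason is
-- usually obtained from a ProcessMonitorNotification:
--
--   awaitDeath :: ProcessId -> Process DiedReason
--   awaitDeath pid = do
--     _ref <- monitor pid
--     ProcessMonitorNotification _ _ reason <- expect
--     return reason
--   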
data DiedReason -- | Normal termination DiedNormal :: DiedReason -- | The process exited with an exception (provided as String -- because Exception does not implement Binary) DiedException :: !String -> DiedReason -- | We got disconnected from the process node DiedDisconnect :: DiedReason -- | The process node died DiedNodeDown :: DiedReason -- | Invalid (process/node/channel) identifier DiedUnknownId :: DiedReason -- | (Asynchronous) reply from unmonitor newtype DidUnmonitor DidUnmonitor :: MonitorRef -> DidUnmonitor -- | (Asynchronous) reply from unlink newtype DidUnlinkProcess DidUnlinkProcess :: ProcessId -> DidUnlinkProcess -- | (Asynchronous) reply from unlinkNode newtype DidUnlinkNode DidUnlinkNode :: NodeId -> DidUnlinkNode -- | (Asynchronous) reply from unlinkPort newtype DidUnlinkPort DidUnlinkPort :: SendPortId -> DidUnlinkPort -- | SpawnRefs are used to return pids of spawned processes newtype SpawnRef SpawnRef :: Int32 -> SpawnRef -- | (Asynchronous) reply from spawn data DidSpawn DidSpawn :: SpawnRef -> ProcessId -> DidSpawn -- | (Asynchronous) reply from whereis data WhereIsReply WhereIsReply :: String -> (Maybe ProcessId) -> WhereIsReply -- | (Asynchronous) reply from register and unregister data RegisterReply RegisterReply :: String -> Bool -> (Maybe ProcessId) -> RegisterReply -- | Provide information about a running process data ProcessInfo ProcessInfo :: NodeId -> [String] -> Int -> [(ProcessId, MonitorRef)] -> [ProcessId] -> ProcessInfo [infoNode] :: ProcessInfo -> NodeId [infoRegisteredNames] :: ProcessInfo -> [String] [infoMessageQueueLength] :: ProcessInfo -> Int [infoMonitors] :: ProcessInfo -> [(ProcessId, MonitorRef)] [infoLinks] :: ProcessInfo -> [ProcessId] data ProcessInfoNone ProcessInfoNone :: DiedReason -> ProcessInfoNone data NodeStats NodeStats :: NodeId -> Int -> Int -> Int -> Int -> NodeStats [nodeStatsNode] :: NodeStats -> NodeId [nodeStatsRegisteredNames] :: NodeStats -> Int [nodeStatsMonitors] :: NodeStats -> Int [nodeStatsLinks] :: NodeStats -> Int [nodeStatsProcesses] :: NodeStats -> Int -- | Messages to the node controller data NCMsg NCMsg :: !Identifier -> !ProcessSignal -> NCMsg [ctrlMsgSender] :: NCMsg -> !Identifier [ctrlMsgSignal] :: NCMsg -> !ProcessSignal -- | Signals to the node controller (see NCMsg) data ProcessSignal Link :: !Identifier -> ProcessSignal Unlink :: !Identifier -> ProcessSignal Monitor :: !MonitorRef -> ProcessSignal Unmonitor :: !MonitorRef -> ProcessSignal Died :: Identifier -> !DiedReason -> ProcessSignal Spawn :: !(Closure (Process ())) -> !SpawnRef -> ProcessSignal WhereIs :: !String -> ProcessSignal Register :: !String -> !NodeId -> !(Maybe ProcessId) -> !Bool -> ProcessSignal NamedSend :: !String -> !Message -> ProcessSignal UnreliableSend :: !LocalProcessId -> !Message -> ProcessSignal LocalSend :: !ProcessId -> !Message -> ProcessSignal LocalPortSend :: !SendPortId -> !Message -> ProcessSignal Kill :: !ProcessId -> !String -> ProcessSignal Exit :: !ProcessId -> !Message -> ProcessSignal GetInfo :: !ProcessId -> ProcessSignal SigShutdown :: ProcessSignal GetNodeStats :: !NodeId -> ProcessSignal localProcesses :: Accessor ValidLocalNodeState (Map LocalProcessId LocalProcess) localPidCounter :: Accessor ValidLocalNodeState Int32 localPidUnique :: Accessor ValidLocalNodeState Int32 localConnections :: Accessor ValidLocalNodeState (Map (Identifier, Identifier) (Connection, ImplicitReconnect)) localProcessWithId :: LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess) localConnectionBetween :: Identifier -> 
Identifier -> Accessor ValidLocalNodeState (Maybe (Connection, ImplicitReconnect)) monitorCounter :: Accessor LocalProcessState Int32 spawnCounter :: Accessor LocalProcessState Int32 channelCounter :: Accessor LocalProcessState LocalSendPortId typedChannels :: Accessor LocalProcessState (Map LocalSendPortId TypedChannel) typedChannelWithId :: LocalSendPortId -> Accessor LocalProcessState (Maybe TypedChannel) forever' :: Monad m => m a -> m b instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NCMsg instance Control.Monad.Reader.Class.MonadReader Control.Distributed.Process.Internal.Types.LocalProcess Control.Distributed.Process.Internal.Types.Process instance Control.Monad.IO.Class.MonadIO Control.Distributed.Process.Internal.Types.Process instance Control.Monad.Fix.MonadFix Control.Distributed.Process.Internal.Types.Process instance GHC.Base.Monad Control.Distributed.Process.Internal.Types.Process instance GHC.Base.Functor Control.Distributed.Process.Internal.Types.Process instance GHC.Base.Applicative Control.Distributed.Process.Internal.Types.Process instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessSignal instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessInfoNone instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.ProcessInfo instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessInfo instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.NodeStats instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeStats instance GHC.Show.Show Control.Distributed.Process.Internal.Types.RegisterReply instance GHC.Show.Show Control.Distributed.Process.Internal.Types.WhereIsReply instance GHC.Show.Show Control.Distributed.Process.Internal.Types.DidSpawn instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.SpawnRef instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.SpawnRef instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.SpawnRef instance GHC.Show.Show Control.Distributed.Process.Internal.Types.SpawnRef instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidUnlinkPort instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidUnlinkNode instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidUnlinkProcess instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidUnmonitor instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessMonitorNotification instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeMonitorNotification instance GHC.Show.Show Control.Distributed.Process.Internal.Types.PortMonitorNotification instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessLinkException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeLinkException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.PortLinkException instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.DiedReason instance GHC.Show.Show Control.Distributed.Process.Internal.Types.DiedReason instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessRegistrationException instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.MonitorRef instance GHC.Show.Show Control.Distributed.Process.Internal.Types.MonitorRef instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.MonitorRef instance GHC.Classes.Eq 
Control.Distributed.Process.Internal.Types.MonitorRef instance GHC.Base.Monad Control.Distributed.Process.Internal.Types.ReceivePort instance GHC.Base.Alternative Control.Distributed.Process.Internal.Types.ReceivePort instance GHC.Base.Applicative Control.Distributed.Process.Internal.Types.ReceivePort instance GHC.Base.Functor Control.Distributed.Process.Internal.Types.ReceivePort instance GHC.Classes.Ord (Control.Distributed.Process.Internal.Types.SendPort a) instance GHC.Classes.Eq (Control.Distributed.Process.Internal.Types.SendPort a) instance GHC.Show.Show (Control.Distributed.Process.Internal.Types.SendPort a) instance GHC.Generics.Generic (Control.Distributed.Process.Internal.Types.SendPort a) instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.Identifier instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.Identifier instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.Identifier instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.SendPortId instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.SendPortId instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.SendPortId instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeClosedException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ImplicitReconnect instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.ImplicitReconnect instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.ProcessId instance Data.Data.Data Control.Distributed.Process.Internal.Types.ProcessId instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.ProcessId instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.ProcessId instance GHC.Show.Show Control.Distributed.Process.Internal.Types.LocalProcessId instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.LocalProcessId instance Data.Data.Data Control.Distributed.Process.Internal.Types.LocalProcessId instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.LocalProcessId instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.LocalProcessId instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.NodeId instance Data.Data.Data Control.Distributed.Process.Internal.Types.NodeId instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.NodeId instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.NodeId instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.NodeId instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.NodeId instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.NodeId instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeId instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.LocalProcessId instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessId instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.ProcessId instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.ProcessId instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessId instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.Identifier instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.Identifier instance GHC.Show.Show Control.Distributed.Process.Internal.Types.Identifier instance GHC.Exception.Exception 
Control.Distributed.Process.Internal.Types.NodeClosedException instance Control.Monad.Catch.MonadThrow Control.Distributed.Process.Internal.Types.Process instance Control.Monad.Catch.MonadCatch Control.Distributed.Process.Internal.Types.Process instance Control.Monad.Catch.MonadMask Control.Distributed.Process.Internal.Types.Process instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.SendPortId instance GHC.Show.Show Control.Distributed.Process.Internal.Types.SendPortId instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.SendPortId instance Control.Distributed.Process.Serializable.Serializable a => Data.Binary.Class.Binary (Control.Distributed.Process.Internal.Types.SendPort a) instance Data.Hashable.Class.Hashable a => Data.Hashable.Class.Hashable (Control.Distributed.Process.Internal.Types.SendPort a) instance Control.DeepSeq.NFData a => Control.DeepSeq.NFData (Control.Distributed.Process.Internal.Types.SendPort a) instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.Message instance GHC.Show.Show Control.Distributed.Process.Internal.Types.Message instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.MonitorRef instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.MonitorRef instance GHC.Exception.Exception Control.Distributed.Process.Internal.Types.ProcessExitException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessExitException instance GHC.Exception.Exception Control.Distributed.Process.Internal.Types.ProcessLinkException instance GHC.Exception.Exception Control.Distributed.Process.Internal.Types.NodeLinkException instance GHC.Exception.Exception Control.Distributed.Process.Internal.Types.PortLinkException instance GHC.Exception.Exception Control.Distributed.Process.Internal.Types.ProcessRegistrationException instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.DiedReason instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.Message instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.LocalProcessId instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessMonitorNotification instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.NodeMonitorNotification instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.PortMonitorNotification instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.NCMsg instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.MonitorRef instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessSignal instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DiedReason instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidSpawn instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.SendPortId instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.Identifier instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.WhereIsReply instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.RegisterReply instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessInfo instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.NodeStats instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessInfoNone module 
Control.Distributed.Process.Internal.Messaging sendPayload :: LocalNode -> Identifier -> Identifier -> ImplicitReconnect -> [ByteString] -> IO () sendBinary :: Binary a => LocalNode -> Identifier -> Identifier -> ImplicitReconnect -> a -> IO () sendMessage :: Serializable a => LocalNode -> Identifier -> Identifier -> ImplicitReconnect -> a -> IO () disconnect :: LocalNode -> Identifier -> Identifier -> IO () closeImplicitReconnections :: LocalNode -> Identifier -> IO () -- | a impliesDeathOf b is true if the death of a -- (for instance, a node) implies the death of b (for instance, -- a process on that node) impliesDeathOf :: Identifier -> Identifier -> Bool sendCtrlMsg :: Maybe NodeId -> ProcessSignal -> Process () module Control.Distributed.Process.Management.Internal.Types -- | A newtype wrapper for an agent id (which is a string). newtype MxAgentId MxAgentId :: String -> MxAgentId [agentId] :: MxAgentId -> String data MxAgentState s MxAgentState :: !MxAgentId -> !(TChan Message) -> !s -> MxAgentState s [mxAgentId] :: MxAgentState s -> !MxAgentId [mxBus] :: MxAgentState s -> !(TChan Message) [mxLocalState] :: MxAgentState s -> !s -- | Monad for management agents. newtype MxAgent s a MxAgent :: StateT (MxAgentState s) Process a -> MxAgent s a [unAgent] :: MxAgent s a -> StateT (MxAgentState s) Process a -- | Represents the actions a management agent can take when evaluating an -- event sink. data MxAction MxAgentDeactivate :: !String -> MxAction MxAgentPrioritise :: !ChannelSelector -> MxAction MxAgentReady :: MxAction MxAgentSkip :: MxAction data ChannelSelector InputChan :: ChannelSelector Mailbox :: ChannelSelector -- | Gross though it is, this synonym represents a function used for forking -- new processes, which has to be passed as a HOF when calling -- mxAgentController, since there's no other way to avoid a circular -- dependency with Node.hs type Fork = Process () -> IO ProcessId -- | Type of a management agent's event sink. type MxSink s = Message -> MxAgent s (Maybe MxAction) -- | This is the default management event, fired for various -- internal events around the NT connection and Process lifecycle. All -- published events that conform to this type are eligible for tracing - -- i.e., they will be delivered to the trace controller. 
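--
-- A minimal sketch of an event sink over MxEvent (handleMessage comes
-- from Control.Distributed.Process; installing the sink is done through
-- the agent API in Control.Distributed.Process.Management, not shown
-- here):
--
--   nodeWatch :: MxSink ()
--   nodeWatch msg = handleMessage msg $ \ev -> case ev of
--     MxNodeDied _nid _why -> return (MxAgentDeactivate "node lost")
--     _                    -> return MxAgentReady
--   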
data MxEvent -- | fired whenever a local process is spawned MxSpawned :: ProcessId -> MxEvent -- | fired whenever a process/name is registered (locally) MxRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process/name is unregistered (locally) MxUnRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process dies MxProcessDied :: ProcessId -> DiedReason -> MxEvent -- | fired whenever a node dies (i.e., the connection is -- broken/disconnected) MxNodeDied :: NodeId -> DiedReason -> MxEvent -- | fired whenever a message is sent from a local process MxSent :: ProcessId -> ProcessId -> Message -> MxEvent -- | fired whenever a message is received by a local process MxReceived :: ProcessId -> Message -> MxEvent -- | fired when a network-transport connection is first established MxConnected :: ConnectionId -> EndPointAddress -> MxEvent -- | fired when a network-transport connection is broken/disconnected MxDisconnected :: ConnectionId -> EndPointAddress -> MxEvent -- | a user defined trace event MxUser :: Message -> MxEvent -- | a logging event - used for debugging purposes only MxLog :: String -> MxEvent -- | notifies a trace listener that all subsequent traces will be sent to -- pid MxTraceTakeover :: ProcessId -> MxEvent -- | notifies a trace listener that it has been disabled/removed MxTraceDisable :: MxEvent -- | The class of things that we might be able to resolve to a -- ProcessId (or not). class Addressable a resolveToPid :: Addressable a => a -> Maybe ProcessId instance GHC.Base.Applicative (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance Control.Monad.State.Class.MonadState (Control.Distributed.Process.Management.Internal.Types.MxAgentState s) (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance Control.Monad.IO.Class.MonadIO (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance GHC.Base.Monad (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance GHC.Base.Functor (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance GHC.Classes.Ord Control.Distributed.Process.Management.Internal.Types.MxAgentId instance GHC.Classes.Eq Control.Distributed.Process.Management.Internal.Types.MxAgentId instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Types.MxAgentId instance GHC.Show.Show Control.Distributed.Process.Management.Internal.Types.MxEvent instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Types.MxEvent instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Types.MxEvent instance Control.Distributed.Process.Management.Internal.Types.Addressable Control.Distributed.Process.Management.Internal.Types.MxEvent -- | -- -- Cloud Haskell's semantics do not mention evaluation/strictness -- properties at all; Only the promise (or lack) of signal/message -- delivery between processes is considered. For practical purposes, -- Cloud Haskell optimises communication between (intra-node) local -- processes by skipping the network-transport layer. In order to ensure -- the same strictness semantics however, messages still undergo -- binary serialisation before (internal) transmission takes place. Thus -- we ensure that in both (the local and remote) cases, message payloads -- are fully evaluated. Whilst this provides the user with -- unsurprising behaviour, the resulting performance overhead is -- quite severe. 
Communicating data between Haskell threads -- without forcing binary serialisation reduces (intra-node, -- inter-process) communication overheads by several orders of magnitude. -- -- This module provides variants of Cloud Haskell's messaging primitives -- (send, sendChan, nsend and wrapMessage) -- which do not force binary serialisation in the local case, -- thereby offering superior intra-node messaging performance. The -- catch is that any evaluation problems lurking within the passed -- data structure (e.g., fields set to undefined and so on) will -- show up in the receiver rather than in the caller (as they would with -- the normal strategy). -- -- Use of the functions in this module can potentially change the runtime -- behaviour of your application. You have been warned! -- -- This module is exported so that you can replace the use of Cloud -- Haskell's safe messaging primitives. If you want to use both -- variants, then you can take advantage of qualified imports; however, -- Control.Distributed.Process also re-exports these functions -- under different names, using the unsafe prefix. module Control.Distributed.Process.UnsafePrimitives -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | Send a message unreliably. -- -- Unlike send, this function is insensitive to -- reconnect. It will try to send the message regardless of the -- history of connection failures between the nodes. -- -- Message passing with usend is ordered for a given sender and -- receiver if the messages arrive at all. usend :: Serializable a => ProcessId -> a -> Process () -- | Create an unencoded Message for any Serializable -- type. wrapMessage :: Serializable a => a -> Message -- | Interface to the management event bus. module Control.Distributed.Process.Management.Internal.Bus publishEvent :: MxEventBus -> Message -> IO () -- | Tracing/Debugging support - Types module Control.Distributed.Process.Management.Internal.Trace.Types data SetTrace TraceEnable :: !ProcessId -> SetTrace TraceDisable :: SetTrace -- | Defines which processes will be traced by a given TraceFlag, -- either by name or by ProcessId. Choosing TraceAll is -- by far the most efficient approach, as the tracer process -- therefore avoids deciding whether or not a trace event is viable. data TraceSubject TraceAll :: TraceSubject TraceProcs :: !(Set ProcessId) -> TraceSubject TraceNames :: !(Set String) -> TraceSubject -- | Defines what will be traced. Flags that control tracing of -- Process events take a TraceSubject controlling which -- processes should generate trace events in the target process. 
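--
-- For example (a minimal sketch, given access to a node's MxEventBus),
-- death and node events can be traced for all processes by extending
-- the defaults:
--
--   traceDeaths :: MxEventBus -> IO ()
--   traceDeaths bus =
--     setTraceFlags bus defaultTraceFlags { traceDied  = Just TraceAll
--                                         , traceNodes = True }
--   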
data TraceFlags TraceFlags :: !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !Bool -> !Bool -> TraceFlags [traceSpawned] :: TraceFlags -> !(Maybe TraceSubject) [traceDied] :: TraceFlags -> !(Maybe TraceSubject) [traceRegistered] :: TraceFlags -> !(Maybe TraceSubject) [traceUnregistered] :: TraceFlags -> !(Maybe TraceSubject) [traceSend] :: TraceFlags -> !(Maybe TraceSubject) [traceRecv] :: TraceFlags -> !(Maybe TraceSubject) [traceNodes] :: TraceFlags -> !Bool [traceConnections] :: TraceFlags -> !Bool data TraceArg TraceStr :: String -> TraceArg Trace :: a -> TraceArg -- | A generic ok response from the trace coordinator. data TraceOk TraceOk :: TraceOk traceLog :: MxEventBus -> String -> IO () traceLogFmt :: MxEventBus -> String -> [TraceArg] -> IO () traceEvent :: MxEventBus -> MxEvent -> IO () traceMessage :: Serializable m => MxEventBus -> m -> IO () defaultTraceFlags :: TraceFlags enableTrace :: MxEventBus -> ProcessId -> IO () enableTraceSync :: MxEventBus -> SendPort TraceOk -> ProcessId -> IO () disableTrace :: MxEventBus -> IO () disableTraceSync :: MxEventBus -> SendPort TraceOk -> IO () getTraceFlags :: MxEventBus -> SendPort TraceFlags -> IO () setTraceFlags :: MxEventBus -> TraceFlags -> IO () setTraceFlagsSync :: MxEventBus -> SendPort TraceOk -> TraceFlags -> IO () getCurrentTraceClient :: MxEventBus -> SendPort (Maybe ProcessId) -> IO () instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Trace.Types.TraceOk instance GHC.Show.Show Control.Distributed.Process.Management.Internal.Trace.Types.TraceFlags instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Trace.Types.TraceFlags instance GHC.Show.Show Control.Distributed.Process.Management.Internal.Trace.Types.TraceSubject instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Trace.Types.TraceSubject instance GHC.Show.Show Control.Distributed.Process.Management.Internal.Trace.Types.SetTrace instance GHC.Classes.Eq Control.Distributed.Process.Management.Internal.Trace.Types.SetTrace instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Trace.Types.SetTrace instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Trace.Types.SetTrace instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Trace.Types.TraceSubject instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Trace.Types.TraceFlags instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Trace.Types.TraceOk instance Control.Distributed.Process.Management.Internal.Trace.Types.Traceable Control.Distributed.Process.Internal.Types.ProcessId instance Control.Distributed.Process.Management.Internal.Trace.Types.Traceable GHC.Base.String -- | Cloud Haskell primitives -- -- We define these in a separate module so that we don't have to rely on -- the closure combinators module Control.Distributed.Process.Internal.Primitives -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- | Send a message unreliably. -- -- Unlike send, this function is insensitive to reconnect. -- It will try to send the message regardless of the history of -- connection failures between the nodes. -- -- Message passing with usend is ordered for a given sender and -- receiver if the messages arrive at all. 
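--
-- A minimal sketch of a best-effort heartbeat built on usend (the
-- function name is illustrative; forever, liftIO and threadDelay come
-- from base):
--
--   heartbeat :: ProcessId -> Process ()
--   heartbeat peer = forever $ do
--     self <- getSelfPid
--     usend peer self                -- may be dropped across reconnects
--     liftIO (threadDelay 1000000)
--   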
usend :: Serializable a => ProcessId -> a -> Process () -- | Wait for a message of a specific type expect :: forall a. Serializable a => Process a -- | Create a new typed channel newChan :: Serializable a => Process (SendPort a, ReceivePort a) -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () -- | Wait for a message on a typed channel receiveChan :: Serializable a => ReceivePort a -> Process a -- | Merge a list of typed channels. -- -- The result port is left-biased: if there are messages available on -- more than one port, the first available message is returned. mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Like mergePortsBiased, but with a round-robin scheduler (rather -- than left-biased) mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Unsafe variant of send. This function makes no -- attempt to serialize and (in the case when the destination process -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSend :: Serializable a => ProcessId -> a -> Process () -- | Unsafe variant of usend. This function makes no -- attempt to serialize the message when the destination process resides -- on the same local node. Therefore, a local receiver would need to be -- prepared to cope with any errors resulting from evaluation of the -- message. unsafeUSend :: Serializable a => ProcessId -> a -> Process () -- | Send a message on a typed channel. This function makes no -- attempt to serialize and (in the case when the ReceivePort -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSendChan :: Serializable a => SendPort a -> a -> Process () -- | Named send to a process in the local registry (asynchronous). This -- function makes no attempt to serialize and (in the case when -- the destination process resides on the same local node) therefore -- ensure that the payload is fully evaluated before it is delivered. unsafeNSend :: Serializable a => String -> a -> Process () -- | Named send to a process in a remote registry (asynchronous) This -- function makes no attempt to serialize and (in the case when -- the destination process resides on the same local node) therefore -- ensure that the payload is fully evaluated before it is delivered. unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | Opaque type used in receiveWait and receiveTimeout data Match b -- | Test the matches in order against each message in the queue receiveWait :: [Match b] -> Process b -- | Like receiveWait but with a timeout. -- -- If the timeout is zero do a non-blocking check for matching messages. -- A non-zero timeout is applied only when waiting for incoming messages -- (that is, after we have checked the messages that are already -- in the mailbox). receiveTimeout :: Int -> [Match b] -> Process (Maybe b) -- | Match against any message of the right type match :: forall a b. Serializable a => (a -> Process b) -> Match b -- | Match against any message of the right type that satisfies a predicate matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b -- | Remove any message from the queue matchUnknown :: Process b -> Match b -- | Match against an arbitrary message. matchAny removes the first -- available message from the process mailbox. 
To handle arbitrary -- raw messages once removed from the mailbox, see -- handleMessage and unwrapMessage. matchAny :: forall b. (Message -> Process b) -> Match b -- | Match against an arbitrary message. Intended for use with -- handleMessage and unwrapMessage, this function -- only removes a message from the process mailbox if the -- supplied condition matches. The success (or failure) of runtime type -- checks deferred to handleMessage and friends is irrelevant -- here, i.e., if the condition evaluates to True then the -- message will be removed from the process mailbox and decoded, but that -- does not guarantee that an expression passed to -- handleMessage will pass the runtime type checks and therefore -- be evaluated. matchAnyIf :: forall a b. (Serializable a) => (a -> Bool) -> (Message -> Process b) -> Match b -- | Match on a typed channel matchChan :: ReceivePort a -> (a -> Process b) -> Match b -- | Match on an arbitrary STM action. -- -- This rather unusual match primitive allows us to compose -- arbitrary STM actions with checks against our process' mailbox and/or -- any typed channel ReceivePorts we may hold. -- -- This allows us to process multiple input streams along with our -- mailbox, in just the same way that matchChan supports -- checking both the mailbox and an arbitrary set of typed -- channels in one atomic transaction. -- -- Note there are no ordering guarantees with respect to these disparate -- input sources. matchSTM :: STM a -> (a -> Process b) -> Match b -- | Match against any message, regardless of the underlying (contained) -- type matchMessage :: (Message -> Process Message) -> Match Message -- | Match against any message (regardless of underlying type) that -- satisfies a predicate matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message -- | internal use only. isEncoded :: Message -> Bool -- | Wrap a Serializable value in a Message. Note that -- Messages are Serializable - like the datum they contain -- - but also note, deserialising such a Message will yield a -- Message, not the type within it! To obtain the wrapped datum, -- use unwrapMessage or handleMessage with a specific type. -- --
--   do
--      self <- getSelfPid
--      send self (wrapMessage "blah")
--      Nothing  <- expectTimeout 1000000 :: Process (Maybe String)
--      (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
--      (Just "blah") <- unwrapMessage m :: Process (Maybe String)
--   
wrapMessage :: Serializable a => a -> Message -- | This is the unsafe variant of wrapMessage. See -- Control.Distributed.Process.UnsafePrimitives for details. unsafeWrapMessage :: Serializable a => a -> Message -- | Attempt to unwrap a raw Message. If the type of the decoded -- message payload matches the expected type, the value will be returned -- with Just, otherwise Nothing indicates the types do -- not match. -- -- This expression, for example, will evaluate to Nothing > -- unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int) -- -- Whereas this expression will yield Just "foo" > -- unwrapMessage (wrapMessage "foo") :: Process (Maybe String) unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a) -- | Attempt to handle a raw Message. If the type of the message -- matches the type of the first argument to the supplied expression, -- then the message will be decoded and the expression evaluated against -- its value. If this runtime type checking fails, however, -- Nothing will be returned to indicate the fact. If the check -- succeeds and evaluation proceeds, the resulting value will be wrapped -- with Just. -- -- Intended for use in catchesExit and matchAny primitives. handleMessage :: forall m a b. (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b) -- | Conditionally handle a raw Message. If the predicate (a -- -> Bool) evaluates to True, invokes the supplied -- handler, otherwise returns Nothing to indicate failure. See -- handleMessage for further information about runtime type -- checking. handleMessageIf :: forall m a b. (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b) -- | As handleMessage but ignores result, which is useful if you -- don't care whether or not the handler succeeded. handleMessage_ :: forall m a. (Monad m, Serializable a) => Message -> (a -> m ()) -> m () -- | Conditional version of handleMessage_. handleMessageIf_ :: forall m a. (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m () -- | Forward a raw Message to the given ProcessId. forward :: Message -> ProcessId -> Process () -- | Forward a raw Message to the given ProcessId. -- -- Unlike forward, this function is insensitive to -- reconnect. It will try to send the message regardless of the -- history of connection failures between the nodes. uforward :: Message -> ProcessId -> Process () -- | Receives messages and forwards them to pid if p msg == -- True. delegate :: ProcessId -> (Message -> Bool) -> Process () -- | A straight relay that forwards all messages to the supplied pid. relay :: ProcessId -> Process () -- | Proxies pid and forwards messages whenever proc -- evaluates to True. Unlike delegate the predicate -- proc runs in the Process monad, allowing for richer -- proxy behaviour. If proc returns False or the -- runtime type check fails, no action is taken and the proxy -- process will continue running. proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process () -- | Terminate immediately (throws a ProcessTerminationException) terminate :: Process a -- | Thrown by terminate data ProcessTerminationException ProcessTerminationException :: ProcessTerminationException -- | Die immediately - throws a ProcessExitException with the given -- reason. die :: Serializable a => a -> Process b -- | Forceful request to kill a process. Where exit provides an -- exception that can be caught and handled, kill throws an -- unexposed exception type which cannot be handled explicitly (by type). 
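--
-- By contrast, an exit signal can be handled. A minimal sketch using
-- catchExit, with String as the expected reason type (the names are
-- illustrative):
--
--   guarded :: Process ()
--   guarded = catchExit worker onExit
--     where
--       worker :: Process ()
--       worker = expect >>= say
--       onExit :: ProcessId -> String -> Process ()
--       onExit _from reason = say ("exit requested: " ++ reason)
--   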
kill :: ProcessId -> String -> Process () -- | Graceful request to exit a process. Throws ProcessExitException -- with the supplied reason encoded as a message. Any exit -- signal raised in this manner can be handled using the -- catchExit family of functions. exit :: Serializable a => ProcessId -> a -> Process () -- | Catches ProcessExitException. The handler will not be applied -- unless its type matches the encoded data stored in the exception (see -- the reason argument given to the exit primitive). If the -- handler cannot be applied, the exception will be re-thrown. -- -- To handle ProcessExitException without regard for -- reason, see catch. To handle multiple reasons of -- differing types, see catchesExit. catchExit :: forall a b. (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b -- | Lift catches (almost). -- -- As ProcessExitException stores the exit reason as a -- typed, encoded message, a handler must accept inputs of the expected -- type. In order to handle a list of potentially different handlers (and -- therefore input types), a handler passed to catchesExit must -- accept Message and return Maybe (i.e., Just p -- if it handled the exit reason, otherwise Nothing). -- -- See maybeHandleMessage and Message for more details. catchesExit :: Process b -> [(ProcessId -> Message -> (Process (Maybe b)))] -> Process b -- | Internal exception thrown indirectly by exit data ProcessExitException -- | Our own process ID getSelfPid :: Process ProcessId -- | Get the node ID of our local node getSelfNode :: Process NodeId -- | Provide information about a running process data ProcessInfo ProcessInfo :: NodeId -> [String] -> Int -> [(ProcessId, MonitorRef)] -> [ProcessId] -> ProcessInfo [infoNode] :: ProcessInfo -> NodeId [infoRegisteredNames] :: ProcessInfo -> [String] [infoMessageQueueLength] :: ProcessInfo -> Int [infoMonitors] :: ProcessInfo -> [(ProcessId, MonitorRef)] [infoLinks] :: ProcessInfo -> [ProcessId] -- | Get information about the specified process getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo) data NodeStats NodeStats :: NodeId -> Int -> Int -> Int -> Int -> NodeStats [nodeStatsNode] :: NodeStats -> NodeId [nodeStatsRegisteredNames] :: NodeStats -> Int [nodeStatsMonitors] :: NodeStats -> Int [nodeStatsLinks] :: NodeStats -> Int [nodeStatsProcesses] :: NodeStats -> Int -- | Get statistics about the specified node getNodeStats :: NodeId -> Process (Either DiedReason NodeStats) -- | Get statistics about our local node getLocalNodeStats :: Process NodeStats -- | Link to a remote process (asynchronous) -- -- When process A links to process B (that is, process A calls link -- pidB) then an asynchronous exception will be thrown to process A -- when process B terminates (normally or abnormally), or when process A -- gets disconnected from process B. Although it is technically -- possible to catch these exceptions, chances are if you find yourself -- trying to do so you should probably be using monitor rather -- than link. In particular, code such as -- --
--   link pidB   -- Link to process B
--   expect      -- Wait for a message from process B
--   unlink pidB -- Unlink again
--   
-- -- doesn't quite do what one might expect: if process B sends a message -- to process A, and subsequently terminates, then process A might -- or might not be terminated too, depending on whether the exception is -- thrown before or after the unlink (i.e., this code has a race -- condition). -- -- Linking is all-or-nothing: A is either linked to B, or it's not. A -- second call to link has no effect. -- -- Note that link provides unidirectional linking (see -- spawnSupervised). Linking makes no distinction between normal -- and abnormal termination of the remote process. link :: ProcessId -> Process () -- | Remove a link -- -- This is synchronous in the sense that once it returns you are -- guaranteed that no exception will be raised if the remote process -- dies. However, it is asynchronous in the sense that we do not wait for -- a response from the remote node. unlink :: ProcessId -> Process () -- | Monitor another process (asynchronous) -- -- When process A monitors process B (that is, process A calls -- monitor pidB) then process A will receive a -- ProcessMonitorNotification when process B terminates (normally -- or abnormally), or when process A gets disconnected from process B. -- You receive this message like any other (using expect); the -- notification includes a reason (DiedNormal, -- DiedException, DiedDisconnect, etc.). -- -- Every call to monitor returns a new monitor reference -- MonitorRef; if multiple monitors are set up, multiple -- notifications will be delivered and monitors can be disabled -- individually using unmonitor. monitor :: ProcessId -> Process MonitorRef -- | Remove a monitor -- -- This has the same synchronous/asynchronous nature as unlink. -- -- ProcessMonitorNotification messages for the given MonitorRef are -- removed from the mailbox. unmonitor :: MonitorRef -> Process () -- | Establishes temporary monitoring of another process. -- -- withMonitor pid code sets up monitoring of pid for -- the duration of code. Note: although monitoring is no longer -- active when withMonitor returns, there might still be -- unreceived monitor messages in the queue. withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a -- | Establishes temporary monitoring of another process. -- -- withMonitor_ pid code sets up monitoring of pid for -- the duration of code. Note: although monitoring is no longer -- active when withMonitor_ returns, there might still be -- unreceived monitor messages in the queue. -- -- Since 0.6.1 withMonitor_ :: ProcessId -> Process a -> Process a data SayMessage SayMessage :: UTCTime -> ProcessId -> String -> SayMessage [sayTime] :: SayMessage -> UTCTime [sayProcess] :: SayMessage -> ProcessId [sayMessage] :: SayMessage -> String -- | Log a string -- -- say message sends a message of type SayMessage with -- the current time and ProcessId of the current process to the -- process registered as logger. By default, this process simply -- sends the string to stderr. Individual Cloud Haskell backends -- might replace this with a different logger process, however. say :: String -> Process () -- | Register a process with the local registry (synchronous). The name -- must not already be registered. The process need not be on this node. -- A bad registration will result in a -- ProcessRegistrationException -- -- The process to be registered does not have to be local itself. register :: String -> ProcessId -> Process () -- | Like register, but will replace an existing registration. The -- name must already be registered. 
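--
-- A minimal sketch of working with the local registry (register is
-- documented above; whereis and nsend below):
--
--   announce :: ProcessId -> Process ()
--   announce pid = do
--     register "worker" pid
--     mpid <- whereis "worker"
--     say (show mpid)
--     nsend "worker" "hello"
--   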
reregister :: String -> ProcessId -> Process () -- | Remove a process from the local registry (asynchronous). This version -- will wait until a response is received from the management process. The -- name must already be registered. unregister :: String -> Process () -- | Query the local process registry whereis :: String -> Process (Maybe ProcessId) -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Register a process with a remote registry (asynchronous). -- -- The process to be registered does not have to live on the same remote -- node. Reply will come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync registerRemoteAsync :: NodeId -> String -> ProcessId -> Process () reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process () -- | Remove a process from a remote registry (asynchronous). -- -- Reply will come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync unregisterRemoteAsync :: NodeId -> String -> Process () -- | Query a remote process registry (asynchronous) -- -- Reply will come in the form of a WhereIsReply message. -- -- There is currently no synchronous version of -- whereisRemoteAsync: if you implement one yourself, be sure to -- take into account that the remote node might die or get disconnected -- before it can respond (i.e. you should use monitorNode and take -- appropriate action when you receive a NodeMonitorNotification). whereisRemoteAsync :: NodeId -> String -> Process () -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | Resolve a closure unClosure :: Typeable a => Closure a -> Process a -- | Resolve a static value unStatic :: Typeable a => Static a -> Process a -- | Lift catch -- | Deprecated: Use Control.Monad.Catch.catch instead catch :: Exception e => Process a -> (e -> Process a) -> Process a -- | You need this when using catches data Handler a Handler :: (e -> Process a) -> Handler a -- | Lift catches catches :: Process a -> [Handler a] -> Process a -- | Lift try -- | Deprecated: Use Control.Monad.Catch.try instead try :: Exception e => Process a -> Process (Either e a) -- | Lift mask -- | Deprecated: Use Control.Monad.Catch.mask instead mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b -- | Lift mask_ -- | Deprecated: Use Control.Monad.Catch.mask_ instead mask_ :: Process a -> Process a -- | Lift onException -- | Deprecated: Use Control.Monad.Catch.onException instead onException :: Process a -> Process b -> Process a -- | Lift bracket -- | Deprecated: Use Control.Monad.Catch.bracket instead bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c -- | Lift bracket_ -- | Deprecated: Use Control.Monad.Catch.bracket_ instead bracket_ :: Process a -> Process b -> Process c -> Process c -- | Lift finally -- | Deprecated: Use Control.Monad.Catch.finally instead finally :: Process a -> Process b -> Process a -- | Like expect but with a timeout expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a) -- | Like receiveChan but with a timeout. If the timeout is 0, do a -- non-blocking check for a message. 
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a) -- | Asynchronous version of spawn -- -- (spawn is defined in terms of spawnAsync and -- expect) spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef -- | Link to a node (asynchronous) linkNode :: NodeId -> Process () -- | Link to a channel (asynchronous) linkPort :: SendPort a -> Process () -- | Remove a node link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkNode :: NodeId -> Process () -- | Remove a channel (send port) link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkPort :: SendPort a -> Process () -- | Monitor a node (asynchronous) monitorNode :: NodeId -> Process MonitorRef -- | Monitor a typed channel (asynchronous) monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef -- | Cloud Haskell provides the illusion of connection-less, reliable, -- ordered message passing. However, when network connections get -- disrupted this illusion cannot always be maintained. Once a network -- connection breaks (even temporarily) no further communication on that -- connection will be possible. For example, if process A sends a message -- to process B, and A is then notified (by monitor notification) that it -- got disconnected from B, A will not be able to send any further -- messages to B, unless A explicitly indicates that it is -- acceptable to attempt to reconnect to B using the Cloud Haskell -- reconnect primitive. -- -- Importantly, when A calls reconnect it acknowledges that some -- messages to B might have been lost. For instance, if A sends messages -- m1 and m2 to B, then receives a monitor notification that its -- connection to B has been lost, calls reconnect and then sends -- m3, it is possible that B will receive m1 and m3 but not m2. -- -- Note that reconnect does not mean reconnect now but -- rather /it is okay to attempt to reconnect on the next send/. In -- particular, if no further communication attempts are made to B then A -- can use reconnect to clean up its connection to B. reconnect :: ProcessId -> Process () -- | Reconnect to a sendport. See reconnect for more information. reconnectPort :: SendPort a -> Process () sendCtrlMsg :: Maybe NodeId -> ProcessSignal -> Process () instance GHC.Show.Show Control.Distributed.Process.Internal.Primitives.ProcessTerminationException instance GHC.Base.Functor Control.Distributed.Process.Internal.Primitives.Match instance GHC.Exception.Exception Control.Distributed.Process.Internal.Primitives.ProcessTerminationException instance GHC.Base.Functor Control.Distributed.Process.Internal.Primitives.Handler instance GHC.Show.Show Control.Distributed.Process.Internal.Primitives.SayMessage instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Primitives.SayMessage module Control.Distributed.Process.Internal.Closure.BuiltIn remoteTable :: RemoteTable -> RemoteTable -- | Static decoder, given a static serialization dictionary. -- -- See module documentation of Control.Distributed.Process.Closure -- for an example. staticDecode :: Typeable a => Static (SerializableDict a) -> Static (ByteString -> a) -- | Serialization dictionary for '()' sdictUnit :: Static (SerializableDict ()) -- | Serialization dictionary for ProcessId sdictProcessId :: Static (SerializableDict ProcessId) -- | Serialization dictionary for SendPort sdictSendPort :: Typeable a => Static (SerializableDict a) -> Static (SerializableDict (SendPort a)) -- | Serialization dictionary for Static. 
sdictStatic :: Typeable a => Static (TypeableDict a) -> Static (SerializableDict (Static a)) -- | Serialization dictionary for Closure. sdictClosure :: Typeable a => Static (TypeableDict a) -> Static (SerializableDict (Closure a)) sndStatic :: Static ((a, b) -> b) -- | CP a b is a process with input of type a and output -- of type b type CP a b = Closure (a -> Process b) -- | CP version of id idCP :: Typeable a => CP a a -- | CP version of (***) splitCP :: (Typeable a, Typeable b, Typeable c, Typeable d) => CP a c -> CP b d -> CP (a, b) (c, d) -- | CP version of return returnCP :: forall a. Serializable a => Static (SerializableDict a) -> a -> Closure (Process a) -- | (Not quite the) CP version of (>>=) bindCP :: forall a b. (Typeable a, Typeable b) => Closure (Process a) -> CP a b -> Closure (Process b) -- | CP version of (>>) seqCP :: (Typeable a, Typeable b) => Closure (Process a) -> Closure (Process b) -> Closure (Process b) decodeProcessIdStatic :: Static (ByteString -> ProcessId) -- | CP version of link cpLink :: ProcessId -> Closure (Process ()) -- | CP version of unlink cpUnlink :: ProcessId -> Closure (Process ()) -- | CP version of relay cpRelay :: ProcessId -> Closure (Process ()) -- | CP version of send cpSend :: forall a. Typeable a => Static (SerializableDict a) -> ProcessId -> CP a () -- | CP version of expect cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a) -- | CP version of newChan cpNewChan :: Typeable a => Static (SerializableDict a) -> Closure (Process (SendPort a, ReceivePort a)) -- | CP version of delay cpDelayed :: ProcessId -> Closure (Process ()) -> Closure (Process ()) cpEnableTraceRemote :: ProcessId -> Closure (Process ()) -- | Template Haskell support module Control.Distributed.Process.Internal.Closure.TH -- | Create the closure, decoder, and metadata definitions for the given -- list of functions remotable :: [Name] -> Q [Dec] -- | Like remotable, but parameterized by the declaration of a -- function instead of the function name. So where for remotable -- you'd do -- --
--   f :: T1 -> T2
--   f = ...
--   
--   remotable ['f]
--   
-- -- with remotableDecl you would instead do -- --
--   remotableDecl [
--      [d| f :: T1 -> T2 ;
--          f = ...
--        |]
--    ]
--   
-- -- remotableDecl creates the function specified as well as the -- various dictionaries and static versions that remotable also -- creates. remotableDecl is sometimes necessary when you want to -- refer to, say, $(mkClosure 'f) within the definition of -- f itself. -- -- NOTE: remotableDecl creates __remoteTableDecl instead -- of __remoteTable so that you can use both remotable -- and remotableDecl within the same module. remotableDecl :: [Q [Dec]] -> Q [Dec] -- | Construct a static value. -- -- If f : forall a1 .. an. T then $(mkStatic 'f) :: forall -- a1 .. an. Static T. Be sure to pass f to -- remotable. mkStatic :: Name -> Q Exp -- | If f : T1 -> T2 is a monomorphic function then -- $(functionSDict 'f) :: Static (SerializableDict T1). -- -- Be sure to pass f to remotable. functionSDict :: Name -> Q Exp -- | If f : T1 -> Process T2 is a monomorphic function then -- $(functionTDict 'f) :: Static (SerializableDict T2). -- -- Be sure to pass f to remotable. functionTDict :: Name -> Q Exp -- | If f : T1 -> T2 then $(mkClosure 'f) :: T1 -> -- Closure T2. -- -- TODO: The current version of mkClosure is too polymorphic (@forall a. -- Binary a => a -> Closure T2). mkClosure :: Name -> Q Exp -- | Make a Closure from a static function. This is useful for -- making a closure for a top-level Process () function, because -- using mkClosure would require adding a dummy () -- argument. mkStaticClosure :: Name -> Q Exp module Control.Distributed.Process.Internal.Spawn -- | Spawn a process -- -- For more information about Closure, see -- Control.Distributed.Process.Closure. -- -- See also call. spawn :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Spawn a process and link to it -- -- Note that this is just the sequential composition of spawn and -- link. (The Unified semantics that underlies Cloud -- Haskell does not even support a synchronous link operation) spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Like spawnLink, but monitor the spawned process spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Run a process remotely and wait for it to reply -- -- We monitor the remote process: if it dies before it can send a reply, -- we die too. -- -- For more information about Static, SerializableDict, and -- Closure, see Control.Distributed.Process.Closure. -- -- See also spawn. call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a -- | Spawn a child process, have the child link to the parent and the -- parent monitor the child spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a new process, supplying it with a new ReceivePort and -- return the corresponding SendPort. spawnChannel :: forall a. Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a) -- | -- -- This module presents an API for creating Management Agents: -- special processes that are capable of receiving and responding to a -- node's internal system events. These system events are -- delivered by the management event bus: An internal subsystem -- maintained for each running node, to which all agents are -- automatically subscribed. -- -- Agents are defined in terms of event sinks, taking a -- particular Serializable type and evaluating to an action in -- the MxAgent monad in response. Each MxSink evaluates to -- an MxAction that specifies whether the agent should continue -- processing it's inputs or stop. 
If the type of a message cannot be -- matched to any of the agent's sinks, it will be discarded. A sink can -- also deliberately skip processing a message, deferring to the -- remaining handlers. This is the only way that more than one -- event sink can handle the same data type, since otherwise the first -- type match will win every time a message arrives. See -- mxSkip for details. -- -- Various events are published to the management event bus -- automatically, the full list of which can be found in the definition -- of the MxEvent data type. Additionally, clients of the -- Management API can publish arbitrary Serializable data -- to the event bus using mxNotify. All running agents receive all -- events (from the primary event bus to which they're subscribed). -- -- Agent processes are automatically registered on the local node, and -- can receive messages via their mailbox just like ordinary processes. -- Unlike ordinary Process code however, it is unnecessary -- (though possible) for agents to use the base expect and -- receiveX primitives to do this, since the management -- infrastructure will continuously read from both the primary event bus -- and the process' own mailbox. Messages are transparently passed -- to the agent's event sinks from both sources, so an agent need only -- concern itself with how to respond to its inputs. -- -- Some agents may wish to prioritise messages from their mailbox over -- traffic on the management event bus, or vice versa. The -- mxReceive and mxReceiveChan API calls do this for the -- mailbox and event bus, respectively. The prioritisation these -- APIs offer is simply that the chosen data stream will be checked -- first. No blocking will occur if the chosen (prioritised) source is -- devoid of input messages, instead the agent handling code will revert -- to switching between the alternatives in round-robin as usual. -- If messages exist in one or more channels, they will be consumed as -- soon as they're available, priority is effectively a hint about which -- channel to consume from, should messages be available in both. -- -- Prioritisation then, is a hint about the preference of data -- source from which the next input should be chosen. No guarantee can be -- made that the chosen source will in fact be selected at runtime. -- -- -- -- The management API provides no guarantees whatsoever, viz: -- -- -- -- -- -- Both management agents and clients of the API have access to a variety -- of data storage capabilities, to facilitate publishing and consuming -- useful system information. Agents maintain their own internal state -- privately (via a state transformer - see mxGetLocal et al), -- however it is possible for agents to share additional data with each -- other (and the outside world) using whatever mechanism the user -- wishes, e.g., acidstate, or shared memory primitives. -- -- -- -- New agents are defined with mxAgent and require a unique -- MxAgentId, an initial state - MxAgent runs in a state -- transformer - and a list of the agent's event sinks. Each -- MxSink is defined in terms of a specific Serializable -- type, via the mxSink function, binding the event handler -- expression to inputs of only that type. -- -- Apart from modifying its own local state, an agent can execute -- arbitrary Process a code via lifting (see liftMX) and -- even publish its own messages back to the primary event bus (see -- mxBroadcast). 
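-- As an illustration of the mxSkip mechanism described above, here is a
-- sketch (the agent id and the messages are invented for the example) of
-- two event sinks that both accept String inputs; the first defers to
-- the second via mxSkip unless the message is addressed to it:
--
--   skippingAgent :: Process ProcessId
--   skippingAgent =
--     mxAgent (MxAgentId "skip-demo") () [
--         mxSink $ \(s :: String) ->
--           -- handle only messages addressed to this sink
--           if s == "first"
--             then liftMX (say "first sink") >> mxReady
--             else mxSkip  -- defer to the remaining sinks
--       , mxSink $ \(s :: String) ->
--           liftMX (say ("second sink: " ++ s)) >> mxReady
--       ]
--   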
-- -- Since messages are delivered to agents from both the management event -- bus and the agent process's mailbox, agents (i.e., event sinks) will -- generally have no idea as to their origin. An agent can, however, -- choose to prioritise the choice of input (source) each time one of its -- event sinks runs. The standard way for an event sink to -- indicate that the agent is ready for its next input is to evaluate -- mxReady. When this happens, the management infrastructure will -- obtain data from the event bus and process' mailbox in a round-robin -- fashion, i.e., one after the other, changing each time. -- -- -- -- What follows is a grossly over-simplified example of a management -- agent that provides a basic name monitoring facility. Whenever a -- process name is registered or unregistered, clients are informed of -- the fact. -- --
--   -- simple notification data type
--   
--   data Registration = Reg { added  :: Bool
--                           , procId :: ProcessId
--                           , name   :: String
--                           }
--   
--   -- start a /name monitoring agent/
--   nameMonitorAgent = do
--     mxAgent (MxAgentId "name-monitor") Set.empty [
--           -- clients register interest by sending us their ProcessId
--           (mxSink $ \(pid :: ProcessId) -> do
--              mxUpdateLocal $ Set.insert pid
--              mxReady)
--           -- registration events are relayed to every registered client
--         , (mxSink $ \(ev :: MxEvent) -> do
--              let act =
--                    case ev of
--                      (MxRegistered   p n) -> notify True  p n
--                      (MxUnRegistered p n) -> notify False p n
--                      _                    -> return ()
--              act >> mxReady)
--       ]
--     where
--       notify a p n = do
--         Foldable.mapM_ (liftMX . deliver (Reg a p n)) =<< mxGetLocal
--       -- deliver one notification to a subscribed client
--       deliver reg pid = send pid reg
--   
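-- For the Reg notifications above to be deliverable with send,
-- Registration must be Serializable. A sketch of the required instances,
-- assuming the binary package's generic deriving via GHC.Generics:
--
--   {-# LANGUAGE DeriveDataTypeable, DeriveGeneric #-}
--
--   import Data.Binary   (Binary)
--   import Data.Typeable (Typeable)
--   import GHC.Generics  (Generic)
--
--   data Registration = Reg { added  :: Bool
--                           , procId :: ProcessId
--                           , name   :: String
--                           } deriving (Typeable, Generic)
--
--   instance Binary Registration
--   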
-- -- The client interface (for sending their pid) can take one of two -- forms: -- --
--   monitorNames = getSelfPid >>= nsend "name-monitor"
--   monitorNames2 = getSelfPid >>= mxNotify
--   
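-- A sketch of the consuming side (assuming the Serializable instances
-- shown earlier; the function name is illustrative): once subscribed, a
-- client simply waits for Reg notifications in its mailbox:
--
--   -- forever comes from Control.Monad
--   watchNames :: Process ()
--   watchNames = do
--     getSelfPid >>= nsend "name-monitor"   -- subscribe to the agent
--     forever $ do
--       Reg a p n <- expect                 -- single constructor, cannot fail
--       say $ (if a then "registered: " else "unregistered: ")
--              ++ n ++ " " ++ show p
--   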
-- -- For some real-world examples, see the distributed-process-platform -- package. -- -- -- -- Management Agents offer numerous advantages over regular -- processes: broadcast communication with them can have a lower latency, -- they offer simplified messgage (i.e., input type) handling and they -- have access to internal system information that would be otherwise -- unobtainable. -- -- Do not be tempted to implement everything (e.g., the kitchen sink) -- using the management API though. There are overheads associated with -- management agents which is why they're presented as tools for -- consuming low level system information, instead of as application -- level development tools. -- -- Agents that rely heavily on a busy mailbox can cause the management -- event bus to backlog un-GC'ed data, leading to increased heap space. -- Producers that do not take care to avoid passing unevaluated thunks to -- the API can crash all the agents in the system. Agents are not -- monitored or managed in any way, and those that crash will not be -- restarted. -- -- The management event bus can receive a great deal of traffic. Every -- time a message is sent and/or received, an event is passed to the -- agent controller and broadcast to all agents (plus the trace -- controller, if tracing is enabled for the node). This is already a -- significant overhead - though profiling and benchmarks have -- demonstrated that it does not adversely affect performance if few -- agents are installed. Agents will typically use more cycles than plain -- processes, since they perform additional work: selecting input data -- from both the event bus and their own mailboxes, plus searching -- through the set of event sinks (for each agent) to determine the right -- handler for the event. -- -- -- -- The architecture of the management event bus is internal and subject -- to change without prior notice. The description that follows is -- provided for informational purposes only. -- -- When a node initially starts, two special, internal system processes -- are started to support the management infrastructure. The first, known -- as the trace controller, is responsible for consuming -- MxEvents and forwarding them to the configured tracer - see -- Control.Distributed.Process.Debug for further details. The -- second is the management agent controller, and is the primary -- worker process underpinning the management infrastructure. All -- published management events are routed to this process, which places -- them onto a system wide event bus and additionally passes them -- directly to the trace controller. -- -- There are several reasons for segregating the tracing and management -- control planes in this fashion. Tracing can be enabled or disabled by -- clients, whilst the management event bus cannot, since in addition to -- providing runtime instrumentation, its intended use-cases include node -- monitoring, peer discovery (via topology providing backends) and other -- essential system services that require knowledge of otherwise hidden -- system internals. Tracing is also subject to trace flags that -- limit the specific MxEvents delivered to trace clients - an -- overhead/complexity not shared by management agents. Finally, tracing -- and management agents are implemented using completely different -- signalling techniques - more on this later - which would introduce -- considerable complexity if the shared the same event loop. 
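-- Returning to the earlier note about unevaluated thunks: one defensive
-- sketch (assuming the deepseq package is available; the function and
-- payload here are illustrative) is to force the payload before
-- publishing it:
--
--   import Control.DeepSeq (force)
--   import Control.Exception (evaluate)
--
--   notifySamples :: [Int] -> Process ()
--   notifySamples samples = do
--     payload <- liftIO (evaluate (force samples))  -- fully evaluate first
--     mxNotify payload
--   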
-- -- The management control plane is driven by a shared broadcast channel, -- which is written to by the agent controller and subscribed to by all -- agent processes. Agents are spawned as regular processes, whose -- primary implementation (i.e., server loop) is responsible for -- consuming messages from both the broadcast channel and their own -- mailbox. Once consumed, messages are applied to the agent's event -- sinks until one matches the input, at which point it is applied -- and the loop continues. The implementation chooses from the event bus -- and the mailbox in a round-robin fashion, until a message is received. -- This polling activity would lead to management agents consuming -- considerable system resources if left unchecked, therefore the -- implementation will poll for a limitted number of retries, after which -- it will perform a blocking read on the event bus. module Control.Distributed.Process.Management -- | This is the default management event, fired for various -- internal events around the NT connection and Process lifecycle. All -- published events that conform to this type, are eligible for tracing - -- i.e., they will be delivered to the trace controller. data MxEvent -- | fired whenever a local process is spawned MxSpawned :: ProcessId -> MxEvent -- | fired whenever a process/name is registered (locally) MxRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process/name is unregistered (locally) MxUnRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process dies MxProcessDied :: ProcessId -> DiedReason -> MxEvent -- | fired whenever a node dies (i.e., the connection is -- broken/disconnected) MxNodeDied :: NodeId -> DiedReason -> MxEvent -- | fired whenever a message is sent from a local process MxSent :: ProcessId -> ProcessId -> Message -> MxEvent -- | fired whenever a message is received by a local process MxReceived :: ProcessId -> Message -> MxEvent -- | fired when a network-transport connection is first established MxConnected :: ConnectionId -> EndPointAddress -> MxEvent -- | fired when a network-transport connection is broken/disconnected MxDisconnected :: ConnectionId -> EndPointAddress -> MxEvent -- | a user defined trace event MxUser :: Message -> MxEvent -- | a logging event - used for debugging purposes only MxLog :: String -> MxEvent -- | notifies a trace listener that all subsequent traces will be sent to -- pid MxTraceTakeover :: ProcessId -> MxEvent -- | notifies a trace listener that it has been disabled/removed MxTraceDisable :: MxEvent -- | Publishes an arbitrary Serializable message to the management -- event bus. Note that no attempt is made to force the argument, -- therefore it is very important that you do not pass unevaluated thunks -- that might crash the receiving process via this API, since all -- registered agents will gain access to the data structure once it is -- broadcast by the agent controller. mxNotify :: (Serializable a) => a -> Process () -- | Represents the actions a management agent can take when evaluating an -- event sink. data MxAction -- | A newtype wrapper for an agent id (which is a string). newtype MxAgentId MxAgentId :: String -> MxAgentId [agentId] :: MxAgentId -> String -- | Monad for management agents. data MxAgent s a -- | Activates a new agent. mxAgent :: MxAgentId -> s -> [MxSink s] -> Process ProcessId -- | Activates a new agent. This variant takes a finalizer -- expression, that is run once the agent shuts down (even in case of -- failure/exceptions). 
The finalizer expression runs in the mx -- monad - MxAgent s () - such that the agent's internal state -- remains accessible to the shutdown/cleanup code. mxAgentWithFinalize :: MxAgentId -> s -> [MxSink s] -> MxAgent s () -> Process ProcessId -- | Type of a management agent's event sink. type MxSink s = Message -> MxAgent s (Maybe MxAction) -- | Create an MxSink from an expression taking a -- Serializable type m, that yields an MxAction -- in the MxAgent monad. mxSink :: forall s m. (Serializable m) => (m -> MxAgent s MxAction) -> MxSink s -- | Return the MxAgentId for the currently executing agent. mxGetId :: MxAgent s MxAgentId -- | Gracefully terminate an agent. mxDeactivate :: forall s. String -> MxAgent s MxAction -- | Continue executing (i.e., receiving and processing messages). mxReady :: forall s. MxAgent s MxAction -- | Causes the currently executing event sink to be skipped. The -- remaining declared event sinks will be evaluated to find a matching -- handler. Can be used to allow multiple event sinks to process data of -- the same type. mxSkip :: forall s. MxAgent s MxAction -- | Continue exeucting, prioritising inputs from the process' own -- mailbox ahead of data from the management event bus. mxReceive :: forall s. MxAgent s MxAction -- | Continue exeucting, prioritising inputs from the management event bus -- over the process' own mailbox. mxReceiveChan :: forall s. MxAgent s MxAction -- | The MxAgent version of mxNotify. mxBroadcast :: (Serializable m) => m -> MxAgent s () -- | Set the agent's local state. mxSetLocal :: s -> MxAgent s () -- | Fetch the agent's local state. mxGetLocal :: MxAgent s s -- | Update the agent's local state. mxUpdateLocal :: (s -> s) -> MxAgent s () -- | Lift a Process action. liftMX :: Process a -> MxAgent s a -- | Keeps the tracing API calls separate from the Tracer implementation, -- which allows us to avoid a nasty import cycle between tracing and the -- messaging primitives that rely on it, and also between the node -- controller (which requires access to the tracing related elements of -- our RemoteTable) and the Debug module, which requires -- forkProcess. This module is also used by the management -- agent, which relies on the tracing infrastructure's messaging fabric. module Control.Distributed.Process.Management.Internal.Trace.Primitives -- | Send a log message to the internal tracing facility. If tracing is -- enabled, this will create a custom trace log event. traceLog :: String -> Process () -- | Send a log message to the internal tracing facility, using the given -- list of printable TraceArgs interspersed with the preceding -- delimiter. traceLogFmt :: String -> [TraceArg] -> Process () -- | Send an arbitrary Message to the tracer process. traceMessage :: Serializable m => m -> Process () defaultTraceFlags :: TraceFlags -- | Enable tracing to the supplied process and wait for a TraceOk -- response from the trace coordinator process. enableTrace :: ProcessId -> Process () -- | Enable tracing to the supplied process. enableTraceAsync :: ProcessId -> Process () -- | Disable the currently configured trace and wait for a TraceOk -- response from the trace coordinator process. disableTrace :: Process () -- | Disable the currently configured trace. disableTraceAsync :: Process () getTraceFlags :: Process TraceFlags -- | Set the given flags for the current tracer and wait for a -- TraceOk response from the trace coordinator process. setTraceFlags :: TraceFlags -> Process () -- | Set the given flags for the current tracer. 
setTraceFlagsAsync :: TraceFlags -> Process () -- | Turn tracing for for a subset of trace targets. traceOnly :: Traceable a => [a] -> Maybe TraceSubject -- | Trace all targets. traceOn :: Maybe TraceSubject -- | Trace no targets. traceOff :: Maybe TraceSubject withLocalTracer :: (MxEventBus -> Process ()) -> Process () withRegisteredTracer :: (ProcessId -> Process a) -> Process a instance Control.Distributed.Process.Management.Internal.Trace.Primitives.Traceable Control.Distributed.Process.Internal.Types.ProcessId instance Control.Distributed.Process.Management.Internal.Trace.Primitives.Traceable GHC.Base.String module Control.Distributed.Process.Management.Internal.Trace.Remote -- | Set the given flags for a remote node (asynchronous). setTraceFlagsRemote :: TraceFlags -> NodeId -> Process () -- | Starts a trace relay process on the remote node, which forwards -- all trace events to the registered tracer on this (the calling -- process') node. startTraceRelay :: NodeId -> Process ProcessId -- | Remote Table. remoteTable :: RemoteTable -> RemoteTable -- | Tracing/Debugging support - Trace Implementation module Control.Distributed.Process.Management.Internal.Trace.Tracer traceController :: MVar ((Weak (CQueue Message))) -> Process () defaultTracer :: Process () systemLoggerTracer :: Process () logfileTracer :: FilePath -> Process () eventLogTracer :: Process () module Control.Distributed.Process.Management.Internal.Agent -- | A triple containing a configured tracer, weak pointer to the agent -- controller's mailbox (CQueue) and an expression used to instantiate -- new agents on the current node. type AgentConfig = (Tracer, Weak (CQueue Message), ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId) -- | Starts a management agent for the current node. The agent process must -- not crash or be killed, so we generally avoid publishing its -- ProcessId where possible. -- -- Our process is also responsible for forwarding messages to the trace -- controller, since having two special processes handled via the -- LocalNode would be inelegant. We forward messages directly to -- the trace controller's message queue, just as the MxEventBus -- that's set up on the LocalNode forwards messages directly to -- us. This optimises the code path for tracing and avoids overloading -- the node node controller's internal control plane with additional -- routing, at the cost of a little more complexity and two cases where -- we break encapsulation. mxAgentController :: Fork -> MVar AgentConfig -> Process () -- | Forks a new process in which an mxAgent is run. mxStartAgent :: Fork -> TChan Message -> ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId -- | Start the tracer controller. startTracing :: Fork -> IO Tracer -- | Start a dead letter (agent) queue. -- -- If no agents are registered on the system, the management event bus -- will fill up and its data won't be GC'ed until someone comes along and -- reads from the broadcast channel (via dupTChan of course). This is -- effectively a leak, so to mitigate it, we start a dead letter -- queue that drains the event bus continuously, thus ensuring if -- there are no other consumers that we won't use up heap space -- unnecessarily. startDeadLetterQueue :: TChan Message -> IO () -- | This is an implementation of bidirectional multimaps. module Control.Distributed.Process.Internal.BiMultiMap -- | A bidirectional multimaps BiMultiMap a b v is a set of -- triplets of type (a, b, v). 
-- -- It is possible to look up values by using either a or -- b as keys. data BiMultiMap a b v -- | The empty bidirectional multimap. empty :: BiMultiMap a b v -- | A bidirectional multimap containing a single triplet. singleton :: (Ord a, Ord b, Ord v) => a -> b -> v -> BiMultiMap a b v -- | Yields the number of triplets in the multimap. size :: BiMultiMap a b v -> Int -- | Inserts a triplet in the multimap. insert :: (Ord a, Ord b, Ord v) => a -> b -> v -> BiMultiMap a b v -> BiMultiMap a b v -- | Looks up all the triplets whose first component is the given value. lookupBy1st :: Ord a => a -> BiMultiMap a b v -> Set (b, v) -- | Looks up all the triplets whose second component is the given value. lookupBy2nd :: Ord b => b -> BiMultiMap a b v -> Set (a, v) -- | Deletes a triplet. It yields the original multimap if the triplet is -- not present. delete :: (Ord a, Ord b, Ord v) => a -> b -> v -> BiMultiMap a b v -> BiMultiMap a b v -- | Deletes all triplets whose first component is the given value. deleteAllBy1st :: (Ord a, Ord b, Ord v) => a -> BiMultiMap a b v -> BiMultiMap a b v -- | Like deleteAllBy1st but deletes by the second component of the -- triplets. deleteAllBy2nd :: (Ord a, Ord b, Ord v) => b -> BiMultiMap a b v -> BiMultiMap a b v -- | Yields the triplets satisfying the given predicate, and a multimap -- with all these triplets removed. partitionWithKeyBy1st :: (Ord a, Ord b, Ord v) => (a -> Set (b, v) -> Bool) -> BiMultiMap a b v -> (Map a (Set (b, v)), BiMultiMap a b v) -- | Like partitionWithKeyBy1st but the predicate takes the second -- component of the triplets as first argument. partitionWithKeyBy2nd :: (Ord a, Ord b, Ord v) => (b -> Set (a, v) -> Bool) -> BiMultiMap a b v -> (Map b (Set (a, v)), BiMultiMap a b v) -- | Exchange the first and the second components of all triplets. flip :: BiMultiMap a b v -> BiMultiMap b a v -- | Local nodes module Control.Distributed.Process.Node -- | Local nodes data LocalNode -- | Initialize a new local node. newLocalNode :: Transport -> RemoteTable -> IO LocalNode -- | Force-close a local node, killing all processes on that node. closeLocalNode :: LocalNode -> IO () -- | Spawn a new process on a local node forkProcess :: LocalNode -> Process () -> IO ProcessId -- | Run a process on a local node and wait for it to finish runProcess :: LocalNode -> Process () -> IO () initRemoteTable :: RemoteTable -- | NodeId of the node localNodeId :: LocalNode -> NodeId instance Control.Monad.Reader.Class.MonadReader Control.Distributed.Process.Internal.Types.LocalNode Control.Distributed.Process.Node.NC instance Control.Monad.State.Class.MonadState Control.Distributed.Process.Node.NCState Control.Distributed.Process.Node.NC instance Control.Monad.IO.Class.MonadIO Control.Distributed.Process.Node.NC instance GHC.Base.Monad Control.Distributed.Process.Node.NC instance GHC.Base.Functor Control.Distributed.Process.Node.NC instance GHC.Base.Applicative Control.Distributed.Process.Node.NC instance GHC.Exception.Exception Control.Distributed.Process.Node.ProcessKillException instance GHC.Show.Show Control.Distributed.Process.Node.ProcessKillException -- | -- -- Cloud Haskell provides a general purpose tracing mechanism, allowing a -- user supplied tracer process to receive messages when certain -- classes of system events occur. It's possible to use this facility to -- aid in debugging and/or perform other diagnostic tasks on a program at -- runtime. 
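-- As a minimal sketch of what this looks like in practice (using the
-- withTracer function documented below; the wrapper name is
-- illustrative, and SomeException comes from Control.Exception), a
-- handler can report process spawns via say while an action runs:
--
--   tracedSpawns :: Process a -> Process (Either SomeException a)
--   tracedSpawns action = withTracer handler action
--     where
--       -- only spawn events are of interest; everything else is ignored
--       handler (MxSpawned pid) = say ("spawned: " ++ show pid)
--       handler _               = return ()
--   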
-- -- -- -- Throughout the lifecycle of a local node, the distributed-process -- runtime generates trace events, describing internal runtime -- activities such as the spawning and death of processes, message -- sending, delivery and so on. See the MxEvent type's -- documentation for a list of all the published event types, which -- correspond directly to the types of management events. Users -- can additionally publish custom trace events in the form of -- MxLog log messages or pass custom (i.e., completely user -- defined) event data using the traceMessage function. -- -- All published traces are forwarded to a tracer process, which -- can be specified (and changed) at runtime using traceEnable. -- Some pre-defined tracer processes are provided for conveniently -- printing to stderr, a log file or the GHC eventlog. -- -- If a tracer process crashes, no attempt is made to restart it. -- -- -- -- The tracing facility only ever writes to a single tracer process. This -- invariant insulates the tracer controller and ensures a fast path for -- handling all trace events. This module provides facilities for -- layering trace handlers using Cloud Haskell's built-in delegation -- primitives. -- -- The startTracer function wraps the registered tracer -- process with the supplied handler and also forwards trace events to -- the original tracer. The corresponding stopTracer function -- terminates tracer processes in reverse of the order in which they were -- started, and re-registers the previous tracer process. -- -- -- -- The built in tracers provide a simple logging facility that -- writes trace events out to either a log file, stderr or the -- GHC eventlog. These tracers can be configured using environment -- variables, or specified manually using the traceEnable -- function. -- -- When a new local node is started, the contents of several environment -- variables are checked to determine which default tracer process is -- selected. If none of these variables is set, a no-op tracer process is -- installed, which effectively ignores all trace messages. Note that in -- this case, trace events are still generated and passed through the -- system. Only one default tracer will be chosen - the first that -- contains a (valid) value. These environment variables, in the order -- they're examined, are: -- --
    --
  1. DISTRIBUTED_PROCESS_TRACE_FILE This is checked for a valid file path. If it exists and the file can be opened for writing, all trace output will be directed thence. If the supplied path is invalid, or the file is unavailable for writing, this tracer will not be selected.
  2. DISTRIBUTED_PROCESS_TRACE_CONSOLE This is checked for any non-empty value. If set, then all trace output will be directed to the system logger process.
  3. DISTRIBUTED_PROCESS_TRACE_EVENTLOG This is checked for any non-empty value. If set, all internal traces are written to the GHC eventlog.
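-- Trace flags can also be adjusted programmatically; a sketch using the
-- withFlags, defaultTraceFlags and traceOn definitions from
-- Control.Distributed.Process.Debug (documented below; the wrapper name
-- is illustrative), enabling spawn and death events for the duration of
-- an action:
--
--   withSpawnTracing :: Process a -> Process (Either SomeException a)
--   withSpawnTracing action =
--     withFlags defaultTraceFlags { traceSpawned = traceOn
--                                 , traceDied    = traceOn } action
--   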
-- -- By default, the built in tracers will ignore all trace events! In -- order to enable tracing the incoming MxEvent stream, the -- DISTRIBUTED_PROCESS_TRACE_FLAGS environment variable accepts -- the following flags, which enable tracing specific event types: -- -- -- -- Users of the simplelocalnet Cloud Haskell backend should also -- note that because the trace file option only supports trace output -- from a single node (so as to avoid interleaving), a file trace -- configured for the master node will prevent slaves from tracing to the -- file. They will need to fall back to the console or eventlog tracers -- instead, which can be accomplished by setting one of these environment -- variables as well, since the latter will only be selected on -- slaves (when the file tracer selection fails). -- -- Support for writing to the eventlog requires specific intervention to -- work, without which, written traces are silently dropped/ignored and -- no output will be generated. The GHC eventlog documentation provides -- information about enabling, viewing and working with event traces at -- http://hackage.haskell.org/trac/ghc/wiki/EventLog. module Control.Distributed.Process.Debug data TraceArg TraceStr :: String -> TraceArg Trace :: a -> TraceArg -- | Defines what will be traced. Flags that control tracing of -- Process events, take a TraceSubject controlling which -- processes should generate trace events in the target process. data TraceFlags TraceFlags :: !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !(Maybe TraceSubject) -> !Bool -> !Bool -> TraceFlags [traceSpawned] :: TraceFlags -> !(Maybe TraceSubject) [traceDied] :: TraceFlags -> !(Maybe TraceSubject) [traceRegistered] :: TraceFlags -> !(Maybe TraceSubject) [traceUnregistered] :: TraceFlags -> !(Maybe TraceSubject) [traceSend] :: TraceFlags -> !(Maybe TraceSubject) [traceRecv] :: TraceFlags -> !(Maybe TraceSubject) [traceNodes] :: TraceFlags -> !Bool [traceConnections] :: TraceFlags -> !Bool -- | Defines which processes will be traced by a given TraceFlag, -- either by name, or ProcessId. Choosing TraceAll is -- by far the most efficient approach, as the tracer process -- therefore avoids deciding whether or not a trace event is viable. data TraceSubject TraceAll :: TraceSubject TraceProcs :: !(Set ProcessId) -> TraceSubject TraceNames :: !(Set String) -> TraceSubject -- | Enable tracing to the supplied process and wait for a TraceOk -- response from the trace coordinator process. enableTrace :: ProcessId -> Process () -- | Enable tracing to the supplied process. enableTraceAsync :: ProcessId -> Process () -- | Disable the currently configured trace and wait for a TraceOk -- response from the trace coordinator process. disableTrace :: Process () -- | Evaluate proc with tracing enabled via handler, and -- immediately disable tracing thereafter, before giving the result (or -- exception in case of failure). withTracer :: forall a. (MxEvent -> Process ()) -> Process a -> Process (Either SomeException a) -- | Evaluate proc with the supplied flags enabled. Any previously -- set trace flags are restored immediately afterwards. withFlags :: forall a. TraceFlags -> Process a -> Process (Either SomeException a) getTraceFlags :: Process TraceFlags -- | Set the given flags for the current tracer and wait for a -- TraceOk response from the trace coordinator process. setTraceFlags :: TraceFlags -> Process () -- | Set the given flags for the current tracer. 
setTraceFlagsAsync :: TraceFlags -> Process () defaultTraceFlags :: TraceFlags -- | Trace all targets. traceOn :: Maybe TraceSubject -- | Turn tracing for for a subset of trace targets. traceOnly :: Traceable a => [a] -> Maybe TraceSubject -- | Trace no targets. traceOff :: Maybe TraceSubject -- | Starts a new tracer, using the supplied trace function. Only one -- tracer can be registered at a time, however this function -- overlays the registered tracer with the supplied handler, allowing the -- user to layer multiple tracers on top of one another, with trace -- events forwarded down through all the layers in turn. Once the top -- layer is stopped, the user is responsible for re-registering the -- original (prior) tracer pid before terminating. See withTracer -- for a mechanism that handles that. startTracer :: (MxEvent -> Process ()) -> Process ProcessId -- | Stops a user supplied tracer started with startTracer. Note -- that only one tracer process can be active at any given time. This -- process will stop the last process started with startTracer. If -- startTracer is called multiple times, successive calls to this -- function will stop the tracers in the reverse order which they were -- started. -- -- This function will never stop the system tracer (i.e., the tracer -- initially started when the node is created), therefore once all user -- supplied tracers (i.e., processes started via startTracer) have -- exited, subsequent calls to this function will have no effect. -- -- If the last tracer to have been registered was not started with -- startTracer then the behaviour of this function is -- undefined. stopTracer :: Process () -- | Send a log message to the internal tracing facility. If tracing is -- enabled, this will create a custom trace log event. traceLog :: String -> Process () -- | Send a log message to the internal tracing facility, using the given -- list of printable TraceArgs interspersed with the preceding -- delimiter. traceLogFmt :: String -> [TraceArg] -> Process () -- | Send an arbitrary Message to the tracer process. traceMessage :: Serializable m => m -> Process () -- | Remote Table. remoteTable :: RemoteTable -> RemoteTable -- | Starts a trace relay process on the remote node, which forwards -- all trace events to the registered tracer on this (the calling -- process') node. startTraceRelay :: NodeId -> Process ProcessId -- | Set the given flags for a remote node (asynchronous). setTraceFlagsRemote :: TraceFlags -> NodeId -> Process () systemLoggerTracer :: Process () logfileTracer :: FilePath -> Process () eventLogTracer :: Process () -- | -- -- This is an implementation of Cloud Haskell, as described in Towards -- Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon -- Peyton Jones (see -- http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), -- although some of the details are different. The precise message -- passing semantics are based on A unified semantics for future -- Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle. -- -- For a detailed description of the package and other reference -- materials, please see the distributed-process wiki page on github: -- https://github.com/haskell-distributed/distributed-process/wiki. module Control.Distributed.Process -- | Process identifier data ProcessId -- | Node identifier newtype NodeId NodeId :: EndPointAddress -> NodeId [nodeAddress] :: NodeId -> EndPointAddress -- | The Cloud Haskell Process type data Process a -- | A send port is identified by a SendPortId. 
-- -- You cannot send directly to a SendPortId; instead, use -- newChan to create a SendPort. data SendPortId -- | The ID of the node the process is running on processNodeId :: ProcessId -> NodeId -- | The ID of the process that will receive messages sent on this port sendPortProcessId :: SendPortId -> ProcessId -- | Lift a computation from the IO monad. liftIO :: MonadIO m => forall a. IO a -> m a -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- | Send a message unreliably. -- -- Unlike send, this function is insensitive to reconnect. -- It will try to send the message regardless of the history of -- connection failures between the nodes. -- -- Message passing with usend is ordered for a given sender and -- receiver if the messages arrive at all. usend :: Serializable a => ProcessId -> a -> Process () -- | Wait for a message of a specific type expect :: forall a. Serializable a => Process a -- | Like expect but with a timeout expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a) -- | The receive end of a typed channel (not serializable) -- -- Note that ReceivePort implements Functor, -- Applicative, Alternative and Monad. This is -- especially useful when merging receive ports. data ReceivePort a -- | The send send of a typed channel (serializable) data SendPort a -- | The (unique) ID of this send port sendPortId :: SendPort a -> SendPortId -- | Create a new typed channel newChan :: Serializable a => Process (SendPort a, ReceivePort a) -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () -- | Wait for a message on a typed channel receiveChan :: Serializable a => ReceivePort a -> Process a -- | Like receiveChan but with a timeout. If the timeout is 0, do a -- non-blocking check for a message. receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a) -- | Merge a list of typed channels. -- -- The result port is left-biased: if there are messages available on -- more than one port, the first available message is returned. mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Like mergePortsBiased, but with a round-robin scheduler (rather -- than left-biased) mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Unsafe variant of send. This function makes no -- attempt to serialize and (in the case when the destination process -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSend :: Serializable a => ProcessId -> a -> Process () -- | Unsafe variant of usend. This function makes no -- attempt to serialize the message when the destination process resides -- on the same local node. Therefore, a local receiver would need to be -- prepared to cope with any errors resulting from evaluation of the -- message. unsafeUSend :: Serializable a => ProcessId -> a -> Process () -- | Send a message on a typed channel. This function makes no -- attempt to serialize and (in the case when the ReceivePort -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSendChan :: Serializable a => SendPort a -> a -> Process () -- | Named send to a process in the local registry (asynchronous). This -- function makes no attempt to serialize and (in the case when -- the destination process resides on the same local node) therefore -- ensure that the payload is fully evaluated before it is delivered. 
unsafeNSend :: Serializable a => String -> a -> Process () -- | Named send to a process in a remote registry (asynchronous) This -- function makes no attempt to serialize and (in the case when -- the destination process resides on the same local node) therefore -- ensure that the payload is fully evaluated before it is delivered. unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | This is the unsafe variant of wrapMessage. See -- Control.Distributed.Process.UnsafePrimitives for details. unsafeWrapMessage :: Serializable a => a -> Message -- | Opaque type used in receiveWait and receiveTimeout data Match b -- | Test the matches in order against each message in the queue receiveWait :: [Match b] -> Process b -- | Like receiveWait but with a timeout. -- -- If the timeout is zero do a non-blocking check for matching messages. -- A non-zero timeout is applied only when waiting for incoming messages -- (that is, after we have checked the messages that are already -- in the mailbox). receiveTimeout :: Int -> [Match b] -> Process (Maybe b) -- | Match against any message of the right type match :: forall a b. Serializable a => (a -> Process b) -> Match b -- | Match against any message of the right type that satisfies a predicate matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b -- | Remove any message from the queue matchUnknown :: Process b -> Match b -- | Match against an arbitrary message. matchAny removes the first -- available message from the process mailbox. To handle arbitrary -- raw messages once removed from the mailbox, see -- handleMessage and unwrapMessage. matchAny :: forall b. (Message -> Process b) -> Match b -- | Match against an arbitrary message. Intended for use with -- handleMessage and unwrapMessage, this function -- only removes a message from the process mailbox, if the -- supplied condition matches. The success (or failure) of runtime type -- checks deferred to handleMessage and friends is irrelevant -- here, i.e., if the condition evaluates to True then the -- message will be removed from the process mailbox and decoded, but that -- does not guarantee that an expression passed to -- handleMessage will pass the runtime type checks and therefore -- be evaluated. matchAnyIf :: forall a b. (Serializable a) => (a -> Bool) -> (Message -> Process b) -> Match b -- | Match on a typed channel matchChan :: ReceivePort a -> (a -> Process b) -> Match b -- | Match on an arbitrary STM action. -- -- This rather unusaul match primitive allows us to compose -- arbitrary STM actions with checks against our process' mailbox and/or -- any typed channel ReceivePorts we may hold. -- -- This allows us to process multiple input streams along with our -- mailbox, in just the same way that matchChan supports -- checking both the mailbox and an arbitrary set of typed -- channels in one atomic transaction. -- -- Note there are no ordering guarnatees with respect to these disparate -- input sources. matchSTM :: STM a -> (a -> Process b) -> Match b -- | Messages consist of their typeRep fingerprint and their encoding data Message -- | Match against any message, regardless of the underlying (contained) -- type matchMessage :: (Message -> Process Message) -> Match Message -- | Match against any message (regardless of underlying type) that -- satisfies a predicate matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message -- | internal use only. isEncoded :: Message -> Bool -- | Wrap a Serializable value in a Message. 
Note that -- Messages are Serializable - like the datum they contain -- - but also note, deserialising such a Message will yield a -- Message, not the type within it! To obtain the wrapped datum, -- use unwrapMessage or handleMessage with a specific type. -- --
--   do
--      self <- getSelfPid
--      send self (wrapMessage "blah")
--      Nothing  <- expectTimeout 1000000 :: Process (Maybe String)
--      (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
--      (Just "blah") <- unwrapMessage m :: Process (Maybe String)
--   
wrapMessage :: Serializable a => a -> Message -- | Attempt to unwrap a raw Message. If the type of the decoded -- message payload matches the expected type, the value will be returned -- with Just, otherwise Nothing indicates the types do -- not match. -- -- This expression, for example, will evaluate to Nothing > -- unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int) -- -- Whereas this expression, will yield Just "foo" > -- unwrapMessage (wrapMessage "foo") :: Process (Maybe String) unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a) -- | Attempt to handle a raw Message. If the type of the message -- matches the type of the first argument to the supplied expression, -- then the message will be decoded and the expression evaluated against -- its value. If this runtime type checking fails however, -- Nothing will be returned to indicate the fact. If the check -- succeeds and evaluation proceeds, the resulting value with be wrapped -- with Just. -- -- Intended for use in catchesExit and matchAny primitives. handleMessage :: forall m a b. (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b) -- | Conditionally handle a raw Message. If the predicate (a -- -> Bool) evaluates to True, invokes the supplied -- handler, other returns Nothing to indicate failure. See -- handleMessage for further information about runtime type -- checking. handleMessageIf :: forall m a b. (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b) -- | As handleMessage but ignores result, which is useful if you -- don't care whether or not the handler succeeded. handleMessage_ :: forall m a. (Monad m, Serializable a) => Message -> (a -> m ()) -> m () -- | Conditional version of handleMessage_. handleMessageIf_ :: forall m a. (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m () -- | Forward a raw Message to the given ProcessId. forward :: Message -> ProcessId -> Process () -- | Forward a raw Message to the given ProcessId. -- -- Unlike forward, this function is insensitive to -- reconnect. It will try to send the message regardless of the -- history of connection failures between the nodes. uforward :: Message -> ProcessId -> Process () -- | Receives messages and forwards them to pid if p msg == -- True. delegate :: ProcessId -> (Message -> Bool) -> Process () -- | A straight relay that forwards all messages to the supplied pid. relay :: ProcessId -> Process () -- | Proxies pid and forwards messages whenever proc -- evaluates to True. Unlike delegate the predicate -- proc runs in the Process monad, allowing for richer -- proxy behaviour. If proc returns False or the -- runtime type check fails, no action is taken and the proxy -- process will continue running. proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process () -- | Spawn a process -- -- For more information about Closure, see -- Control.Distributed.Process.Closure. -- -- See also call. spawn :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Run a process remotely and wait for it to reply -- -- We monitor the remote process: if it dies before it can send a reply, -- we die too. -- -- For more information about Static, SerializableDict, and -- Closure, see Control.Distributed.Process.Closure. -- -- See also spawn. 
call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a -- | Terminate immediately (throws a ProcessTerminationException) terminate :: Process a -- | Die immediately - throws a ProcessExitException with the given -- reason. die :: Serializable a => a -> Process b -- | Forceful request to kill a process. Where exit provides an -- exception that can be caught and handled, kill throws an -- unexposed exception type which cannot be handled explicitly (by type). kill :: ProcessId -> String -> Process () -- | Graceful request to exit a process. Throws ProcessExitException -- with the supplied reason encoded as a message. Any exit -- signal raised in this manner can be handled using the -- catchExit family of functions. exit :: Serializable a => ProcessId -> a -> Process () -- | Catches ProcessExitException. The handler will not be applied -- unless its type matches the encoded data stored in the exception (see -- the reason argument given to the exit primitive). If the -- handler cannot be applied, the exception will be re-thrown. -- -- To handle ProcessExitException without regard for -- reason, see catch. To handle multiple reasons of -- differing types, see catchesExit. catchExit :: forall a b. (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b -- | Lift catches (almost). -- -- As ProcessExitException stores the exit reason as a -- typed, encoded message, a handler must accept inputs of the expected -- type. In order to handle a list of potentially different handlers (and -- therefore input types), a handler passed to catchesExit must -- accept Message and return Maybe (i.e., Just p -- if it handled the exit reason, otherwise Nothing). -- -- See maybeHandleMessage and Message for more details. catchesExit :: Process b -> [(ProcessId -> Message -> (Process (Maybe b)))] -> Process b -- | Thrown by terminate data ProcessTerminationException ProcessTerminationException :: ProcessTerminationException -- | Exception thrown when a process attempts to register a process under -- an already-registered name or to unregister a name that hasn't been -- registered. Returns the name and the identifier of the process that -- owns it, if any. 
data ProcessRegistrationException ProcessRegistrationException :: !String -> !(Maybe ProcessId) -> ProcessRegistrationException -- | SpawnRef are used to return pids of spawned processes data SpawnRef -- | Our own process ID getSelfPid :: Process ProcessId -- | Get the node ID of our local node getSelfNode :: Process NodeId -- | Provide information about a running process data ProcessInfo ProcessInfo :: NodeId -> [String] -> Int -> [(ProcessId, MonitorRef)] -> [ProcessId] -> ProcessInfo [infoNode] :: ProcessInfo -> NodeId [infoRegisteredNames] :: ProcessInfo -> [String] [infoMessageQueueLength] :: ProcessInfo -> Int [infoMonitors] :: ProcessInfo -> [(ProcessId, MonitorRef)] [infoLinks] :: ProcessInfo -> [ProcessId] -- | Get information about the specified process getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo) data NodeStats NodeStats :: NodeId -> Int -> Int -> Int -> Int -> NodeStats [nodeStatsNode] :: NodeStats -> NodeId [nodeStatsRegisteredNames] :: NodeStats -> Int [nodeStatsMonitors] :: NodeStats -> Int [nodeStatsLinks] :: NodeStats -> Int [nodeStatsProcesses] :: NodeStats -> Int -- | Get statistics about the specified node getNodeStats :: NodeId -> Process (Either DiedReason NodeStats) -- | Get statistics about our local node getLocalNodeStats :: Process NodeStats -- | Link to a remote process (asynchronous) -- -- When process A links to process B (that is, process A calls link -- pidB) then an asynchronous exception will be thrown to process A -- when process B terminates (normally or abnormally), or when process A -- gets disconnected from process B. Although it is technically -- possible to catch these exceptions, chances are if you find yourself -- trying to do so you should probably be using monitor rather -- than link. In particular, code such as -- --
--   link pidB   -- Link to process B
--   expect      -- Wait for a message from process B
--   unlink pidB -- Unlink again
--   
-- -- doesn't quite do what one might expect: if process B sends a message -- to process A, and subsequently terminates, then process A might -- or might not be terminated too, depending on whether the exception is -- thrown before or after the unlink (i.e., this code has a race -- condition). -- -- Linking is all-or-nothing: A is either linked to B, or it's not. A -- second call to link has no effect. -- -- Note that link provides unidirectional linking (see -- spawnSupervised). Linking makes no distinction between normal -- and abnormal termination of the remote process. link :: ProcessId -> Process () -- | Link to a node (asynchronous) linkNode :: NodeId -> Process () -- | Link to a channel (asynchronous) linkPort :: SendPort a -> Process () -- | Remove a link -- -- This is synchronous in the sense that once it returns you are -- guaranteed that no exception will be raised if the remote process -- dies. However, it is asynchronous in the sense that we do not wait for -- a response from the remote node. unlink :: ProcessId -> Process () -- | Remove a node link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkNode :: NodeId -> Process () -- | Remove a channel (send port) link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkPort :: SendPort a -> Process () -- | Monitor another process (asynchronous) -- -- When process A monitors process B (that is, process A calls -- monitor pidB) then process A will receive a -- ProcessMonitorNotification when process B terminates (normally -- or abnormally), or when process A gets disconnected from process B. -- You receive this message like any other (using expect); the -- notification includes a reason (DiedNormal, -- DiedException, DiedDisconnect, etc.). -- -- Every call to monitor returns a new monitor reference -- MonitorRef; if multiple monitors are set up, multiple -- notifications will be delivered and monitors can be disabled -- individually using unmonitor. monitor :: ProcessId -> Process MonitorRef -- | Monitor a node (asynchronous) monitorNode :: NodeId -> Process MonitorRef -- | Monitor a typed channel (asynchronous) monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef -- | Remove a monitor -- -- This has the same synchronous/asynchronous nature as unlink. -- -- ProcessMonitorNotification messages for the given MonitorRef are -- removed from the mailbox. unmonitor :: MonitorRef -> Process () -- | Establishes temporary monitoring of another process. -- -- withMonitor pid code sets up monitoring of pid for -- the duration of code. Note: although monitoring is no longer -- active when withMonitor returns, there might still be -- unreceived monitor messages in the queue. withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a -- | Establishes temporary monitoring of another process. -- -- withMonitor_ pid code sets up monitoring of pid for -- the duration of code. Note: although monitoring is no longer -- active when withMonitor_ returns, there might still be -- unreceived monitor messages in the queue. 
-- -- Since 0.6.1 withMonitor_ :: ProcessId -> Process a -> Process a -- | MonitorRef is opaque for regular Cloud Haskell processes data MonitorRef -- | Exception thrown when a linked process dies data ProcessLinkException ProcessLinkException :: !ProcessId -> !DiedReason -> ProcessLinkException -- | Exception thrown when a linked node dies data NodeLinkException NodeLinkException :: !NodeId -> !DiedReason -> NodeLinkException -- | Exception thrown when a linked channel (port) dies data PortLinkException PortLinkException :: !SendPortId -> !DiedReason -> PortLinkException -- | Message sent by process monitors data ProcessMonitorNotification ProcessMonitorNotification :: !MonitorRef -> !ProcessId -> !DiedReason -> ProcessMonitorNotification -- | Message sent by node monitors data NodeMonitorNotification NodeMonitorNotification :: !MonitorRef -> !NodeId -> !DiedReason -> NodeMonitorNotification -- | Message sent by channel (port) monitors data PortMonitorNotification PortMonitorNotification :: !MonitorRef -> !SendPortId -> !DiedReason -> PortMonitorNotification -- | Why did a process die? data DiedReason -- | Normal termination DiedNormal :: DiedReason -- | The process exited with an exception (provided as String -- because Exception does not implement Binary) DiedException :: !String -> DiedReason -- | We got disconnected from the process node DiedDisconnect :: DiedReason -- | The process node died DiedNodeDown :: DiedReason -- | Invalid (process/node/channel) identifier DiedUnknownId :: DiedReason -- | A closure is a static value and an encoded environment data Closure a :: * -> * closure :: Static (ByteString -> a) -> ByteString -> Closure a -- | A static value. Static is opaque; see staticLabel and -- staticApply. data Static a :: * -> * -- | Resolve a static value unStatic :: Typeable a => Static a -> Process a -- | Resolve a closure unClosure :: Typeable a => Closure a -> Process a -- | Runtime dictionary for unstatic lookups data RemoteTable :: * -- | Log a string -- -- say message sends a message of type SayMessage with -- the current time and ProcessId of the current process to the -- process registered as logger. By default, this process simply -- sends the string to stderr. Individual Cloud Haskell backends -- might replace this with a different logger process, however. say :: String -> Process () -- | Register a process with the local registry (synchronous). The name -- must not already be registered. The process need not be on this node. -- A bad registration will result in a -- ProcessRegistrationException -- -- The process to be registered does not have to be local itself. register :: String -> ProcessId -> Process () -- | Like register, but will replace an existing registration. The -- name must already be registered. reregister :: String -> ProcessId -> Process () -- | Remove a process from the local registry (asynchronous). This version -- will wait until a response is received from the management process. The -- name must already be registered. unregister :: String -> Process () -- | Query the local process registry whereis :: String -> Process (Maybe ProcessId) -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Register a process with a remote registry (asynchronous). -- -- The process to be registered does not have to live on the same remote -- node. 
Reply will come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync registerRemoteAsync :: NodeId -> String -> ProcessId -> Process () reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process () -- | Remove a process from a remote registry (asynchronous). -- -- Reply will come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync unregisterRemoteAsync :: NodeId -> String -> Process () -- | Query a remote process registry (asynchronous) -- -- Reply will come in the form of a WhereIsReply message. -- -- There is currently no synchronous version of -- whereisRemoteAsync: if you implement one yourself, be sure to -- take into account that the remote node might die or get disconnected -- before it can respond (i.e. you should use monitorNode and take -- appropriate action when you receive a NodeMonitorNotification). whereisRemoteAsync :: NodeId -> String -> Process () -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | (Asynchronous) reply from whereis data WhereIsReply WhereIsReply :: String -> (Maybe ProcessId) -> WhereIsReply -- | (Asynchronous) reply from register and unregister data RegisterReply RegisterReply :: String -> Bool -> (Maybe ProcessId) -> RegisterReply -- | Lift catch -- | Deprecated: Use Control.Monad.Catch.catch instead catch :: Exception e => Process a -> (e -> Process a) -> Process a -- | You need this when using catches data Handler a Handler :: (e -> Process a) -> Handler a -- | Lift catches catches :: Process a -> [Handler a] -> Process a -- | Lift try -- | Deprecated: Use Control.Monad.Catch.try instead try :: Exception e => Process a -> Process (Either e a) -- | Lift mask -- | Deprecated: Use Control.Monad.Catch.mask instead mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b -- | Lift mask_ -- | Deprecated: Use Control.Monad.Catch.mask_ instead mask_ :: Process a -> Process a -- | Lift onException -- | Deprecated: Use Control.Monad.Catch.onException instead onException :: Process a -> Process b -> Process a -- | Lift bracket -- | Deprecated: Use Control.Monad.Catch.bracket instead bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c -- | Lift bracket_ -- | Deprecated: Use Control.Monad.Catch.bracket_ instead bracket_ :: Process a -> Process b -> Process c -> Process c -- | Lift finally -- | Deprecated: Use Control.Monad.Catch.finally instead finally :: Process a -> Process b -> Process a -- | Asynchronous version of spawn -- -- (spawn is defined in terms of spawnAsync and -- expect) spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef -- | Spawn a child process, have the child link to the parent and the -- parent monitor the child spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a process and link to it -- -- Note that this is just the sequential composition of spawn and -- link. (The Unified semantics that underlies Cloud -- Haskell does not even support a synchronous link operation) spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Like spawnLink, but monitor the spawned process spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a new process, supplying it with a new ReceivePort and -- return the corresponding SendPort. spawnChannel :: forall a. 
Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a) -- | (Asynchronous) reply from spawn data DidSpawn DidSpawn :: SpawnRef -> ProcessId -> DidSpawn -- | Spawn a process on the local node spawnLocal :: Process () -> Process ProcessId -- | Create a new typed channel, spawn a process on the local node, passing -- it the receive port, and return the send port spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a) -- | Local version of call. Running a process in this way isolates -- it from messages sent to the caller process, and also allows silently -- dropping late or duplicate messages sent to the isolated process after -- it exits. Silently dropping messages may not always be the best -- approach. callLocal :: Process a -> Process a -- | Cloud Haskell provides the illusion of connection-less, reliable, -- ordered message passing. However, when network connections get -- disrupted this illusion cannot always be maintained. Once a network -- connection breaks (even temporarily) no further communication on that -- connection will be possible. For example, if process A sends a message -- to process B, and A is then notified (by monitor notification) that it -- got disconnected from B, A will not be able to send any further -- messages to B, unless A explicitly indicates that it is -- acceptable to attempt to reconnect to B using the Cloud Haskell -- reconnect primitive. -- -- Importantly, when A calls reconnect it acknowledges that some -- messages to B might have been lost. For instance, if A sends messages -- m1 and m2 to B, then receives a monitor notification that its -- connection to B has been lost, calls reconnect and then sends -- m3, it is possible that B will receive m1 and m3 but not m2. -- -- Note that reconnect does not mean reconnect now but -- rather it is okay to attempt to reconnect on the next send. In -- particular, if no further communication attempts are made to B then A -- can use reconnect to clean up its connection to B. reconnect :: ProcessId -> Process () -- | Reconnect to a sendport. See reconnect for more information. reconnectPort :: SendPort a -> Process () module Control.Distributed.Process.Internal.Closure.Explicit -- | A RemoteRegister is a transformer on a RemoteTable to register -- additional static values. type RemoteRegister = RemoteTable -> RemoteTable class MkTDict a mkTDict :: MkTDict a => String -> a -> RemoteRegister -- | This takes an explicit name and a value, and produces both a static -- reference to the name and a RemoteRegister for it. mkStaticVal :: Serializable a => String -> a -> (Static a, RemoteRegister) -- | This takes an explicit name, a function of arity one, and creates a -- function yielding a closure and a remote register for it. mkClosureValSingle :: forall a b. (Serializable a, Typeable b, MkTDict b) => String -> (a -> b) -> (a -> Closure b, RemoteRegister) -- | This takes an explicit name, a function of any arity, and creates a -- function yielding a closure and a remote register for it. mkClosureVal :: forall func argTuple result closureFunction. (Curry (argTuple -> Closure result) closureFunction, MkTDict result, Uncurry HTrue argTuple func result, Typeable result, Serializable argTuple, IsFunction func HTrue) => String -> func -> (closureFunction, RemoteRegister) -- | Works just like standard call, but with a simpler signature. call' :: forall a. 
Serializable a => NodeId -> Closure (Process a) -> Process a instance Control.Distributed.Process.Serializable.Serializable b => Control.Distributed.Process.Internal.Closure.Explicit.MkTDict (Control.Distributed.Process.Internal.Types.Process b) instance Control.Distributed.Process.Internal.Closure.Explicit.MkTDict a instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Closure.Explicit.EndOfTuple instance Control.Distributed.Process.Internal.Closure.Explicit.Curry ((a, Control.Distributed.Process.Internal.Closure.Explicit.EndOfTuple) -> b) (a -> b) instance Control.Distributed.Process.Internal.Closure.Explicit.Curry (b -> c) r => Control.Distributed.Process.Internal.Closure.Explicit.Curry ((a, b) -> c) (a -> r) instance Control.Distributed.Process.Internal.Closure.Explicit.Uncurry'' rest f r => Control.Distributed.Process.Internal.Closure.Explicit.Uncurry Control.Distributed.Process.Internal.Closure.Explicit.HTrue (a, rest) (a -> f) r instance Control.Distributed.Process.Internal.Closure.Explicit.Uncurry Control.Distributed.Process.Internal.Closure.Explicit.HFalse Control.Distributed.Process.Internal.Closure.Explicit.EndOfTuple a a instance (Control.Distributed.Process.Internal.Closure.Explicit.IsFunction func b, Control.Distributed.Process.Internal.Closure.Explicit.Uncurry b args func result) => Control.Distributed.Process.Internal.Closure.Explicit.Uncurry'' args func result instance b ~ Control.Distributed.Process.Internal.Closure.Explicit.HTrue => Control.Distributed.Process.Internal.Closure.Explicit.IsFunction (a -> c) b instance b ~ Control.Distributed.Process.Internal.Closure.Explicit.HFalse => Control.Distributed.Process.Internal.Closure.Explicit.IsFunction a b -- | Towards Haskell in the Cloud (Epstein et al., Haskell Symposium -- 2011) proposes a new type construct called static that -- characterizes values that are known statically. Cloud Haskell uses the -- Static implementation from Control.Distributed.Static. -- That module comes with its own extensive documentation, which you -- should read if you want to know the details. Here we explain the -- Template Haskell support only. -- -- -- -- Given a top-level (possibly polymorphic, but unqualified) definition -- --
--   f :: forall a1 .. an. T
--   f = ...
--   
-- -- you can use a Template Haskell splice to create a static version of -- f: -- --
--   $(mkStatic 'f) :: forall a1 .. an. (Typeable a1, .., Typeable an) => Static T
--   
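--
-- For instance (a sketch with hypothetical names, not part of the
-- package), a monomorphic definition and its static counterpart might
-- look like this:
--
--   greet :: String -> Process ()
--   greet who = say ("hello, " ++ who)
--   
--   greetStatic :: Static (String -> Process ())
--   greetStatic = $(mkStatic 'greet)  -- 'greet must also be passed to remotable, see below
--   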
-- -- Every module that you write that contains calls to mkStatic -- needs to have a call to remotable: -- --
--   remotable [ 'f, 'g, ... ]
--   
-- -- where you must pass every function (or other value) that you pass as -- an argument to mkStatic. The call to remotable will -- create a definition -- --
--   __remoteTable :: RemoteTable -> RemoteTable
--   
-- -- which can be used to construct the RemoteTable used to -- initialize Cloud Haskell. You should have (at most) one call to -- remotable per module, and compose all created functions when -- initializing Cloud Haskell: -- --
--   let rtable :: RemoteTable
--       rtable = M1.__remoteTable
--              . M2.__remoteTable
--              . ...
--              . Mn.__remoteTable
--              $ initRemoteTable
--   
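--
-- As a sketch (assuming the distributed-process-simplelocalnet backend
-- and the imports from the full example near the end of this section;
-- M1, M2, host and port are placeholders), the composed table is then
-- handed to the backend when the node is started:
--
--   main :: IO ()
--   main = do
--     let rtable = M1.__remoteTable . M2.__remoteTable $ initRemoteTable
--     backend <- initializeBackend "127.0.0.1" "8080" rtable
--     startMaster backend (\_slaves -> return ())
--   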
-- -- NOTE: If you get a type error from ghc along these lines -- --
--   The exact Name `a_a30k' is not in scope
--        Probable cause: you used a unique name (NameU) in Template Haskell but did not bind it
--   
-- -- then you need to enable the ScopedTypeVariables language -- extension. -- -- -- -- Some Cloud Haskell primitives require static serialization -- dictionaries (**): -- --
--   call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a
--   
-- -- Given some serializable type T you can define -- --
--   sdictT :: SerializableDict T
--   sdictT = SerializableDict
--   
-- -- and then have -- --
--   $(mkStatic 'sdictT) :: Static (SerializableDict T)
--   
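--
-- A brief sketch of how such a dictionary might then be used with
-- call (the names sdictBool and checkRemotely are hypothetical;
-- sdictBool, like anything passed to mkStatic, must also be listed in
-- remotable):
--
--   sdictBool :: SerializableDict Bool
--   sdictBool = SerializableDict
--   
--   remotable ['sdictBool]
--   
--   checkRemotely :: NodeId -> Closure (Process Bool) -> Process Bool
--   checkRemotely nid clo = call $(mkStatic 'sdictBool) nid clo
--   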
-- -- However, since these dictionaries are so frequently required Cloud -- Haskell provides special support for them. When you call -- remotable on a monomorphic function f :: T1 -> -- T2 -- --
--   remotable ['f]
--   
-- -- then a serialization dictionary is automatically created for you, -- which you can access with -- --
--   $(functionSDict 'f) :: Static (SerializableDict T1)
--   
-- -- In addition, if f :: T1 -> Process T2, then a second -- dictionary is created -- --
--   $(functionTDict 'f) :: Static (SerializableDict T2)
--   
-- -- -- -- Suppose you have a process -- --
--   isPrime :: Integer -> Process Bool
--   
-- -- Then -- --
--   $(mkClosure 'isPrime) :: Integer -> Closure (Process Bool)
--   
-- -- which you can then call, for example, to have a remote node -- check if a number is prime. -- -- In general, if you have a monomorphic function -- --
--   f :: T1 -> T2
--   
-- -- then -- --
--   $(mkClosure 'f) :: T1 -> Closure T2
--   
-- -- provided that T1 is serializable (*) (remember to pass -- f to remotable). -- -- (You can also create closures manually--see the documentation of -- Control.Distributed.Static for examples.) -- -- -- -- Here is a small self-contained example that uses closures and -- serialization dictionaries. It makes use of the -- Control.Distributed.Process.SimpleLocalnet Cloud Haskell backend. -- --
--   {-# LANGUAGE TemplateHaskell #-}
--   import System.Environment (getArgs)
--   import Control.Distributed.Process
--   import Control.Distributed.Process.Closure
--   import Control.Distributed.Process.Backend.SimpleLocalnet
--   import Control.Distributed.Process.Node (initRemoteTable)
--   
--   isPrime :: Integer -> Process Bool
--   isPrime n = return . (n `elem`) . takeWhile (<= n) . sieve $ [2..]
--     where
--       sieve :: [Integer] -> [Integer]
--       sieve (p : xs) = p : sieve [x | x <- xs, x `mod` p > 0]
--   
--   remotable ['isPrime]
--   
--   master :: [NodeId] -> Process ()
--   master [] = liftIO $ putStrLn "no slaves"
--   master (slave:_) = do
--     isPrime79 <- call $(functionTDict 'isPrime) slave ($(mkClosure 'isPrime) (79 :: Integer))
--     liftIO $ print isPrime79
--   
--   main :: IO ()
--   main = do
--     args <- getArgs
--     case args of
--       ["master", host, port] -> do
--         backend <- initializeBackend host port rtable
--         startMaster backend master
--       ["slave", host, port] -> do
--         backend <- initializeBackend host port rtable
--         startSlave backend
--     where
--       rtable :: RemoteTable
--       rtable = __remoteTable initRemoteTable
--   
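--
-- As a variation on the example above (a sketch only; checkAndReply and
-- masterSpawn are hypothetical names), the same check could be performed
-- with spawn instead of call, sending the answer back explicitly. The
-- worker would also need to be registered, e.g.
-- remotable ['isPrime, 'checkAndReply]:
--
--   checkAndReply :: (ProcessId, Integer) -> Process ()
--   checkAndReply (caller, n) = isPrime n >>= send caller
--   
--   masterSpawn :: [NodeId] -> Process ()
--   masterSpawn []        = liftIO $ putStrLn "no slaves"
--   masterSpawn (slave:_) = do
--     us <- getSelfPid
--     _  <- spawn slave ($(mkClosure 'checkAndReply) (us, 79 :: Integer))
--     answer <- expect :: Process Bool
--     liftIO $ print answer
--   
-- Unlike call, this version does not monitor the spawned process, so if
-- the worker died before replying, masterSpawn would block in expect.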
-- -- -- -- (*) If T1 is not serializable you will get a type error in -- the generated code. Unfortunately, the Template Haskell infrastructure -- cannot check a priori if T1 is serializable or not due to a -- bug in the Template Haskell libraries -- (http://hackage.haskell.org/trac/ghc/ticket/7066) -- -- (**) Even though call is passed an explicit serialization -- dictionary, we still need the Serializable constraint because -- Static is not the true static. If it was, we could -- unstatic the dictionary and pattern match on it to bring the -- Typeable instance into scope, but unless proper -- static support is added to ghc we need both the type class -- argument and the explicit dictionary. module Control.Distributed.Process.Closure -- | Reification of Serializable (see -- Control.Distributed.Process.Closure) data SerializableDict a [SerializableDict] :: Serializable a => SerializableDict a -- | Static decoder, given a static serialization dictionary. -- -- See module documentation of Control.Distributed.Process.Closure -- for an example. staticDecode :: Typeable a => Static (SerializableDict a) -> Static (ByteString -> a) -- | Serialization dictionary for '()' sdictUnit :: Static (SerializableDict ()) -- | Serialization dictionary for ProcessId sdictProcessId :: Static (SerializableDict ProcessId) -- | Serialization dictionary for SendPort sdictSendPort :: Typeable a => Static (SerializableDict a) -> Static (SerializableDict (SendPort a)) -- | Serialization dictionary for Static. sdictStatic :: Typeable a => Static (TypeableDict a) -> Static (SerializableDict (Static a)) -- | Serialization dictionary for Closure. sdictClosure :: Typeable a => Static (TypeableDict a) -> Static (SerializableDict (Closure a)) -- | CP a b is a process with input of type a and output -- of type b type CP a b = Closure (a -> Process b) -- | CP version of id idCP :: Typeable a => CP a a -- | CP version of (***) splitCP :: (Typeable a, Typeable b, Typeable c, Typeable d) => CP a c -> CP b d -> CP (a, b) (c, d) -- | CP version of return returnCP :: forall a. Serializable a => Static (SerializableDict a) -> a -> Closure (Process a) -- | (Not quite the) CP version of (>>=) bindCP :: forall a b. (Typeable a, Typeable b) => Closure (Process a) -> CP a b -> Closure (Process b) -- | CP version of (>>) seqCP :: (Typeable a, Typeable b) => Closure (Process a) -> Closure (Process b) -> Closure (Process b) -- | CP version of link cpLink :: ProcessId -> Closure (Process ()) -- | CP version of unlink cpUnlink :: ProcessId -> Closure (Process ()) -- | CP version of relay cpRelay :: ProcessId -> Closure (Process ()) -- | CP version of send cpSend :: forall a. Typeable a => Static (SerializableDict a) -> ProcessId -> CP a () -- | CP version of expect cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a) -- | CP version of newChan cpNewChan :: Typeable a => Static (SerializableDict a) -> Closure (Process (SendPort a, ReceivePort a)) -- | A RemoteRegister is a transformer on a RemoteTable to register -- additional static values. type RemoteRegister = RemoteTable -> RemoteTable class MkTDict a mkTDict :: MkTDict a => String -> a -> RemoteRegister -- | This takes an explicit name and a value, and produces both a static -- reference to the name and a RemoteRegister for it. mkStaticVal :: Serializable a => String -> a -> (Static a, RemoteRegister) -- | This takes an explicit name, a function of arity one, and creates a -- function yielding a closure and a remote register for it. 
mkClosureValSingle :: forall a b. (Serializable a, Typeable b, MkTDict b) => String -> (a -> b) -> (a -> Closure b, RemoteRegister) -- | This takes an explicit name, a function of any arity, and creates a -- function yielding a closure and a remote register for it. mkClosureVal :: forall func argTuple result closureFunction. (Curry (argTuple -> Closure result) closureFunction, MkTDict result, Uncurry HTrue argTuple func result, Typeable result, Serializable argTuple, IsFunction func HTrue) => String -> func -> (closureFunction, RemoteRegister) -- | Works just like standard call, but with a simpler signature. call' :: forall a. Serializable a => NodeId -> Closure (Process a) -> Process a -- | Create the closure, decoder, and metadata definitions for the given -- list of functions remotable :: [Name] -> Q [Dec] -- | Like remotable, but parameterized by the declaration of a -- function instead of the function name. So where for remotable -- you'd do -- --
--   f :: T1 -> T2
--   f = ...
--   
--   remotable ['f]
--   
-- -- with remotableDecl you would instead do -- --
--   remotableDecl [
--      [d| f :: T1 -> T2 ;
--          f = ...
--        |]
--    ]
--   
-- -- remotableDecl creates the function specified as well as the -- various dictionaries and static versions that remotable also -- creates. remotableDecl is sometimes necessary when you want to -- refer to, say, $(mkClosure 'f) within the definition of -- f itself. -- -- NOTE: remotableDecl creates __remoteTableDecl instead -- of __remoteTable so that you can use both remotable -- and remotableDecl within the same module. remotableDecl :: [Q [Dec]] -> Q [Dec] -- | Construct a static value. -- -- If f : forall a1 .. an. T then $(mkStatic 'f) :: forall -- a1 .. an. Static T. Be sure to pass f to -- remotable. mkStatic :: Name -> Q Exp -- | If f : T1 -> T2 then $(mkClosure 'f) :: T1 -> -- Closure T2. -- -- TODO: The current version of mkClosure is too polymorphic (@forall a. -- Binary a => a -> Closure T2). mkClosure :: Name -> Q Exp -- | Make a Closure from a static function. This is useful for -- making a closure for a top-level Process () function, because -- using mkClosure would require adding a dummy () -- argument. mkStaticClosure :: Name -> Q Exp -- | If f : T1 -> T2 is a monomorphic function then -- $(functionSDict 'f) :: Static (SerializableDict T1). -- -- Be sure to pass f to remotable. functionSDict :: Name -> Q Exp -- | If f : T1 -> Process T2 is a monomorphic function then -- $(functionTDict 'f) :: Static (SerializableDict T2). -- -- Be sure to pass f to remotable. functionTDict :: Name -> Q Exp