-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Cloud Haskell: Erlang-style concurrency in Haskell -- -- This is an implementation of Cloud Haskell, as described in Towards -- Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon -- Peyton Jones -- (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), -- although some of the details are different. The precise message -- passing semantics are based on A unified semantics for future -- Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle. -- You will probably also want to install a Cloud Haskell backend such as -- distributed-process-simplelocalnet. @package distributed-process @version 0.7.5 -- | This is an implementation of bidirectional multimaps. module Control.Distributed.Process.Internal.BiMultiMap -- | A bidirectional multimaps BiMultiMap a b v is a set of -- triplets of type (a, b, v). -- -- It is possible to lookup values by using either a or -- b as keys. data BiMultiMap a b v -- | The empty bidirectional multimap. empty :: BiMultiMap a b v -- | A bidirectional multimap containing a single triplet. singleton :: (Ord a, Ord b, Ord v) => a -> b -> v -> BiMultiMap a b v -- | Yields the amount of triplets in the multimap. size :: BiMultiMap a b v -> Int -- | Inserts a triplet in the multimap. insert :: (Ord a, Ord b, Ord v) => a -> b -> v -> BiMultiMap a b v -> BiMultiMap a b v -- | Looks up all the triplets whose first component is the given value. lookupBy1st :: Ord a => a -> BiMultiMap a b v -> Set (b, v) -- | Looks up all the triplets whose second component is the given value. lookupBy2nd :: Ord b => b -> BiMultiMap a b v -> Set (a, v) -- | Deletes a triplet. It yields the original multimap if the triplet is -- not present. delete :: (Ord a, Ord b, Ord v) => a -> b -> v -> BiMultiMap a b v -> BiMultiMap a b v -- | Deletes all triplets whose first component is the given value. deleteAllBy1st :: (Ord a, Ord b, Ord v) => a -> BiMultiMap a b v -> BiMultiMap a b v -- | Like deleteAllBy1st but deletes by the second component of the -- triplets. deleteAllBy2nd :: (Ord a, Ord b, Ord v) => b -> BiMultiMap a b v -> BiMultiMap a b v -- | Yields the triplets satisfying the given predicate, and a multimap -- with all this triplets removed. partitionWithKeyBy1st :: (Ord a, Ord b, Ord v) => (a -> Set (b, v) -> Bool) -> BiMultiMap a b v -> (Map a (Set (b, v)), BiMultiMap a b v) -- | Like partitionWithKeyBy1st but the predicates takes the second -- component of the triplets as first argument. partitionWithKeyBy2nd :: (Ord a, Ord b, Ord v) => (b -> Set (a, v) -> Bool) -> BiMultiMap a b v -> (Map b (Set (a, v)), BiMultiMap a b v) -- | Exchange the first and the second components of all triplets. flip :: BiMultiMap a b v -> BiMultiMap b a v module Control.Distributed.Process.Internal.StrictContainerAccessors mapMaybe :: Ord key => key -> Accessor (Map key elem) (Maybe elem) mapDefault :: Ord key => elem -> key -> Accessor (Map key elem) elem -- | Spine and element strict list module Control.Distributed.Process.Internal.StrictList -- | Strict list data StrictList a Cons :: !a -> !StrictList a -> StrictList a Nil :: StrictList a Snoc :: !StrictList a -> !a -> StrictList a Append :: !StrictList a -> !StrictList a -> StrictList a append :: StrictList a -> StrictList a -> StrictList a foldr :: (a -> b -> b) -> b -> StrictList a -> b -- | Like Control.Concurrent.MVar.Strict but reduce to HNF, not NF module Control.Distributed.Process.Internal.StrictMVar newtype StrictMVar a StrictMVar :: MVar a -> StrictMVar a newEmptyMVar :: IO (StrictMVar a) newMVar :: a -> IO (StrictMVar a) takeMVar :: StrictMVar a -> IO a putMVar :: StrictMVar a -> a -> IO () readMVar :: StrictMVar a -> IO a withMVar :: StrictMVar a -> (a -> IO b) -> IO b modifyMVar_ :: StrictMVar a -> (a -> IO a) -> IO () modifyMVar :: StrictMVar a -> (a -> IO (a, b)) -> IO b modifyMVarMasked :: StrictMVar a -> (a -> IO (a, b)) -> IO b mkWeakMVar :: StrictMVar a -> IO () -> IO (Weak (StrictMVar a)) -- | Concurrent queue for single reader, single writer module Control.Distributed.Process.Internal.CQueue data CQueue a data BlockSpec NonBlocking :: BlockSpec Blocking :: BlockSpec Timeout :: Int -> BlockSpec data MatchOn m a MatchMsg :: (m -> Maybe a) -> MatchOn m a MatchChan :: STM a -> MatchOn m a newCQueue :: IO (CQueue a) -- | Enqueue an element -- -- Enqueue is strict. enqueue :: CQueue a -> a -> IO () -- | Variant of enqueue for use in the STM monad. enqueueSTM :: CQueue a -> a -> STM () -- | Dequeue an element -- -- The timeout (if any) is applied only to waiting for incoming messages, -- not to checking messages that have already arrived dequeue :: CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a) -- | Weak reference to a CQueue mkWeakCQueue :: CQueue a -> IO () -> IO (Weak (CQueue a)) queueSize :: CQueue a -> IO Int instance GHC.Base.Functor (Control.Distributed.Process.Internal.CQueue.MatchOn m) -- | Clone of Control.Concurrent.STM.TQueue with support for mkWeakTQueue -- -- Not all functionality from the original module is available: -- unGetTQueue, peekTQueue and tryPeekTQueue are missing. In order to -- implement these we'd need to be able to touch# the write end of the -- queue inside unGetTQueue, but that means we need a version of touch# -- that works within the STM monad. module Control.Distributed.Process.Internal.WeakTQueue -- | TQueue is an abstract type representing an unbounded FIFO -- channel. data TQueue a -- | Build and returns a new instance of TQueue newTQueue :: STM (TQueue a) -- | IO version of newTQueue. This is useful for creating -- top-level TQueues using unsafePerformIO, because using -- atomically inside unsafePerformIO isn't possible. newTQueueIO :: IO (TQueue a) -- | Read the next value from the TQueue. readTQueue :: TQueue a -> STM a -- | A version of readTQueue which does not retry. Instead it -- returns Nothing if no value is available. tryReadTQueue :: TQueue a -> STM (Maybe a) -- | Write a value to a TQueue. writeTQueue :: TQueue a -> a -> STM () -- | Returns True if the supplied TQueue is empty. isEmptyTQueue :: TQueue a -> STM Bool mkWeakTQueue :: TQueue a -> IO () -> IO (Weak (TQueue a)) instance GHC.Classes.Eq (Control.Distributed.Process.Internal.WeakTQueue.TQueue a) module Control.Distributed.Process.Serializable -- | Objects that can be sent across the network type Serializable a = (Binary a, Typeable a) -- | Encode type representation as a bytestring encodeFingerprint :: Fingerprint -> ByteString -- | Decode a bytestring into a fingerprint. Throws an IO exception on -- failure decodeFingerprint :: ByteString -> Fingerprint -- | The fingerprint of the typeRep of the argument fingerprint :: Typeable a => a -> Fingerprint -- | Size of a fingerprint sizeOfFingerprint :: Int data Fingerprint -- | Show fingerprint (for debugging purposes) showFingerprint :: Fingerprint -> ShowS -- | Reification of Serializable (see -- Control.Distributed.Process.Closure) data SerializableDict a [SerializableDict] :: forall a. Serializable a => SerializableDict a -- | Reification of Typeable. data TypeableDict a [TypeableDict] :: forall a. Typeable a => TypeableDict a -- | Types used throughout the Cloud Haskell framework -- -- We collect all types used internally in a single module because many -- of these data types are mutually recursive and cannot be split across -- modules. module Control.Distributed.Process.Internal.Types -- | Node identifier newtype NodeId NodeId :: EndPointAddress -> NodeId [nodeAddress] :: NodeId -> EndPointAddress -- | A local process ID consists of a seed which distinguishes processes -- from different instances of the same local node and a counter data LocalProcessId LocalProcessId :: {-# UNPACK #-} !Int32 -> {-# UNPACK #-} !Int32 -> LocalProcessId [lpidUnique] :: LocalProcessId -> {-# UNPACK #-} !Int32 [lpidCounter] :: LocalProcessId -> {-# UNPACK #-} !Int32 -- | Process identifier data ProcessId ProcessId :: !NodeId -> {-# UNPACK #-} !LocalProcessId -> ProcessId -- | The ID of the node the process is running on [processNodeId] :: ProcessId -> !NodeId -- | Node-local identifier for the process [processLocalId] :: ProcessId -> {-# UNPACK #-} !LocalProcessId -- | Union of all kinds of identifiers data Identifier NodeIdentifier :: !NodeId -> Identifier ProcessIdentifier :: !ProcessId -> Identifier SendPortIdentifier :: !SendPortId -> Identifier nodeOf :: Identifier -> NodeId firstNonReservedProcessId :: Int32 nullProcessId :: NodeId -> ProcessId -- | Local nodes data LocalNode LocalNode :: !NodeId -> !EndPoint -> !StrictMVar LocalNodeState -> !Chan NCMsg -> !MxEventBus -> !RemoteTable -> LocalNode -- | NodeId of the node [localNodeId] :: LocalNode -> !NodeId -- | The network endpoint associated with this node [localEndPoint] :: LocalNode -> !EndPoint -- | Local node state [localState] :: LocalNode -> !StrictMVar LocalNodeState -- | Channel for the node controller [localCtrlChan] :: LocalNode -> !Chan NCMsg -- | Internal management event bus [localEventBus] :: LocalNode -> !MxEventBus -- | Runtime lookup table for supporting closures TODO: this should be part -- of the CH state, not the local endpoint state [remoteTable] :: LocalNode -> !RemoteTable -- | Local node state data LocalNodeState LocalNodeValid :: {-# UNPACK #-} !ValidLocalNodeState -> LocalNodeState LocalNodeClosed :: LocalNodeState data ValidLocalNodeState ValidLocalNodeState :: !Map LocalProcessId LocalProcess -> !Int32 -> !Int32 -> !Map (Identifier, Identifier) (Connection, ImplicitReconnect) -> ValidLocalNodeState -- | Processes running on this node [_localProcesses] :: ValidLocalNodeState -> !Map LocalProcessId LocalProcess -- | Counter to assign PIDs [_localPidCounter] :: ValidLocalNodeState -> !Int32 -- | The unique value used to create PIDs (so that processes on -- restarted nodes have new PIDs) [_localPidUnique] :: ValidLocalNodeState -> !Int32 -- | Outgoing connections [_localConnections] :: ValidLocalNodeState -> !Map (Identifier, Identifier) (Connection, ImplicitReconnect) -- | Thrown by some primitives when they notice the node has been closed. data NodeClosedException NodeClosedException :: NodeId -> NodeClosedException -- | Wrapper around withMVar that checks that the local node is -- still in a valid state. withValidLocalState :: LocalNode -> (ValidLocalNodeState -> IO r) -> IO r -- | Wrapper around modifyMVar that checks that the local node is -- still in a valid state. modifyValidLocalState :: LocalNode -> (ValidLocalNodeState -> IO (ValidLocalNodeState, a)) -> IO (Maybe a) -- | Wrapper around modifyMVar_ that checks that the local node is -- still in a valid state. modifyValidLocalState_ :: LocalNode -> (ValidLocalNodeState -> IO ValidLocalNodeState) -> IO () -- | Provides access to the trace controller data Tracer Tracer :: !ProcessId -> !Weak (CQueue Message) -> Tracer -- | Process id for the currently active trace handler [tracerPid] :: Tracer -> !ProcessId -- | Weak reference to the tracer controller's mailbox [weakQ] :: Tracer -> !Weak (CQueue Message) -- | Local system management event bus state data MxEventBus MxEventBusInitialising :: MxEventBus MxEventBus :: !ProcessId -> !Tracer -> !Weak (CQueue Message) -> !((TChan Message, TChan Message) -> Process ()) -> IO ProcessId -> MxEventBus -- | Process id of the management agent controller process [agent] :: MxEventBus -> !ProcessId -- | Configuration for the local trace controller [tracer] :: MxEventBus -> !Tracer -- | Weak reference to the management agent controller's mailbox [evbuss] :: MxEventBus -> !Weak (CQueue Message) -- | API for adding management agents to a running node [mxNew] :: MxEventBus -> !((TChan Message, TChan Message) -> Process ()) -> IO ProcessId -- | Processes running on our local node data LocalProcess LocalProcess :: !CQueue Message -> !Weak (CQueue Message) -> !ProcessId -> !StrictMVar LocalProcessState -> !ThreadId -> !LocalNode -> LocalProcess [processQueue] :: LocalProcess -> !CQueue Message [processWeakQ] :: LocalProcess -> !Weak (CQueue Message) [processId] :: LocalProcess -> !ProcessId [processState] :: LocalProcess -> !StrictMVar LocalProcessState [processThread] :: LocalProcess -> !ThreadId [processNode] :: LocalProcess -> !LocalNode -- | Local process state data LocalProcessState LocalProcessState :: !Int32 -> !Int32 -> !Int32 -> !Map LocalSendPortId TypedChannel -> LocalProcessState [_monitorCounter] :: LocalProcessState -> !Int32 [_spawnCounter] :: LocalProcessState -> !Int32 [_channelCounter] :: LocalProcessState -> !Int32 [_typedChannels] :: LocalProcessState -> !Map LocalSendPortId TypedChannel -- | The Cloud Haskell Process type newtype Process a Process :: ReaderT LocalProcess IO a -> Process a [unProcess] :: Process a -> ReaderT LocalProcess IO a -- | Deconstructor for Process (not exported to the public API) runLocalProcess :: LocalProcess -> Process a -> IO a data ImplicitReconnect WithImplicitReconnect :: ImplicitReconnect NoImplicitReconnect :: ImplicitReconnect type LocalSendPortId = Int32 -- | A send port is identified by a SendPortId. -- -- You cannot send directly to a SendPortId; instead, use -- newChan to create a SendPort. data SendPortId SendPortId :: {-# UNPACK #-} !ProcessId -> {-# UNPACK #-} !LocalSendPortId -> SendPortId -- | The ID of the process that will receive messages sent on this port [sendPortProcessId] :: SendPortId -> {-# UNPACK #-} !ProcessId -- | Process-local ID of the channel [sendPortLocalId] :: SendPortId -> {-# UNPACK #-} !LocalSendPortId data TypedChannel TypedChannel :: Weak (TQueue a) -> TypedChannel -- | The send send of a typed channel (serializable) newtype SendPort a SendPort :: SendPortId -> SendPort a -- | The (unique) ID of this send port [sendPortId] :: SendPort a -> SendPortId -- | The receive end of a typed channel (not serializable) -- -- Note that ReceivePort implements Functor, -- Applicative, Alternative and Monad. This is -- especially useful when merging receive ports. newtype ReceivePort a ReceivePort :: STM a -> ReceivePort a [receiveSTM] :: ReceivePort a -> STM a -- | Messages consist of their typeRep fingerprint and their encoding data Message EncodedMessage :: !Fingerprint -> !ByteString -> Message [messageFingerprint] :: Message -> !Fingerprint [messageEncoding] :: Message -> !ByteString UnencodedMessage :: !Fingerprint -> !a -> Message [messageFingerprint] :: Message -> !Fingerprint [messagePayload] :: Message -> !a -- | internal use only. isEncoded :: Message -> Bool -- | Turn any serialiable term into a message createMessage :: Serializable a => a -> Message -- | Turn any serializable term into an unencoded/local message createUnencodedMessage :: Serializable a => a -> Message -- | Turn any serializable term into an unencodede/local message, without -- evalutaing it! This is a dangerous business. unsafeCreateUnencodedMessage :: Serializable a => a -> Message -- | Serialize a message messageToPayload :: Message -> [ByteString] -- | Deserialize a message payloadToMessage :: [ByteString] -> Message -- | MonitorRef is opaque for regular Cloud Haskell processes data MonitorRef MonitorRef :: !Identifier -> !Int32 -> MonitorRef -- | ID of the entity to be monitored [monitorRefIdent] :: MonitorRef -> !Identifier -- | Unique to distinguish multiple monitor requests by the same process [monitorRefCounter] :: MonitorRef -> !Int32 -- | Message sent by process monitors data ProcessMonitorNotification ProcessMonitorNotification :: !MonitorRef -> !ProcessId -> !DiedReason -> ProcessMonitorNotification -- | Message sent by node monitors data NodeMonitorNotification NodeMonitorNotification :: !MonitorRef -> !NodeId -> !DiedReason -> NodeMonitorNotification -- | Message sent by channel (port) monitors data PortMonitorNotification PortMonitorNotification :: !MonitorRef -> !SendPortId -> !DiedReason -> PortMonitorNotification -- | Internal exception thrown indirectly by exit data ProcessExitException ProcessExitException :: !ProcessId -> !Message -> ProcessExitException -- | Exceptions thrown when a linked process dies data ProcessLinkException ProcessLinkException :: !ProcessId -> !DiedReason -> ProcessLinkException -- | Exception thrown when a linked node dies data NodeLinkException NodeLinkException :: !NodeId -> !DiedReason -> NodeLinkException -- | Exception thrown when a linked channel (port) dies data PortLinkException PortLinkException :: !SendPortId -> !DiedReason -> PortLinkException -- | Exception thrown when a process attempts to register a process under -- an already-registered name or to unregister a name that hasn't been -- registered. Returns the name and the identifier of the process that -- owns it, if any. data ProcessRegistrationException ProcessRegistrationException :: !String -> !Maybe ProcessId -> ProcessRegistrationException -- | Why did a process die? data DiedReason -- | Normal termination DiedNormal :: DiedReason -- | The process exited with an exception (provided as String -- because Exception does not implement Binary) DiedException :: !String -> DiedReason -- | We got disconnected from the process node DiedDisconnect :: DiedReason -- | The process node died DiedNodeDown :: DiedReason -- | Invalid (processnodechannel) identifier DiedUnknownId :: DiedReason -- | (Asynchronous) reply from unmonitor newtype DidUnmonitor DidUnmonitor :: MonitorRef -> DidUnmonitor -- | (Asynchronous) reply from unlink newtype DidUnlinkProcess DidUnlinkProcess :: ProcessId -> DidUnlinkProcess -- | (Asynchronous) reply from unlinkNode newtype DidUnlinkNode DidUnlinkNode :: NodeId -> DidUnlinkNode -- | (Asynchronous) reply from unlinkPort newtype DidUnlinkPort DidUnlinkPort :: SendPortId -> DidUnlinkPort -- | SpawnRef are used to return pids of spawned processes newtype SpawnRef SpawnRef :: Int32 -> SpawnRef -- | (Asynchronius) reply from spawn data DidSpawn DidSpawn :: SpawnRef -> ProcessId -> DidSpawn -- | (Asynchronous) reply from whereis data WhereIsReply WhereIsReply :: String -> Maybe ProcessId -> WhereIsReply -- | (Asynchronous) reply from register and unregister data RegisterReply RegisterReply :: String -> Bool -> Maybe ProcessId -> RegisterReply -- | Provide information about a running process data ProcessInfo ProcessInfo :: NodeId -> [String] -> Int -> [(ProcessId, MonitorRef)] -> [ProcessId] -> ProcessInfo [infoNode] :: ProcessInfo -> NodeId [infoRegisteredNames] :: ProcessInfo -> [String] [infoMessageQueueLength] :: ProcessInfo -> Int [infoMonitors] :: ProcessInfo -> [(ProcessId, MonitorRef)] [infoLinks] :: ProcessInfo -> [ProcessId] data ProcessInfoNone ProcessInfoNone :: DiedReason -> ProcessInfoNone data NodeStats NodeStats :: NodeId -> Int -> Int -> Int -> Int -> NodeStats [nodeStatsNode] :: NodeStats -> NodeId [nodeStatsRegisteredNames] :: NodeStats -> Int [nodeStatsMonitors] :: NodeStats -> Int [nodeStatsLinks] :: NodeStats -> Int [nodeStatsProcesses] :: NodeStats -> Int -- | Messages to the node controller data NCMsg NCMsg :: !Identifier -> !ProcessSignal -> NCMsg [ctrlMsgSender] :: NCMsg -> !Identifier [ctrlMsgSignal] :: NCMsg -> !ProcessSignal -- | Signals to the node controller (see NCMsg) data ProcessSignal Link :: !Identifier -> ProcessSignal Unlink :: !Identifier -> ProcessSignal Monitor :: !MonitorRef -> ProcessSignal Unmonitor :: !MonitorRef -> ProcessSignal Died :: Identifier -> !DiedReason -> ProcessSignal Spawn :: !Closure (Process ()) -> !SpawnRef -> ProcessSignal WhereIs :: !String -> ProcessSignal Register :: !String -> !NodeId -> !Maybe ProcessId -> !Bool -> ProcessSignal NamedSend :: !String -> !Message -> ProcessSignal UnreliableSend :: !LocalProcessId -> !Message -> ProcessSignal LocalSend :: !ProcessId -> !Message -> ProcessSignal LocalPortSend :: !SendPortId -> !Message -> ProcessSignal Kill :: !ProcessId -> !String -> ProcessSignal Exit :: !ProcessId -> !Message -> ProcessSignal GetInfo :: !ProcessId -> ProcessSignal SigShutdown :: ProcessSignal GetNodeStats :: !NodeId -> ProcessSignal localProcesses :: Accessor ValidLocalNodeState (Map LocalProcessId LocalProcess) localPidCounter :: Accessor ValidLocalNodeState Int32 localPidUnique :: Accessor ValidLocalNodeState Int32 localConnections :: Accessor ValidLocalNodeState (Map (Identifier, Identifier) (Connection, ImplicitReconnect)) localProcessWithId :: LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess) localConnectionBetween :: Identifier -> Identifier -> Accessor ValidLocalNodeState (Maybe (Connection, ImplicitReconnect)) monitorCounter :: Accessor LocalProcessState Int32 spawnCounter :: Accessor LocalProcessState Int32 channelCounter :: Accessor LocalProcessState LocalSendPortId typedChannels :: Accessor LocalProcessState (Map LocalSendPortId TypedChannel) typedChannelWithId :: LocalSendPortId -> Accessor LocalProcessState (Maybe TypedChannel) forever' :: Monad m => m a -> m b instance GHC.Base.Alternative Control.Distributed.Process.Internal.Types.ReceivePort instance GHC.Base.Applicative Control.Distributed.Process.Internal.Types.Process instance GHC.Base.Applicative Control.Distributed.Process.Internal.Types.ReceivePort instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidSpawn instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidUnlinkNode instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidUnlinkPort instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidUnlinkProcess instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DidUnmonitor instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.DiedReason instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.Identifier instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.LocalProcessId instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.Message instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.MonitorRef instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.NCMsg instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.NodeId instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.NodeMonitorNotification instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.NodeStats instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.PortMonitorNotification instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessId instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessInfo instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessInfoNone instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessMonitorNotification instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.ProcessSignal instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.RegisterReply instance Control.Distributed.Process.Serializable.Serializable a => Data.Binary.Class.Binary (Control.Distributed.Process.Internal.Types.SendPort a) instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.SendPortId instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.SpawnRef instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Types.WhereIsReply instance Data.Data.Data Control.Distributed.Process.Internal.Types.LocalProcessId instance Data.Data.Data Control.Distributed.Process.Internal.Types.NodeId instance Data.Data.Data Control.Distributed.Process.Internal.Types.ProcessId instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.DiedReason instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.Identifier instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.ImplicitReconnect instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.LocalProcessId instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.MonitorRef instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.NodeId instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.NodeStats instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.ProcessId instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.ProcessInfo instance GHC.Classes.Eq (Control.Distributed.Process.Internal.Types.SendPort a) instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.SendPortId instance GHC.Classes.Eq Control.Distributed.Process.Internal.Types.SpawnRef instance GHC.Exception.Type.Exception Control.Distributed.Process.Internal.Types.NodeClosedException instance GHC.Exception.Type.Exception Control.Distributed.Process.Internal.Types.NodeLinkException instance GHC.Exception.Type.Exception Control.Distributed.Process.Internal.Types.PortLinkException instance GHC.Exception.Type.Exception Control.Distributed.Process.Internal.Types.ProcessExitException instance GHC.Exception.Type.Exception Control.Distributed.Process.Internal.Types.ProcessLinkException instance GHC.Exception.Type.Exception Control.Distributed.Process.Internal.Types.ProcessRegistrationException instance GHC.Base.Functor Control.Distributed.Process.Internal.Types.Process instance GHC.Base.Functor Control.Distributed.Process.Internal.Types.ReceivePort instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.Identifier instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.LocalProcessId instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.MonitorRef instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.NodeId instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.ProcessId instance GHC.Generics.Generic (Control.Distributed.Process.Internal.Types.SendPort a) instance GHC.Generics.Generic Control.Distributed.Process.Internal.Types.SendPortId instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.Identifier instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.LocalProcessId instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.MonitorRef instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.NodeId instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.ProcessId instance Data.Hashable.Class.Hashable a => Data.Hashable.Class.Hashable (Control.Distributed.Process.Internal.Types.SendPort a) instance Data.Hashable.Class.Hashable Control.Distributed.Process.Internal.Types.SendPortId instance Control.Monad.Catch.MonadCatch Control.Distributed.Process.Internal.Types.Process instance Control.Monad.Fail.MonadFail Control.Distributed.Process.Internal.Types.Process instance Control.Monad.Fix.MonadFix Control.Distributed.Process.Internal.Types.Process instance Control.Monad.IO.Class.MonadIO Control.Distributed.Process.Internal.Types.Process instance Control.Monad.Catch.MonadMask Control.Distributed.Process.Internal.Types.Process instance Control.Monad.Reader.Class.MonadReader Control.Distributed.Process.Internal.Types.LocalProcess Control.Distributed.Process.Internal.Types.Process instance GHC.Base.Monad Control.Distributed.Process.Internal.Types.Process instance GHC.Base.Monad Control.Distributed.Process.Internal.Types.ReceivePort instance Control.Monad.Catch.MonadThrow Control.Distributed.Process.Internal.Types.Process instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.DiedReason instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.Identifier instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.Message instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.MonitorRef instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.NodeId instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.ProcessId instance Control.DeepSeq.NFData a => Control.DeepSeq.NFData (Control.Distributed.Process.Internal.Types.SendPort a) instance Control.DeepSeq.NFData Control.Distributed.Process.Internal.Types.SendPortId instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.Identifier instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.LocalProcessId instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.MonitorRef instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.NodeId instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.ProcessId instance GHC.Classes.Ord (Control.Distributed.Process.Internal.Types.SendPort a) instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.SendPortId instance GHC.Classes.Ord Control.Distributed.Process.Internal.Types.SpawnRef instance GHC.Show.Show Control.Distributed.Process.Internal.Types.DidSpawn instance GHC.Show.Show Control.Distributed.Process.Internal.Types.DiedReason instance GHC.Show.Show Control.Distributed.Process.Internal.Types.Identifier instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ImplicitReconnect instance GHC.Show.Show Control.Distributed.Process.Internal.Types.LocalProcessId instance GHC.Show.Show Control.Distributed.Process.Internal.Types.Message instance GHC.Show.Show Control.Distributed.Process.Internal.Types.MonitorRef instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NCMsg instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeClosedException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeId instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeLinkException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeMonitorNotification instance GHC.Show.Show Control.Distributed.Process.Internal.Types.NodeStats instance GHC.Show.Show Control.Distributed.Process.Internal.Types.PortLinkException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.PortMonitorNotification instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessExitException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessId instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessInfo instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessInfoNone instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessLinkException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessMonitorNotification instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessRegistrationException instance GHC.Show.Show Control.Distributed.Process.Internal.Types.ProcessSignal instance GHC.Show.Show Control.Distributed.Process.Internal.Types.RegisterReply instance GHC.Show.Show (Control.Distributed.Process.Internal.Types.SendPort a) instance GHC.Show.Show Control.Distributed.Process.Internal.Types.SendPortId instance GHC.Show.Show Control.Distributed.Process.Internal.Types.SpawnRef instance GHC.Show.Show Control.Distributed.Process.Internal.Types.WhereIsReply module Control.Distributed.Process.Management.Internal.Types -- | A newtype wrapper for an agent id (which is a string). newtype MxAgentId MxAgentId :: String -> MxAgentId [agentId] :: MxAgentId -> String data MxAgentState s MxAgentState :: !MxAgentId -> !TChan Message -> !s -> MxAgentState s [mxAgentId] :: MxAgentState s -> !MxAgentId [mxBus] :: MxAgentState s -> !TChan Message [mxLocalState] :: MxAgentState s -> !s -- | Monad for management agents. newtype MxAgent s a MxAgent :: StateT (MxAgentState s) Process a -> MxAgent s a [unAgent] :: MxAgent s a -> StateT (MxAgentState s) Process a -- | Represents the actions a management agent can take when evaluating an -- event sink. data MxAction MxAgentDeactivate :: !String -> MxAction MxAgentPrioritise :: !ChannelSelector -> MxAction MxAgentReady :: MxAction MxAgentSkip :: MxAction data ChannelSelector InputChan :: ChannelSelector Mailbox :: ChannelSelector -- | Gross though it is, this synonym represents a function used to forking -- new processes, which has to be passed as a HOF when calling -- mxAgentController, since there's no other way to avoid a circular -- dependency with Node.hs type Fork = Process () -> IO ProcessId -- | Type of a management agent's event sink. type MxSink s = Message -> MxAgent s Maybe MxAction -- | This is the default management event, fired for various -- internal events around the NT connection and Process lifecycle. All -- published events that conform to this type, are eligible for tracing - -- i.e., they will be delivered to the trace controller. data MxEvent -- | fired whenever a local process is spawned MxSpawned :: ProcessId -> MxEvent -- | fired whenever a process/name is registered (locally) MxRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process/name is unregistered (locally) MxUnRegistered :: ProcessId -> String -> MxEvent -- | fired whenever a process dies MxProcessDied :: ProcessId -> DiedReason -> MxEvent -- | fired whenever a node dies (i.e., the connection is -- broken/disconnected) MxNodeDied :: NodeId -> DiedReason -> MxEvent -- | fired whenever a message is sent from a local process MxSent :: ProcessId -> ProcessId -> Message -> MxEvent -- | fired whenever a named send occurs MxSentToName :: String -> ProcessId -> Message -> MxEvent -- | fired whenever a sendChan occurs MxSentToPort :: ProcessId -> SendPortId -> Message -> MxEvent -- | fired whenever a message is received by a local process MxReceived :: ProcessId -> Message -> MxEvent -- | fired whenever a message is received via a typed channel MxReceivedPort :: SendPortId -> Message -> MxEvent -- | fired when a network-transport connection is first established MxConnected :: ConnectionId -> EndPointAddress -> MxEvent -- | fired when a network-transport connection is broken/disconnected MxDisconnected :: ConnectionId -> EndPointAddress -> MxEvent -- | a user defined trace event MxUser :: Message -> MxEvent -- | a logging event - used for debugging purposes only MxLog :: String -> MxEvent -- | notifies a trace listener that all subsequent traces will be sent to -- pid MxTraceTakeover :: ProcessId -> MxEvent -- | notifies a trace listener that it has been disabled/removed MxTraceDisable :: MxEvent -- | The class of things that we might be able to resolve to a -- ProcessId (or not). class Addressable a resolveToPid :: Addressable a => a -> Maybe ProcessId instance Control.Distributed.Process.Management.Internal.Types.Addressable Control.Distributed.Process.Management.Internal.Types.MxEvent instance GHC.Base.Applicative (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Types.MxAgentId instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Types.MxEvent instance GHC.Classes.Eq Control.Distributed.Process.Management.Internal.Types.MxAgentId instance GHC.Base.Functor (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Types.MxEvent instance Control.Monad.Fix.MonadFix (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance Control.Monad.IO.Class.MonadIO (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance GHC.Base.Monad (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance Control.Monad.State.Class.MonadState (Control.Distributed.Process.Management.Internal.Types.MxAgentState s) (Control.Distributed.Process.Management.Internal.Types.MxAgent s) instance GHC.Classes.Ord Control.Distributed.Process.Management.Internal.Types.MxAgentId instance GHC.Show.Show Control.Distributed.Process.Management.Internal.Types.MxEvent -- | Interface to the management event bus. module Control.Distributed.Process.Management.Internal.Bus publishEvent :: MxEventBus -> Message -> IO () -- | Tracing/Debugging support - Types module Control.Distributed.Process.Management.Internal.Trace.Types data SetTrace TraceEnable :: !ProcessId -> SetTrace TraceDisable :: SetTrace -- | Defines which processes will be traced by a given TraceFlag, -- either by name, or ProcessId. Choosing TraceAll is -- by far the most efficient approach, as the tracer process -- therefore avoids deciding whether or not a trace event is viable. data TraceSubject TraceAll :: TraceSubject TraceProcs :: !Set ProcessId -> TraceSubject TraceNames :: !Set String -> TraceSubject -- | Defines what will be traced. Flags that control tracing of -- Process events, take a TraceSubject controlling which -- processes should generate trace events in the target process. data TraceFlags TraceFlags :: !Maybe TraceSubject -> !Maybe TraceSubject -> !Maybe TraceSubject -> !Maybe TraceSubject -> !Maybe TraceSubject -> !Maybe TraceSubject -> !Bool -> !Bool -> TraceFlags [traceSpawned] :: TraceFlags -> !Maybe TraceSubject [traceDied] :: TraceFlags -> !Maybe TraceSubject [traceRegistered] :: TraceFlags -> !Maybe TraceSubject [traceUnregistered] :: TraceFlags -> !Maybe TraceSubject [traceSend] :: TraceFlags -> !Maybe TraceSubject [traceRecv] :: TraceFlags -> !Maybe TraceSubject [traceNodes] :: TraceFlags -> !Bool [traceConnections] :: TraceFlags -> !Bool data TraceArg TraceStr :: String -> TraceArg Trace :: a -> TraceArg -- | A generic ok response from the trace coordinator. data TraceOk TraceOk :: TraceOk traceLog :: MxEventBus -> String -> IO () traceLogFmt :: MxEventBus -> String -> [TraceArg] -> IO () traceEvent :: MxEventBus -> MxEvent -> IO () traceMessage :: Serializable m => MxEventBus -> m -> IO () defaultTraceFlags :: TraceFlags enableTrace :: MxEventBus -> ProcessId -> IO () enableTraceSync :: MxEventBus -> SendPort TraceOk -> ProcessId -> IO () disableTrace :: MxEventBus -> IO () disableTraceSync :: MxEventBus -> SendPort TraceOk -> IO () getTraceFlags :: MxEventBus -> SendPort TraceFlags -> IO () setTraceFlags :: MxEventBus -> TraceFlags -> IO () setTraceFlagsSync :: MxEventBus -> SendPort TraceOk -> TraceFlags -> IO () getCurrentTraceClient :: MxEventBus -> SendPort (Maybe ProcessId) -> IO () instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Trace.Types.SetTrace instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Trace.Types.TraceFlags instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Trace.Types.TraceOk instance Data.Binary.Class.Binary Control.Distributed.Process.Management.Internal.Trace.Types.TraceSubject instance GHC.Classes.Eq Control.Distributed.Process.Management.Internal.Trace.Types.SetTrace instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Trace.Types.SetTrace instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Trace.Types.TraceFlags instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Trace.Types.TraceOk instance GHC.Generics.Generic Control.Distributed.Process.Management.Internal.Trace.Types.TraceSubject instance GHC.Show.Show Control.Distributed.Process.Management.Internal.Trace.Types.SetTrace instance GHC.Show.Show Control.Distributed.Process.Management.Internal.Trace.Types.TraceFlags instance GHC.Show.Show Control.Distributed.Process.Management.Internal.Trace.Types.TraceSubject module Control.Distributed.Process.Internal.Messaging sendPayload :: LocalNode -> Identifier -> Identifier -> ImplicitReconnect -> [ByteString] -> IO () sendBinary :: Binary a => LocalNode -> Identifier -> Identifier -> ImplicitReconnect -> a -> IO () sendMessage :: Serializable a => LocalNode -> Identifier -> Identifier -> ImplicitReconnect -> a -> IO () disconnect :: LocalNode -> Identifier -> Identifier -> IO () closeImplicitReconnections :: LocalNode -> Identifier -> IO () -- | a impliesDeathOf b is true if the death of a -- (for instance, a node) implies the death of b (for instance, -- a process on that node) impliesDeathOf :: Identifier -> Identifier -> Bool sendCtrlMsg :: Maybe NodeId -> ProcessSignal -> Process () -- |
-- do -- self <- getSelfPid -- send self (wrapMessage "blah") -- Nothing <- expectTimeout 1000000 :: Process (Maybe String) -- (Just m) <- expectTimeout 1000000 :: Process (Maybe Message) -- (Just "blah") <- unwrapMessage m :: Process (Maybe String) --wrapMessage :: Serializable a => a -> Message -- | This is the unsafe variant of wrapMessage. See -- Control.Distributed.Process.UnsafePrimitives for details. unsafeWrapMessage :: Serializable a => a -> Message -- | Attempt to unwrap a raw Message. If the type of the decoded -- message payload matches the expected type, the value will be returned -- with Just, otherwise Nothing indicates the types do -- not match. -- -- This expression, for example, will evaluate to Nothing > -- unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int) -- -- Whereas this expression, will yield Just "foo" > -- unwrapMessage (wrapMessage "foo") :: Process (Maybe String) unwrapMessage :: (Monad m, Serializable a) => Message -> m (Maybe a) -- | Attempt to handle a raw Message. If the type of the message -- matches the type of the first argument to the supplied expression, -- then the message will be decoded and the expression evaluated against -- its value. If this runtime type checking fails however, -- Nothing will be returned to indicate the fact. If the check -- succeeds and evaluation proceeds, the resulting value with be wrapped -- with Just. -- -- Intended for use in catchesExit and matchAny primitives. handleMessage :: (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b) -- | Conditionally handle a raw Message. If the predicate (a -- -> Bool) evaluates to True, invokes the supplied -- handler, other returns Nothing to indicate failure. See -- handleMessage for further information about runtime type -- checking. handleMessageIf :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b) -- | As handleMessage but ignores result, which is useful if you -- don't care whether or not the handler succeeded. handleMessage_ :: (Monad m, Serializable a) => Message -> (a -> m ()) -> m () -- | Conditional version of handleMessage_. handleMessageIf_ :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m () -- | Forward a raw Message to the given ProcessId. forward :: Message -> ProcessId -> Process () -- | Forward a raw Message to the given ProcessId. -- -- Unlike forward, this function is insensitive to -- reconnect. It will try to send the message regardless of the -- history of connection failures between the nodes. uforward :: Message -> ProcessId -> Process () -- | Receives messages and forwards them to pid if p msg == -- True. delegate :: ProcessId -> (Message -> Bool) -> Process () -- | A straight relay that forwards all messages to the supplied pid. relay :: ProcessId -> Process () -- | Proxies pid and forwards messages whenever proc -- evaluates to True. Unlike delegate the predicate -- proc runs in the Process monad, allowing for richer -- proxy behaviour. If proc returns False or the -- runtime type check fails, no action is taken and the proxy -- process will continue running. proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process () -- | Terminate immediately (throws a ProcessTerminationException) terminate :: Process a -- | Thrown by terminate data ProcessTerminationException ProcessTerminationException :: ProcessTerminationException -- | Die immediately - throws a ProcessExitException with the given -- reason. die :: Serializable a => a -> Process b -- | Forceful request to kill a process. Where exit provides an -- exception that can be caught and handled, kill throws an -- unexposed exception type which cannot be handled explicitly (by type). kill :: ProcessId -> String -> Process () -- | Graceful request to exit a process. Throws ProcessExitException -- with the supplied reason encoded as a message. Any exit -- signal raised in this manner can be handled using the -- catchExit family of functions. exit :: Serializable a => ProcessId -> a -> Process () -- | Catches ProcessExitException. The handler will not be applied -- unless its type matches the encoded data stored in the exception (see -- the reason argument given to the exit primitive). If the -- handler cannot be applied, the exception will be re-thrown. -- -- To handle ProcessExitException without regard for -- reason, see catch. To handle multiple reasons of -- differing types, see catchesExit. catchExit :: (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b -- | Lift catches (almost). -- -- As ProcessExitException stores the exit reason as a -- typed, encoded message, a handler must accept inputs of the expected -- type. In order to handle a list of potentially different handlers (and -- therefore input types), a handler passed to catchesExit must -- accept Message and return Maybe (i.e., Just p -- if it handled the exit reason, otherwise Nothing). -- -- See maybeHandleMessage and Message for more details. catchesExit :: Process b -> [ProcessId -> Message -> Process (Maybe b)] -> Process b -- | Internal exception thrown indirectly by exit data ProcessExitException -- | Our own process ID getSelfPid :: Process ProcessId -- | Get the node ID of our local node getSelfNode :: Process NodeId -- | Provide information about a running process data ProcessInfo ProcessInfo :: NodeId -> [String] -> Int -> [(ProcessId, MonitorRef)] -> [ProcessId] -> ProcessInfo [infoNode] :: ProcessInfo -> NodeId [infoRegisteredNames] :: ProcessInfo -> [String] [infoMessageQueueLength] :: ProcessInfo -> Int [infoMonitors] :: ProcessInfo -> [(ProcessId, MonitorRef)] [infoLinks] :: ProcessInfo -> [ProcessId] -- | Get information about the specified process getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo) data NodeStats NodeStats :: NodeId -> Int -> Int -> Int -> Int -> NodeStats [nodeStatsNode] :: NodeStats -> NodeId [nodeStatsRegisteredNames] :: NodeStats -> Int [nodeStatsMonitors] :: NodeStats -> Int [nodeStatsLinks] :: NodeStats -> Int [nodeStatsProcesses] :: NodeStats -> Int -- | Get statistics about the specified node getNodeStats :: NodeId -> Process (Either DiedReason NodeStats) -- | Get statistics about our local node getLocalNodeStats :: Process NodeStats -- | Link to a remote process (asynchronous) -- -- When process A links to process B (that is, process A calls link -- pidB) then an asynchronous exception will be thrown to process A -- when process B terminates (normally or abnormally), or when process A -- gets disconnected from process B. Although it is technically -- possible to catch these exceptions, chances are if you find yourself -- trying to do so you should probably be using monitor rather -- than link. In particular, code such as -- --
-- link pidB -- Link to process B -- expect -- Wait for a message from process B -- unlink pidB -- Unlink again ---- -- doesn't quite do what one might expect: if process B sends a message -- to process A, and subsequently terminates, then process A might -- or might not be terminated too, depending on whether the exception is -- thrown before or after the unlink (i.e., this code has a race -- condition). -- -- Linking is all-or-nothing: A is either linked to B, or it's not. A -- second call to link has no effect. -- -- Note that link provides unidirectional linking (see -- spawnSupervised). Linking makes no distinction between normal -- and abnormal termination of the remote process. link :: ProcessId -> Process () -- | Remove a link -- -- This is synchronous in the sense that once it returns you are -- guaranteed that no exception will be raised if the remote process -- dies. However, it is asynchronous in the sense that we do not wait for -- a response from the remote node. unlink :: ProcessId -> Process () -- | Monitor another process (asynchronous) -- -- When process A monitors process B (that is, process A calls -- monitor pidB) then process A will receive a -- ProcessMonitorNotification when process B terminates (normally -- or abnormally), or when process A gets disconnected from process B. -- You receive this message like any other (using expect); the -- notification includes a reason (DiedNormal, -- DiedException, DiedDisconnect, etc.). -- -- Every call to monitor returns a new monitor reference -- MonitorRef; if multiple monitors are set up, multiple -- notifications will be delivered and monitors can be disabled -- individually using unmonitor. monitor :: ProcessId -> Process MonitorRef -- | Remove a monitor -- -- This has the same synchronous/asynchronous nature as unlink. -- -- ProcessMonitorNotification messages for the given MonitorRef are -- removed from the mailbox. unmonitor :: MonitorRef -> Process () -- | Remove a monitor (asynchronous) unmonitorAsync :: MonitorRef -> Process () -- | Establishes temporary monitoring of another process. -- -- withMonitor pid code sets up monitoring of pid for -- the duration of code. Note: although monitoring is no longer -- active when withMonitor returns, there might still be -- unreceived monitor messages in the queue. withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a -- | Establishes temporary monitoring of another process. -- -- withMonitor_ pid code sets up monitoring of pid for -- the duration of code. Note: although monitoring is no longer -- active when withMonitor_ returns, there might still be -- unreceived monitor messages in the queue. -- -- Since 0.6.1 withMonitor_ :: ProcessId -> Process a -> Process a data SayMessage SayMessage :: UTCTime -> ProcessId -> String -> SayMessage [sayTime] :: SayMessage -> UTCTime [sayProcess] :: SayMessage -> ProcessId [sayMessage] :: SayMessage -> String -- | Log a string -- -- say message sends a message of type SayMessage with -- the current time and ProcessId of the current process to the -- process registered as logger. By default, this process simply -- sends the string to stderr. Individual Cloud Haskell backends -- might replace this with a different logger process, however. say :: String -> Process () -- | Register a process with the local registry (synchronous). The name -- must not already be registered. The process need not be on this node. -- A bad registration will result in a -- ProcessRegistrationException -- -- The process to be registered does not have to be local itself. register :: String -> ProcessId -> Process () -- | Like register, but will replace an existing registration. The -- name must already be registered. reregister :: String -> ProcessId -> Process () -- | Remove a process from the local registry (asynchronous). This version -- will wait until a response is gotten from the management process. The -- name must already be registered. unregister :: String -> Process () -- | Query the local process registry whereis :: String -> Process (Maybe ProcessId) -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Register a process with a remote registry (asynchronous). -- -- The process to be registered does not have to live on the same remote -- node. Reply wil come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync registerRemoteAsync :: NodeId -> String -> ProcessId -> Process () reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process () -- | Remove a process from a remote registry (asynchronous). -- -- Reply wil come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync unregisterRemoteAsync :: NodeId -> String -> Process () -- | Query a remote process registry (asynchronous) -- -- Reply will come in the form of a WhereIsReply message. -- -- There is currently no synchronous version of -- whereisRemoteAsync: if you implement one yourself, be sure to -- take into account that the remote node might die or get disconnect -- before it can respond (i.e. you should use monitorNode and take -- appropriate action when you receive a NodeMonitorNotification). whereisRemoteAsync :: NodeId -> String -> Process () -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | Resolve a closure unClosure :: Typeable a => Closure a -> Process a -- | Resolve a static value unStatic :: Typeable a => Static a -> Process a -- | Lift catch -- | Deprecated: Use Control.Monad.Catch.catch instead catch :: Exception e => Process a -> (e -> Process a) -> Process a -- | You need this when using catches data Handler a Handler :: (e -> Process a) -> Handler a -- | Lift catches catches :: Process a -> [Handler a] -> Process a -- | Lift try -- | Deprecated: Use Control.Monad.Catch.try instead try :: Exception e => Process a -> Process (Either e a) -- | Lift mask -- | Deprecated: Use Control.Monad.Catch.mask_ instead mask :: ((forall a. () => Process a -> Process a) -> Process b) -> Process b -- | Lift mask_ -- | Deprecated: Use Control.Monad.Catch.mask_ instead mask_ :: Process a -> Process a -- | Lift onException -- | Deprecated: Use Control.Monad.Catch.onException instead onException :: Process a -> Process b -> Process a -- | Lift bracket -- | Deprecated: Use Control.Monad.Catch.bracket instead bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c -- | Lift bracket_ -- | Deprecated: Use Control.Monad.Catch.bracket_ instead bracket_ :: Process a -> Process b -> Process c -> Process c -- | Lift finally -- | Deprecated: Use Control.Monad.Catch.finally instead finally :: Process a -> Process b -> Process a -- | Like expect but with a timeout expectTimeout :: Serializable a => Int -> Process (Maybe a) -- | Like receiveChan but with a timeout. If the timeout is 0, do a -- non-blocking check for a message. receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a) -- | Asynchronous version of spawn -- -- (spawn is defined in terms of spawnAsync and -- expect) spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef -- | Link to a node (asynchronous) linkNode :: NodeId -> Process () -- | Link to a channel (asynchronous) linkPort :: SendPort a -> Process () -- | Remove a node link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkNode :: NodeId -> Process () -- | Remove a channel (send port) link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkPort :: SendPort a -> Process () -- | Monitor a node (asynchronous) monitorNode :: NodeId -> Process MonitorRef -- | Monitor a typed channel (asynchronous) monitorPort :: Serializable a => SendPort a -> Process MonitorRef -- | Cloud Haskell provides the illusion of connection-less, reliable, -- ordered message passing. However, when network connections get -- disrupted this illusion cannot always be maintained. Once a network -- connection breaks (even temporarily) no further communication on that -- connection will be possible. For example, if process A sends a message -- to process B, and A is then notified (by monitor notification) that it -- got disconnected from B, A will not be able to send any further -- messages to B, unless A explicitly indicates that it is -- acceptable to attempt to reconnect to B using the Cloud Haskell -- reconnect primitive. -- -- Importantly, when A calls reconnect it acknowledges that some -- messages to B might have been lost. For instance, if A sends messages -- m1 and m2 to B, then receives a monitor notification that its -- connection to B has been lost, calls reconnect and then sends -- m3, it is possible that B will receive m1 and m3 but not m2. -- -- Note that reconnect does not mean reconnect now but -- rather /it is okay to attempt to reconnect on the next send/. In -- particular, if no further communication attempts are made to B then A -- can use reconnect to clean up its connection to B. reconnect :: ProcessId -> Process () -- | Reconnect to a sendport. See reconnect for more information. reconnectPort :: SendPort a -> Process () sendCtrlMsg :: Maybe NodeId -> ProcessSignal -> Process () instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Primitives.SayMessage instance GHC.Exception.Type.Exception Control.Distributed.Process.Internal.Primitives.ProcessTerminationException instance GHC.Base.Functor Control.Distributed.Process.Internal.Primitives.Handler instance GHC.Base.Functor Control.Distributed.Process.Internal.Primitives.Match instance GHC.Show.Show Control.Distributed.Process.Internal.Primitives.ProcessTerminationException instance GHC.Show.Show Control.Distributed.Process.Internal.Primitives.SayMessage -- | Keeps the tracing API calls separate from the Tracer implementation, -- which allows us to avoid a nasty import cycle between tracing and the -- messaging primitives that rely on it, and also between the node -- controller (which requires access to the tracing related elements of -- our RemoteTable) and the Debug module, which requires -- forkProcess. This module is also used by the management -- agent, which relies on the tracing infrastructure's messaging fabric. module Control.Distributed.Process.Management.Internal.Trace.Primitives -- | Send a log message to the internal tracing facility. If tracing is -- enabled, this will create a custom trace log event. traceLog :: String -> Process () -- | Send a log message to the internal tracing facility, using the given -- list of printable TraceArgs interspersed with the preceding -- delimiter. traceLogFmt :: String -> [TraceArg] -> Process () -- | Send an arbitrary Message to the tracer process. traceMessage :: Serializable m => m -> Process () defaultTraceFlags :: TraceFlags -- | Enable tracing to the supplied process and wait for a TraceOk -- response from the trace coordinator process. enableTrace :: ProcessId -> Process () -- | Enable tracing to the supplied process. enableTraceAsync :: ProcessId -> Process () -- | Disable the currently configured trace and wait for a TraceOk -- response from the trace coordinator process. disableTrace :: Process () -- | Disable the currently configured trace. disableTraceAsync :: Process () getTraceFlags :: Process TraceFlags -- | Set the given flags for the current tracer and wait for a -- TraceOk response from the trace coordinator process. setTraceFlags :: TraceFlags -> Process () -- | Set the given flags for the current tracer. setTraceFlagsAsync :: TraceFlags -> Process () -- | Turn tracing for for a subset of trace targets. traceOnly :: Traceable a => [a] -> Maybe TraceSubject -- | Trace all targets. traceOn :: Maybe TraceSubject -- | Trace no targets. traceOff :: Maybe TraceSubject withLocalTracer :: (MxEventBus -> Process ()) -> Process () withRegisteredTracer :: (ProcessId -> Process a) -> Process a instance Control.Distributed.Process.Management.Internal.Trace.Primitives.Traceable GHC.Base.String instance Control.Distributed.Process.Management.Internal.Trace.Primitives.Traceable Control.Distributed.Process.Internal.Types.ProcessId -- | Tracing/Debugging support - Trace Implementation module Control.Distributed.Process.Management.Internal.Trace.Tracer traceController :: MVar (Weak (CQueue Message)) -> Process () defaultTracer :: Process () systemLoggerTracer :: Process () logfileTracer :: FilePath -> Process () eventLogTracer :: Process () module Control.Distributed.Process.Management.Internal.Agent -- | A triple containing a configured tracer, weak pointer to the agent -- controller's mailbox (CQueue) and an expression used to instantiate -- new agents on the current node. type AgentConfig = (Tracer, Weak CQueue Message, (TChan Message, TChan Message) -> Process () -> IO ProcessId) -- | Starts a management agent for the current node. The agent process must -- not crash or be killed, so we generally avoid publishing its -- ProcessId where possible. -- -- Our process is also responsible for forwarding messages to the trace -- controller, since having two special processes handled via the -- LocalNode would be inelegant. We forward messages directly to -- the trace controller's message queue, just as the MxEventBus -- that's set up on the LocalNode forwards messages directly to -- us. This optimises the code path for tracing and avoids overloading -- the node node controller's internal control plane with additional -- routing, at the cost of a little more complexity and two cases where -- we break encapsulation. mxAgentController :: Fork -> MVar AgentConfig -> Process () -- | Forks a new process in which an mxAgent is run. mxStartAgent :: Fork -> TChan Message -> ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId -- | Start the tracer controller. startTracing :: Fork -> IO Tracer -- | Start a dead letter (agent) queue. -- -- If no agents are registered on the system, the management event bus -- will fill up and its data won't be GC'ed until someone comes along and -- reads from the broadcast channel (via dupTChan of course). This is -- effectively a leak, so to mitigate it, we start a dead letter -- queue that drains the event bus continuously, thus ensuring if -- there are no other consumers that we won't use up heap space -- unnecessarily. startDeadLetterQueue :: TChan Message -> IO () -- |
-- -- simple notification data type
--
-- data Registration = Reg { added :: Bool
-- , procId :: ProcessId
-- , name :: String
-- }
--
-- -- start a /name monitoring agent/
-- nameMonitorAgent = do
-- mxAgent (MxAgentId "name-monitor") Set.empty [
-- (mxSink $ \(pid :: ProcessId) -> do
-- mxUpdateState $ Set.insert pid
-- mxReady)
-- , (mxSink $
-- let act =
-- case ev of
-- (MxRegistered p n) -> notify True n p
-- (MxUnRegistered p n) -> notify False n p
-- _ -> return ()
-- act >> mxReady)
-- ]
-- where
-- notify a n p = do
-- Foldable.mapM_ (liftMX . deliver (Reg a n p)) =<< mxGetLocal
--
--
-- The client interface (for sending their pid) can take one of two
-- forms:
--
-- -- monitorNames = getSelfPid >>= nsend "name-monitor" -- monitorNames2 = getSelfPid >>= mxNotify ---- --
-- f :: T1 -> T2 -- f = ... -- -- remotable ['f] ---- -- with remotableDecl you would instead do -- --
-- remotableDecl [ -- [d| f :: T1 -> T2 ; -- f = ... -- |] -- ] ---- -- remotableDecl creates the function specified as well as the -- various dictionaries and static versions that remotable also -- creates. remotableDecl is sometimes necessary when you want to -- refer to, say, $(mkClosure 'f) within the definition of -- f itself. -- -- NOTE: remotableDecl creates __remoteTableDecl instead -- of __remoteTable so that you can use both remotable -- and remotableDecl within the same module. remotableDecl :: [Q [Dec]] -> Q [Dec] -- | Construct a static value. -- -- If f : forall a1 .. an. T then $(mkStatic 'f) :: forall -- a1 .. an. Static T. Be sure to pass f to -- remotable. mkStatic :: Name -> Q Exp -- | If f : T1 -> T2 is a monomorphic function then -- $(functionSDict 'f) :: Static (SerializableDict T1). -- -- Be sure to pass f to remotable. functionSDict :: Name -> Q Exp -- | If f : T1 -> Process T2 is a monomorphic function then -- $(functionTDict 'f) :: Static (SerializableDict T2). -- -- Be sure to pass f to remotable. functionTDict :: Name -> Q Exp -- | If f : T1 -> T2 then $(mkClosure 'f) :: T1 -> -- Closure T2. -- -- TODO: The current version of mkClosure is too polymorphic (@forall a. -- Binary a => a -> Closure T2). mkClosure :: Name -> Q Exp -- | Make a Closure from a static function. This is useful for -- making a closure for a top-level Process () function, because -- using mkClosure would require adding a dummy () -- argument. mkStaticClosure :: Name -> Q Exp -- |
-- import Control.Monad.Trans.State -- from the "transformers" library -- -- printState :: Show s => StateT s IO () -- printState = do -- state <- get -- liftIO $ print state ---- -- Had we omitted liftIO, we would have ended up with -- this error: -- --
-- • Couldn't match type ‘IO’ with ‘StateT s IO’ -- Expected type: StateT s IO () -- Actual type: IO () ---- -- The important part here is the mismatch between StateT s IO -- () and IO (). -- -- Luckily, we know of a function that takes an IO a and -- returns an (m a): liftIO, enabling us to run -- the program and see the expected results: -- --
-- > evalStateT printState "hello" -- "hello" -- -- > evalStateT printState 3 -- 3 --liftIO :: MonadIO m => IO a -> m a -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- | Send a message unreliably. -- -- Unlike send, this function is insensitive to reconnect. -- It will try to send the message regardless of the history of -- connection failures between the nodes. -- -- Message passing with usend is ordered for a given sender and -- receiver if the messages arrive at all. usend :: Serializable a => ProcessId -> a -> Process () -- | Wait for a message of a specific type expect :: Serializable a => Process a -- | Like expect but with a timeout expectTimeout :: Serializable a => Int -> Process (Maybe a) -- | The receive end of a typed channel (not serializable) -- -- Note that ReceivePort implements Functor, -- Applicative, Alternative and Monad. This is -- especially useful when merging receive ports. data ReceivePort a -- | The send send of a typed channel (serializable) data SendPort a -- | The (unique) ID of this send port sendPortId :: SendPort a -> SendPortId -- | Create a new typed channel, bound to the calling Process -- -- Note that the channel is bound to the lifecycle of the process that -- evaluates this function, such that when it dies/exits, the channel -- will no longer function, but will remain accessible. Thus reading from -- the ReceivePort will fail silently thereafter, blocking indefinitely -- (unless a timeout is used). newChan :: Serializable a => Process (SendPort a, ReceivePort a) -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () -- | Wait for a message on a typed channel receiveChan :: Serializable a => ReceivePort a -> Process a -- | Like receiveChan but with a timeout. If the timeout is 0, do a -- non-blocking check for a message. receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a) -- | Merge a list of typed channels. -- -- The result port is left-biased: if there are messages available on -- more than one port, the first available message is returned. mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Like mergePortsBiased, but with a round-robin scheduler (rather -- than left-biased) mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Unsafe variant of send. This function makes no -- attempt to serialize and (in the case when the destination process -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSend :: Serializable a => ProcessId -> a -> Process () -- | Unsafe variant of usend. This function makes no -- attempt to serialize the message when the destination process resides -- on the same local node. Therefore, a local receiver would need to be -- prepared to cope with any errors resulting from evaluation of the -- message. unsafeUSend :: Serializable a => ProcessId -> a -> Process () -- | Send a message on a typed channel. This function makes no -- attempt to serialize and (in the case when the ReceivePort -- resides on the same local node) therefore ensure that the payload is -- fully evaluated before it is delivered. unsafeSendChan :: Serializable a => SendPort a -> a -> Process () -- | Named send to a process in the local registry (asynchronous). This -- function makes no attempt to serialize and (in the case when -- the destination process resides on the same local node) therefore -- ensure that the payload is fully evaluated before it is delivered. unsafeNSend :: Serializable a => String -> a -> Process () -- | Named send to a process in a remote registry (asynchronous) This -- function makes no attempt to serialize and (in the case when -- the destination process resides on the same local node) therefore -- ensure that the payload is fully evaluated before it is delivered. unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | This is the unsafe variant of wrapMessage. See -- Control.Distributed.Process.UnsafePrimitives for details. unsafeWrapMessage :: Serializable a => a -> Message -- | Opaque type used in receiveWait and receiveTimeout data Match b -- | Test the matches in order against each message in the queue receiveWait :: [Match b] -> Process b -- | Like receiveWait but with a timeout. -- -- If the timeout is zero do a non-blocking check for matching messages. -- A non-zero timeout is applied only when waiting for incoming messages -- (that is, after we have checked the messages that are already -- in the mailbox). receiveTimeout :: Int -> [Match b] -> Process (Maybe b) -- | Match against any message of the right type match :: Serializable a => (a -> Process b) -> Match b -- | Match against any message of the right type that satisfies a predicate matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b -- | Remove any message from the queue matchUnknown :: Process b -> Match b -- | Match against an arbitrary message. matchAny removes the first -- available message from the process mailbox. To handle arbitrary -- raw messages once removed from the mailbox, see -- handleMessage and unwrapMessage. matchAny :: (Message -> Process b) -> Match b -- | Match against an arbitrary message. Intended for use with -- handleMessage and unwrapMessage, this function -- only removes a message from the process mailbox, if the -- supplied condition matches. The success (or failure) of runtime type -- checks deferred to handleMessage and friends is irrelevant -- here, i.e., if the condition evaluates to True then the -- message will be removed from the process mailbox and decoded, but that -- does not guarantee that an expression passed to -- handleMessage will pass the runtime type checks and therefore -- be evaluated. matchAnyIf :: Serializable a => (a -> Bool) -> (Message -> Process b) -> Match b -- | Match on a typed channel matchChan :: ReceivePort a -> (a -> Process b) -> Match b -- | Match on an arbitrary STM action. -- -- This rather unusaul match primitive allows us to compose -- arbitrary STM actions with checks against our process' mailbox and/or -- any typed channel ReceivePorts we may hold. -- -- This allows us to process multiple input streams along with our -- mailbox, in just the same way that matchChan supports -- checking both the mailbox and an arbitrary set of typed -- channels in one atomic transaction. -- -- Note there are no ordering guarnatees with respect to these disparate -- input sources. matchSTM :: STM a -> (a -> Process b) -> Match b -- | Messages consist of their typeRep fingerprint and their encoding data Message -- | Match against any message, regardless of the underlying (contained) -- type matchMessage :: (Message -> Process Message) -> Match Message -- | Match against any message (regardless of underlying type) that -- satisfies a predicate matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message -- | internal use only. isEncoded :: Message -> Bool -- | Wrap a Serializable value in a Message. Note that -- Messages are Serializable - like the datum they contain -- - but also note, deserialising such a Message will yield a -- Message, not the type within it! To obtain the wrapped datum, -- use unwrapMessage or handleMessage with a specific type. -- --
-- do -- self <- getSelfPid -- send self (wrapMessage "blah") -- Nothing <- expectTimeout 1000000 :: Process (Maybe String) -- (Just m) <- expectTimeout 1000000 :: Process (Maybe Message) -- (Just "blah") <- unwrapMessage m :: Process (Maybe String) --wrapMessage :: Serializable a => a -> Message -- | Attempt to unwrap a raw Message. If the type of the decoded -- message payload matches the expected type, the value will be returned -- with Just, otherwise Nothing indicates the types do -- not match. -- -- This expression, for example, will evaluate to Nothing > -- unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int) -- -- Whereas this expression, will yield Just "foo" > -- unwrapMessage (wrapMessage "foo") :: Process (Maybe String) unwrapMessage :: (Monad m, Serializable a) => Message -> m (Maybe a) -- | Attempt to handle a raw Message. If the type of the message -- matches the type of the first argument to the supplied expression, -- then the message will be decoded and the expression evaluated against -- its value. If this runtime type checking fails however, -- Nothing will be returned to indicate the fact. If the check -- succeeds and evaluation proceeds, the resulting value with be wrapped -- with Just. -- -- Intended for use in catchesExit and matchAny primitives. handleMessage :: (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b) -- | Conditionally handle a raw Message. If the predicate (a -- -> Bool) evaluates to True, invokes the supplied -- handler, other returns Nothing to indicate failure. See -- handleMessage for further information about runtime type -- checking. handleMessageIf :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b) -- | As handleMessage but ignores result, which is useful if you -- don't care whether or not the handler succeeded. handleMessage_ :: (Monad m, Serializable a) => Message -> (a -> m ()) -> m () -- | Conditional version of handleMessage_. handleMessageIf_ :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m () -- | Forward a raw Message to the given ProcessId. forward :: Message -> ProcessId -> Process () -- | Forward a raw Message to the given ProcessId. -- -- Unlike forward, this function is insensitive to -- reconnect. It will try to send the message regardless of the -- history of connection failures between the nodes. uforward :: Message -> ProcessId -> Process () -- | Receives messages and forwards them to pid if p msg == -- True. delegate :: ProcessId -> (Message -> Bool) -> Process () -- | A straight relay that forwards all messages to the supplied pid. relay :: ProcessId -> Process () -- | Proxies pid and forwards messages whenever proc -- evaluates to True. Unlike delegate the predicate -- proc runs in the Process monad, allowing for richer -- proxy behaviour. If proc returns False or the -- runtime type check fails, no action is taken and the proxy -- process will continue running. proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process () -- | Spawn a process -- -- For more information about Closure, see -- Control.Distributed.Process.Closure. -- -- See also call. spawn :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Run a process remotely and wait for it to reply -- -- We monitor the remote process: if it dies before it can send a reply, -- we die too. -- -- For more information about Static, SerializableDict, and -- Closure, see Control.Distributed.Process.Closure. -- -- See also spawn. call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a -- | Terminate immediately (throws a ProcessTerminationException) terminate :: Process a -- | Die immediately - throws a ProcessExitException with the given -- reason. die :: Serializable a => a -> Process b -- | Forceful request to kill a process. Where exit provides an -- exception that can be caught and handled, kill throws an -- unexposed exception type which cannot be handled explicitly (by type). kill :: ProcessId -> String -> Process () -- | Graceful request to exit a process. Throws ProcessExitException -- with the supplied reason encoded as a message. Any exit -- signal raised in this manner can be handled using the -- catchExit family of functions. exit :: Serializable a => ProcessId -> a -> Process () -- | Catches ProcessExitException. The handler will not be applied -- unless its type matches the encoded data stored in the exception (see -- the reason argument given to the exit primitive). If the -- handler cannot be applied, the exception will be re-thrown. -- -- To handle ProcessExitException without regard for -- reason, see catch. To handle multiple reasons of -- differing types, see catchesExit. catchExit :: (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b -- | Lift catches (almost). -- -- As ProcessExitException stores the exit reason as a -- typed, encoded message, a handler must accept inputs of the expected -- type. In order to handle a list of potentially different handlers (and -- therefore input types), a handler passed to catchesExit must -- accept Message and return Maybe (i.e., Just p -- if it handled the exit reason, otherwise Nothing). -- -- See maybeHandleMessage and Message for more details. catchesExit :: Process b -> [ProcessId -> Message -> Process (Maybe b)] -> Process b -- | Thrown by terminate data ProcessTerminationException ProcessTerminationException :: ProcessTerminationException -- | Exception thrown when a process attempts to register a process under -- an already-registered name or to unregister a name that hasn't been -- registered. Returns the name and the identifier of the process that -- owns it, if any. data ProcessRegistrationException ProcessRegistrationException :: !String -> !Maybe ProcessId -> ProcessRegistrationException -- | SpawnRef are used to return pids of spawned processes data SpawnRef -- | Our own process ID getSelfPid :: Process ProcessId -- | Get the node ID of our local node getSelfNode :: Process NodeId -- | Provide information about a running process data ProcessInfo ProcessInfo :: NodeId -> [String] -> Int -> [(ProcessId, MonitorRef)] -> [ProcessId] -> ProcessInfo [infoNode] :: ProcessInfo -> NodeId [infoRegisteredNames] :: ProcessInfo -> [String] [infoMessageQueueLength] :: ProcessInfo -> Int [infoMonitors] :: ProcessInfo -> [(ProcessId, MonitorRef)] [infoLinks] :: ProcessInfo -> [ProcessId] -- | Get information about the specified process getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo) data NodeStats NodeStats :: NodeId -> Int -> Int -> Int -> Int -> NodeStats [nodeStatsNode] :: NodeStats -> NodeId [nodeStatsRegisteredNames] :: NodeStats -> Int [nodeStatsMonitors] :: NodeStats -> Int [nodeStatsLinks] :: NodeStats -> Int [nodeStatsProcesses] :: NodeStats -> Int -- | Get statistics about the specified node getNodeStats :: NodeId -> Process (Either DiedReason NodeStats) -- | Get statistics about our local node getLocalNodeStats :: Process NodeStats -- | Link to a remote process (asynchronous) -- -- When process A links to process B (that is, process A calls link -- pidB) then an asynchronous exception will be thrown to process A -- when process B terminates (normally or abnormally), or when process A -- gets disconnected from process B. Although it is technically -- possible to catch these exceptions, chances are if you find yourself -- trying to do so you should probably be using monitor rather -- than link. In particular, code such as -- --
-- link pidB -- Link to process B -- expect -- Wait for a message from process B -- unlink pidB -- Unlink again ---- -- doesn't quite do what one might expect: if process B sends a message -- to process A, and subsequently terminates, then process A might -- or might not be terminated too, depending on whether the exception is -- thrown before or after the unlink (i.e., this code has a race -- condition). -- -- Linking is all-or-nothing: A is either linked to B, or it's not. A -- second call to link has no effect. -- -- Note that link provides unidirectional linking (see -- spawnSupervised). Linking makes no distinction between normal -- and abnormal termination of the remote process. link :: ProcessId -> Process () -- | Link to a node (asynchronous) linkNode :: NodeId -> Process () -- | Link to a channel (asynchronous) linkPort :: SendPort a -> Process () -- | Remove a link -- -- This is synchronous in the sense that once it returns you are -- guaranteed that no exception will be raised if the remote process -- dies. However, it is asynchronous in the sense that we do not wait for -- a response from the remote node. unlink :: ProcessId -> Process () -- | Remove a node link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkNode :: NodeId -> Process () -- | Remove a channel (send port) link -- -- This has the same synchronous/asynchronous nature as unlink. unlinkPort :: SendPort a -> Process () -- | Monitor another process (asynchronous) -- -- When process A monitors process B (that is, process A calls -- monitor pidB) then process A will receive a -- ProcessMonitorNotification when process B terminates (normally -- or abnormally), or when process A gets disconnected from process B. -- You receive this message like any other (using expect); the -- notification includes a reason (DiedNormal, -- DiedException, DiedDisconnect, etc.). -- -- Every call to monitor returns a new monitor reference -- MonitorRef; if multiple monitors are set up, multiple -- notifications will be delivered and monitors can be disabled -- individually using unmonitor. monitor :: ProcessId -> Process MonitorRef -- | Monitor a node (asynchronous) monitorNode :: NodeId -> Process MonitorRef -- | Monitor a typed channel (asynchronous) monitorPort :: Serializable a => SendPort a -> Process MonitorRef -- | Remove a monitor -- -- This has the same synchronous/asynchronous nature as unlink. -- -- ProcessMonitorNotification messages for the given MonitorRef are -- removed from the mailbox. unmonitor :: MonitorRef -> Process () -- | Establishes temporary monitoring of another process. -- -- withMonitor pid code sets up monitoring of pid for -- the duration of code. Note: although monitoring is no longer -- active when withMonitor returns, there might still be -- unreceived monitor messages in the queue. withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a -- | Establishes temporary monitoring of another process. -- -- withMonitor_ pid code sets up monitoring of pid for -- the duration of code. Note: although monitoring is no longer -- active when withMonitor_ returns, there might still be -- unreceived monitor messages in the queue. -- -- Since 0.6.1 withMonitor_ :: ProcessId -> Process a -> Process a -- | MonitorRef is opaque for regular Cloud Haskell processes data MonitorRef -- | Exceptions thrown when a linked process dies data ProcessLinkException ProcessLinkException :: !ProcessId -> !DiedReason -> ProcessLinkException -- | Exception thrown when a linked node dies data NodeLinkException NodeLinkException :: !NodeId -> !DiedReason -> NodeLinkException -- | Exception thrown when a linked channel (port) dies data PortLinkException PortLinkException :: !SendPortId -> !DiedReason -> PortLinkException -- | Message sent by process monitors data ProcessMonitorNotification ProcessMonitorNotification :: !MonitorRef -> !ProcessId -> !DiedReason -> ProcessMonitorNotification -- | Message sent by node monitors data NodeMonitorNotification NodeMonitorNotification :: !MonitorRef -> !NodeId -> !DiedReason -> NodeMonitorNotification -- | Message sent by channel (port) monitors data PortMonitorNotification PortMonitorNotification :: !MonitorRef -> !SendPortId -> !DiedReason -> PortMonitorNotification -- | Why did a process die? data DiedReason -- | Normal termination DiedNormal :: DiedReason -- | The process exited with an exception (provided as String -- because Exception does not implement Binary) DiedException :: !String -> DiedReason -- | We got disconnected from the process node DiedDisconnect :: DiedReason -- | The process node died DiedNodeDown :: DiedReason -- | Invalid (processnodechannel) identifier DiedUnknownId :: DiedReason -- | A closure is a static value and an encoded environment data Closure a closure :: Static (ByteString -> a) -> ByteString -> Closure a -- | A static value. Static is opaque; see staticLabel and -- staticApply. data Static a -- | Resolve a static value unStatic :: Typeable a => Static a -> Process a -- | Resolve a closure unClosure :: Typeable a => Closure a -> Process a -- | Runtime dictionary for unstatic lookups data RemoteTable -- | Log a string -- -- say message sends a message of type SayMessage with -- the current time and ProcessId of the current process to the -- process registered as logger. By default, this process simply -- sends the string to stderr. Individual Cloud Haskell backends -- might replace this with a different logger process, however. say :: String -> Process () -- | Register a process with the local registry (synchronous). The name -- must not already be registered. The process need not be on this node. -- A bad registration will result in a -- ProcessRegistrationException -- -- The process to be registered does not have to be local itself. register :: String -> ProcessId -> Process () -- | Like register, but will replace an existing registration. The -- name must already be registered. reregister :: String -> ProcessId -> Process () -- | Remove a process from the local registry (asynchronous). This version -- will wait until a response is gotten from the management process. The -- name must already be registered. unregister :: String -> Process () -- | Query the local process registry whereis :: String -> Process (Maybe ProcessId) -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Register a process with a remote registry (asynchronous). -- -- The process to be registered does not have to live on the same remote -- node. Reply wil come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync registerRemoteAsync :: NodeId -> String -> ProcessId -> Process () reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process () -- | Remove a process from a remote registry (asynchronous). -- -- Reply wil come in the form of a RegisterReply message -- -- See comments in whereisRemoteAsync unregisterRemoteAsync :: NodeId -> String -> Process () -- | Query a remote process registry (asynchronous) -- -- Reply will come in the form of a WhereIsReply message. -- -- There is currently no synchronous version of -- whereisRemoteAsync: if you implement one yourself, be sure to -- take into account that the remote node might die or get disconnect -- before it can respond (i.e. you should use monitorNode and take -- appropriate action when you receive a NodeMonitorNotification). whereisRemoteAsync :: NodeId -> String -> Process () -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | (Asynchronous) reply from whereis data WhereIsReply WhereIsReply :: String -> Maybe ProcessId -> WhereIsReply -- | (Asynchronous) reply from register and unregister data RegisterReply RegisterReply :: String -> Bool -> Maybe ProcessId -> RegisterReply -- | Lift catch -- | Deprecated: Use Control.Monad.Catch.catch instead catch :: Exception e => Process a -> (e -> Process a) -> Process a -- | You need this when using catches data Handler a Handler :: (e -> Process a) -> Handler a -- | Lift catches catches :: Process a -> [Handler a] -> Process a -- | Lift try -- | Deprecated: Use Control.Monad.Catch.try instead try :: Exception e => Process a -> Process (Either e a) -- | Lift mask -- | Deprecated: Use Control.Monad.Catch.mask_ instead mask :: ((forall a. () => Process a -> Process a) -> Process b) -> Process b -- | Lift mask_ -- | Deprecated: Use Control.Monad.Catch.mask_ instead mask_ :: Process a -> Process a -- | Lift onException -- | Deprecated: Use Control.Monad.Catch.onException instead onException :: Process a -> Process b -> Process a -- | Lift bracket -- | Deprecated: Use Control.Monad.Catch.bracket instead bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c -- | Lift bracket_ -- | Deprecated: Use Control.Monad.Catch.bracket_ instead bracket_ :: Process a -> Process b -> Process c -> Process c -- | Lift finally -- | Deprecated: Use Control.Monad.Catch.finally instead finally :: Process a -> Process b -> Process a -- | Asynchronous version of spawn -- -- (spawn is defined in terms of spawnAsync and -- expect) spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef -- | Spawn a child process, have the child link to the parent and the -- parent monitor the child spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a process and link to it -- -- Note that this is just the sequential composition of spawn and -- link. (The Unified semantics that underlies Cloud -- Haskell does not even support a synchronous link operation) spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Like spawnLink, but monitor the spawned process spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a new process, supplying it with a new ReceivePort and -- return the corresponding SendPort. spawnChannel :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a) -- | (Asynchronius) reply from spawn data DidSpawn DidSpawn :: SpawnRef -> ProcessId -> DidSpawn -- | Spawn a process on the local node spawnLocal :: Process () -> Process ProcessId -- | Create a new typed channel, spawn a process on the local node, passing -- it the receive port, and return the send port spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a) -- | Local version of call. Running a process in this way isolates -- it from messages sent to the caller process, and also allows silently -- dropping late or duplicate messages sent to the isolated process after -- it exits. Silently dropping messages may not always be the best -- approach. callLocal :: Process a -> Process a -- | Cloud Haskell provides the illusion of connection-less, reliable, -- ordered message passing. However, when network connections get -- disrupted this illusion cannot always be maintained. Once a network -- connection breaks (even temporarily) no further communication on that -- connection will be possible. For example, if process A sends a message -- to process B, and A is then notified (by monitor notification) that it -- got disconnected from B, A will not be able to send any further -- messages to B, unless A explicitly indicates that it is -- acceptable to attempt to reconnect to B using the Cloud Haskell -- reconnect primitive. -- -- Importantly, when A calls reconnect it acknowledges that some -- messages to B might have been lost. For instance, if A sends messages -- m1 and m2 to B, then receives a monitor notification that its -- connection to B has been lost, calls reconnect and then sends -- m3, it is possible that B will receive m1 and m3 but not m2. -- -- Note that reconnect does not mean reconnect now but -- rather /it is okay to attempt to reconnect on the next send/. In -- particular, if no further communication attempts are made to B then A -- can use reconnect to clean up its connection to B. reconnect :: ProcessId -> Process () -- | Reconnect to a sendport. See reconnect for more information. reconnectPort :: SendPort a -> Process () module Control.Distributed.Process.Internal.Closure.Explicit -- | A RemoteRegister is a trasformer on a RemoteTable to register -- additional static values. type RemoteRegister = RemoteTable -> RemoteTable class MkTDict a mkTDict :: MkTDict a => String -> a -> RemoteRegister -- | This takes an explicit name and a value, and produces both a static -- reference to the name and a RemoteRegister for it. mkStaticVal :: Serializable a => String -> a -> (Static a, RemoteRegister) -- | This takes an explicit name, a function of arity one, and creates a -- creates a function yielding a closure and a remote register for it. mkClosureValSingle :: (Serializable a, Typeable b, MkTDict b) => String -> (a -> b) -> (a -> Closure b, RemoteRegister) -- | This takes an explict name, a function of any arity, and creates a -- function yielding a closure and a remote register for it. mkClosureVal :: forall func argTuple result closureFunction. (Curry (argTuple -> Closure result) closureFunction, MkTDict result, Uncurry HTrue argTuple func result, Typeable result, Serializable argTuple, IsFunction func HTrue) => String -> func -> (closureFunction, RemoteRegister) -- | Works just like standard call, but with a simpler signature. call' :: Serializable a => NodeId -> Closure (Process a) -> Process a instance Data.Binary.Class.Binary Control.Distributed.Process.Internal.Closure.Explicit.EndOfTuple instance Control.Distributed.Process.Internal.Closure.Explicit.Curry ((a, Control.Distributed.Process.Internal.Closure.Explicit.EndOfTuple) -> b) (a -> b) instance Control.Distributed.Process.Internal.Closure.Explicit.Curry (b -> c) r => Control.Distributed.Process.Internal.Closure.Explicit.Curry ((a, b) -> c) (a -> r) instance (b GHC.Types.~ Control.Distributed.Process.Internal.Closure.Explicit.HTrue) => Control.Distributed.Process.Internal.Closure.Explicit.IsFunction (a -> c) b instance (b GHC.Types.~ Control.Distributed.Process.Internal.Closure.Explicit.HFalse) => Control.Distributed.Process.Internal.Closure.Explicit.IsFunction a b instance Control.Distributed.Process.Serializable.Serializable b => Control.Distributed.Process.Internal.Closure.Explicit.MkTDict (Control.Distributed.Process.Internal.Types.Process b) instance Control.Distributed.Process.Internal.Closure.Explicit.MkTDict a instance (Control.Distributed.Process.Internal.Closure.Explicit.IsFunction func b, Control.Distributed.Process.Internal.Closure.Explicit.Uncurry b args func result) => Control.Distributed.Process.Internal.Closure.Explicit.Uncurry'' args func result instance Control.Distributed.Process.Internal.Closure.Explicit.Uncurry Control.Distributed.Process.Internal.Closure.Explicit.HFalse Control.Distributed.Process.Internal.Closure.Explicit.EndOfTuple a a instance Control.Distributed.Process.Internal.Closure.Explicit.Uncurry'' rest f r => Control.Distributed.Process.Internal.Closure.Explicit.Uncurry Control.Distributed.Process.Internal.Closure.Explicit.HTrue (a, rest) (a -> f) r -- | Towards Haskell in the Cloud (Epstein et al., Haskell Symposium -- 2011) proposes a new type construct called static that -- characterizes values that are known statically. Cloud Haskell uses the -- Static implementation from Control.Distributed.Static. -- That module comes with its own extensive documentation, which you -- should read if you want to know the details. Here we explain the -- Template Haskell support only. -- --
-- f :: forall a1 .. an. T -- f = ... ---- -- you can use a Template Haskell splice to create a static version of -- f: -- --
-- $(mkStatic 'f) :: forall a1 .. an. (Typeable a1, .., Typeable an) => Static T ---- -- Every module that you write that contains calls to mkStatic -- needs to have a call to remotable: -- --
-- remotable [ 'f, 'g, ... ] ---- -- where you must pass every function (or other value) that you pass as -- an argument to mkStatic. The call to remotable will -- create a definition -- --
-- __remoteTable :: RemoteTable -> RemoteTable ---- -- which can be used to construct the RemoteTable used to -- initialize Cloud Haskell. You should have (at most) one call to -- remotable per module, and compose all created functions when -- initializing Cloud Haskell: -- --
-- let rtable :: RemoteTable -- rtable = M1.__remoteTable -- . M2.__remoteTable -- . ... -- . Mn.__remoteTable -- $ initRemoteTable ---- -- NOTE: If you get a type error from ghc along these lines -- --
-- The exact Name `a_a30k' is not in scope -- Probable cause: you used a unique name (NameU) in Template Haskell but did not bind it ---- -- then you need to enable the ScopedTypeVariables language -- extension. -- --
-- call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a ---- -- Given some serializable type T you can define -- --
-- sdictT :: SerializableDict T -- sdictT = SerializableDict ---- -- and then have -- --
-- $(mkStatic 'sdictT) :: Static (SerializableDict T) ---- -- However, since these dictionaries are so frequently required Cloud -- Haskell provides special support for them. When you call -- remotable on a monomorphic function f :: T1 -> -- T2 -- --
-- remotable ['f] ---- -- then a serialization dictionary is automatically created for you, -- which you can access with -- --
-- $(functionSDict 'f) :: Static (SerializableDict T1) ---- -- In addition, if f :: T1 -> Process T2, then a second -- dictionary is created -- --
-- $(functionTDict 'f) :: Static (SerializableDict T2) ---- --
-- isPrime :: Integer -> Process Bool ---- -- Then -- --
-- $(mkClosure 'isPrime) :: Integer -> Closure (Process Bool) ---- -- which you can then call, for example, to have a remote node -- check if a number is prime. -- -- In general, if you have a monomorphic function -- --
-- f :: T1 -> T2 ---- -- then -- --
-- $(mkClosure 'f) :: T1 -> Closure T2 ---- -- provided that T1 is serializable (*) (remember to pass -- f to remotable). -- -- (You can also create closures manually--see the documentation of -- Control.Distributed.Static for examples.) -- --
-- {-# LANGUAGE TemplateHaskell #-}
-- import System.Environment (getArgs)
-- import Control.Distributed.Process
-- import Control.Distributed.Process.Closure
-- import Control.Distributed.Process.Backend.SimpleLocalnet
-- import Control.Distributed.Process.Node (initRemoteTable)
--
-- isPrime :: Integer -> Process Bool
-- isPrime n = return . (n `elem`) . takeWhile (<= n) . sieve $ [2..]
-- where
-- sieve :: [Integer] -> [Integer]
-- sieve (p : xs) = p : sieve [x | x <- xs, x `mod` p > 0]
--
-- remotable ['isPrime]
--
-- master :: [NodeId] -> Process ()
-- master [] = liftIO $ putStrLn "no slaves"
-- master (slave:_) = do
-- isPrime79 <- call $(functionTDict 'isPrime) slave ($(mkClosure 'isPrime) (79 :: Integer))
-- liftIO $ print isPrime79
--
-- main :: IO ()
-- main = do
-- args <- getArgs
-- case args of
-- ["master", host, port] -> do
-- backend <- initializeBackend host port rtable
-- startMaster backend master
-- ["slave", host, port] -> do
-- backend <- initializeBackend host port rtable
-- startSlave backend
-- where
-- rtable :: RemoteTable
-- rtable = __remoteTable initRemoteTable
--
--
-- -- f :: T1 -> T2 -- f = ... -- -- remotable ['f] ---- -- with remotableDecl you would instead do -- --
-- remotableDecl [ -- [d| f :: T1 -> T2 ; -- f = ... -- |] -- ] ---- -- remotableDecl creates the function specified as well as the -- various dictionaries and static versions that remotable also -- creates. remotableDecl is sometimes necessary when you want to -- refer to, say, $(mkClosure 'f) within the definition of -- f itself. -- -- NOTE: remotableDecl creates __remoteTableDecl instead -- of __remoteTable so that you can use both remotable -- and remotableDecl within the same module. remotableDecl :: [Q [Dec]] -> Q [Dec] -- | Construct a static value. -- -- If f : forall a1 .. an. T then $(mkStatic 'f) :: forall -- a1 .. an. Static T. Be sure to pass f to -- remotable. mkStatic :: Name -> Q Exp -- | If f : T1 -> T2 then $(mkClosure 'f) :: T1 -> -- Closure T2. -- -- TODO: The current version of mkClosure is too polymorphic (@forall a. -- Binary a => a -> Closure T2). mkClosure :: Name -> Q Exp -- | Make a Closure from a static function. This is useful for -- making a closure for a top-level Process () function, because -- using mkClosure would require adding a dummy () -- argument. mkStaticClosure :: Name -> Q Exp -- | If f : T1 -> T2 is a monomorphic function then -- $(functionSDict 'f) :: Static (SerializableDict T1). -- -- Be sure to pass f to remotable. functionSDict :: Name -> Q Exp -- | If f : T1 -> Process T2 is a monomorphic function then -- $(functionTDict 'f) :: Static (SerializableDict T2). -- -- Be sure to pass f to remotable. functionTDict :: Name -> Q Exp