-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Cloud Haskell: Erlang-style concurrency in Haskell -- -- This is an implementation of Cloud Haskell, as described in Towards -- Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon -- Peyton Jones -- (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), -- although some of the details are different. The precise message -- passing semantics are based on A unified semantics for future -- Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle. -- You will probably also want to install a Cloud Haskell backend such as -- distributed-process-simplelocalnet. @package distributed-process @version 0.2.1.3 -- | Binary instances for TypeRep, and TypeRep -- equality (bug workaround) module Control.Distributed.Process.Internal.TypeRep -- | Compare two type representations -- -- For base >= 4.6 this compares fingerprints, but older versions of -- base have a bug in the fingerprint construction -- (http://hackage.haskell.org/trac/ghc/ticket/5962) compareTypeRep :: TypeRep -> TypeRep -> Bool instance Binary TyCon instance Binary TypeRep instance Binary Fingerprint -- | Reimplementation of Dynamic that supports dynBind -- -- We don't have access to the internal representation of Dynamic, -- otherwise we would not have to redefine it completely. Note that we -- use this only internally, so the incompatibility with our -- Dynamic from the standard Dynamic is not important. module Control.Distributed.Process.Internal.Dynamic data Dynamic Dynamic :: TypeRep -> Any -> Dynamic toDyn :: Typeable a => a -> Dynamic fromDyn :: Typeable a => Dynamic -> a -> a fromDynamic :: Typeable a => Dynamic -> Maybe a dynTypeRep :: Dynamic -> TypeRep dynApply :: Dynamic -> Dynamic -> Maybe Dynamic dynApp :: Dynamic -> Dynamic -> Dynamic dynBind :: TyCon -> (forall a b. m a -> (a -> m b) -> m b) -> Dynamic -> Dynamic -> Maybe Dynamic dynBind' :: TyCon -> (forall a b. m a -> (a -> m b) -> m b) -> Dynamic -> Dynamic -> Dynamic -- | Dynamically typed Kleisli composition dynKleisli :: TyCon -> (forall a b c. (a -> m b) -> (b -> m c) -> a -> m c) -> Dynamic -> Dynamic -> Maybe Dynamic -- | The function unsafeCoerce# allows you to side-step the -- typechecker entirely. That is, it allows you to coerce any type into -- any other type. If you use this function, you had better get it right, -- otherwise segmentation faults await. It is generally used when you -- want to write a program that you know is well-typed, but where -- Haskell's type system is not expressive enough to prove that it is -- well typed. -- -- The following uses of unsafeCoerce# are supposed to work -- (i.e. not lead to spurious compile-time or run-time crashes): -- -- -- -- Other uses of unsafeCoerce# are undefined. In particular, you -- should not use unsafeCoerce# to cast a T to an algebraic data -- type D, unless T is also an algebraic data type. For example, do not -- cast Int->Int to Bool, even if you later cast -- that Bool back to Int->Int before applying it. -- The reasons have to do with GHC's internal representation details (for -- the congnoscenti, data values can be entered but function closures -- cannot). If you want a safe type to cast things to, use Any, -- which is not an algebraic data type. unsafeCoerce# :: a -> b instance Typeable Dynamic instance Show Dynamic -- | Concurrent queue for single reader, single writer module Control.Distributed.Process.Internal.CQueue data CQueue a data BlockSpec NonBlocking :: BlockSpec Blocking :: BlockSpec Timeout :: Int -> BlockSpec newCQueue :: IO (CQueue a) enqueue :: CQueue a -> a -> IO () -- | Dequeue an element -- -- The timeout (if any) is applied only to waiting for incoming messages, -- not to checking messages that have already arrived dequeue :: CQueue a -> BlockSpec -> [a -> Maybe b] -> IO (Maybe b) module Control.Distributed.Process.Serializable -- | Objects that can be sent across the network class (Binary a, Typeable a) => Serializable a -- | Encode type representation as a bytestring encodeFingerprint :: Fingerprint -> ByteString -- | Decode a bytestring into a fingerprint. Throws an IO exception on -- failure decodeFingerprint :: ByteString -> Fingerprint -- | The fingerprint of the typeRep of the argument fingerprint :: Typeable a => a -> Fingerprint -- | Size of a fingerprint sizeOfFingerprint :: Int data Fingerprint :: * -- | Show fingerprint (for debugging purposes) showFingerprint :: Fingerprint -> ShowS instance (Binary a, Typeable a) => Serializable a -- | Types used throughout the Cloud Haskell framework -- -- We collect all types used internally in a single module because many -- of these data types are mutually recursive and cannot be split across -- modules. module Control.Distributed.Process.Internal.Types -- | Node identifier newtype NodeId NodeId :: EndPointAddress -> NodeId nodeAddress :: NodeId -> EndPointAddress -- | A local process ID consists of a seed which distinguishes processes -- from different instances of the same local node and a counter data LocalProcessId LocalProcessId :: Int32 -> Int32 -> LocalProcessId lpidUnique :: LocalProcessId -> Int32 lpidCounter :: LocalProcessId -> Int32 -- | Process identifier data ProcessId ProcessId :: NodeId -> LocalProcessId -> ProcessId -- | The ID of the node the process is running on processNodeId :: ProcessId -> NodeId -- | Node-local identifier for the process processLocalId :: ProcessId -> LocalProcessId -- | Union of all kinds of identifiers data Identifier NodeIdentifier :: NodeId -> Identifier ProcessIdentifier :: ProcessId -> Identifier SendPortIdentifier :: SendPortId -> Identifier nodeOf :: Identifier -> NodeId -- | Local nodes data LocalNode LocalNode :: NodeId -> EndPoint -> MVar LocalNodeState -> Chan NCMsg -> RemoteTable -> LocalNode -- | NodeId of the node localNodeId :: LocalNode -> NodeId -- | The network endpoint associated with this node localEndPoint :: LocalNode -> EndPoint -- | Local node state localState :: LocalNode -> MVar LocalNodeState -- | Channel for the node controller localCtrlChan :: LocalNode -> Chan NCMsg -- | Runtime lookup table for supporting closures TODO: this should be part -- of the CH state, not the local endpoint state remoteTable :: LocalNode -> RemoteTable -- | Local node state data LocalNodeState LocalNodeState :: Map LocalProcessId LocalProcess -> Int32 -> Int32 -> LocalNodeState _localProcesses :: LocalNodeState -> Map LocalProcessId LocalProcess _localPidCounter :: LocalNodeState -> Int32 _localPidUnique :: LocalNodeState -> Int32 -- | Processes running on our local node data LocalProcess LocalProcess :: CQueue Message -> ProcessId -> MVar LocalProcessState -> ThreadId -> LocalProcess processQueue :: LocalProcess -> CQueue Message processId :: LocalProcess -> ProcessId processState :: LocalProcess -> MVar LocalProcessState processThread :: LocalProcess -> ThreadId -- | Local process state data LocalProcessState LocalProcessState :: Int32 -> Int32 -> Int32 -> Map LocalSendPortId TypedChannel -> LocalProcessState _monitorCounter :: LocalProcessState -> Int32 _spawnCounter :: LocalProcessState -> Int32 _channelCounter :: LocalProcessState -> Int32 _typedChannels :: LocalProcessState -> Map LocalSendPortId TypedChannel -- | The Cloud Haskell Process type newtype Process a Process :: ReaderT LocalProcess (MessageT IO) a -> Process a unProcess :: Process a -> ReaderT LocalProcess (MessageT IO) a procMsg :: MessageT IO a -> Process a type LocalSendPortId = Int32 -- | A send port is identified by a SendPortId. -- -- You cannot send directly to a SendPortId; instead, use -- newChan to create a SendPort. data SendPortId SendPortId :: ProcessId -> LocalSendPortId -> SendPortId -- | The ID of the process that will receive messages sent on this port sendPortProcessId :: SendPortId -> ProcessId -- | Process-local ID of the channel sendPortLocalId :: SendPortId -> LocalSendPortId data TypedChannel TypedChannel :: (TChan a) -> TypedChannel -- | The send send of a typed channel (serializable) newtype SendPort a SendPort :: SendPortId -> SendPort a -- | The (unique) ID of this send port sendPortId :: SendPort a -> SendPortId -- | The receive end of a typed channel (not serializable) data ReceivePort a -- | A single receive port ReceivePortSingle :: (TChan a) -> ReceivePort a -- | A left-biased combination of receive ports ReceivePortBiased :: [ReceivePort a] -> ReceivePort a -- | A round-robin combination of receive ports ReceivePortRR :: (TVar [ReceivePort a]) -> ReceivePort a data StaticLabel StaticLabel :: String -> TypeRep -> StaticLabel StaticApply :: StaticLabel -> StaticLabel -> StaticLabel StaticDuplicate :: StaticLabel -> TypeRep -> StaticLabel -- | A static value is top-level bound or the application of two static -- values newtype Static a Static :: StaticLabel -> Static a -- | Apply two static values staticApply :: Static (a -> b) -> Static a -> Static b -- | Co-monadic duplicate for static values staticDuplicate :: Typeable a => Static a -> Static (Static a) staticTypeOf :: Typeable a => a -> Static a typeOfStaticLabel :: StaticLabel -> TypeRep -- | A closure is a static value and an encoded environment data Closure a Closure :: (Static (ByteString -> a)) -> ByteString -> Closure a -- | Runtime dictionary for unstatic lookups data RemoteTable RemoteTable :: Map String Dynamic -> RemoteTable _remoteTableLabels :: RemoteTable -> Map String Dynamic -- | Reification of Serializable (see -- Control.Distributed.Process.Closure) data SerializableDict a SerializableDict :: SerializableDict a -- | Messages consist of their typeRep fingerprint and their encoding data Message Message :: Fingerprint -> ByteString -> Message messageFingerprint :: Message -> Fingerprint messageEncoding :: Message -> ByteString -- | Turn any serialiable term into a message createMessage :: Serializable a => a -> Message -- | Serialize a message messageToPayload :: Message -> [ByteString] -- | Deserialize a message payloadToMessage :: [ByteString] -> Message -- | MonitorRef is opaque for regular Cloud Haskell processes data MonitorRef MonitorRef :: Identifier -> Int32 -> MonitorRef -- | ID of the entity to be monitored monitorRefIdent :: MonitorRef -> Identifier -- | Unique to distinguish multiple monitor requests by the same process monitorRefCounter :: MonitorRef -> Int32 -- | Message sent by process monitors data ProcessMonitorNotification ProcessMonitorNotification :: MonitorRef -> ProcessId -> DiedReason -> ProcessMonitorNotification -- | Message sent by node monitors data NodeMonitorNotification NodeMonitorNotification :: MonitorRef -> NodeId -> DiedReason -> NodeMonitorNotification -- | Message sent by channel (port) monitors data PortMonitorNotification PortMonitorNotification :: MonitorRef -> SendPortId -> DiedReason -> PortMonitorNotification -- | Exceptions thrown when a linked process dies data ProcessLinkException ProcessLinkException :: ProcessId -> DiedReason -> ProcessLinkException -- | Exception thrown when a linked node dies data NodeLinkException NodeLinkException :: NodeId -> DiedReason -> NodeLinkException -- | Exception thrown when a linked channel (port) dies data PortLinkException PortLinkException :: SendPortId -> DiedReason -> PortLinkException -- | Why did a process die? data DiedReason -- | Normal termination DiedNormal :: DiedReason -- | The process exited with an exception (provided as String -- because Exception does not implement Binary) DiedException :: String -> DiedReason -- | We got disconnected from the process node DiedDisconnect :: DiedReason -- | The process node died DiedNodeDown :: DiedReason -- | Invalid (processnodechannel) identifier DiedUnknownId :: DiedReason -- | (Asynchronous) reply from unmonitor newtype DidUnmonitor DidUnmonitor :: MonitorRef -> DidUnmonitor -- | (Asynchronous) reply from unlink newtype DidUnlinkProcess DidUnlinkProcess :: ProcessId -> DidUnlinkProcess -- | (Asynchronous) reply from unlinkNode newtype DidUnlinkNode DidUnlinkNode :: NodeId -> DidUnlinkNode -- | (Asynchronous) reply from unlinkPort newtype DidUnlinkPort DidUnlinkPort :: SendPortId -> DidUnlinkPort -- | SpawnRef are used to return pids of spawned processes newtype SpawnRef SpawnRef :: Int32 -> SpawnRef -- | (Asynchronius) reply from spawn data DidSpawn DidSpawn :: SpawnRef -> ProcessId -> DidSpawn -- | (Asynchronous) reply from whereis data WhereIsReply WhereIsReply :: String -> (Maybe ProcessId) -> WhereIsReply -- | Messages to the node controller data NCMsg NCMsg :: Identifier -> ProcessSignal -> NCMsg ctrlMsgSender :: NCMsg -> Identifier ctrlMsgSignal :: NCMsg -> ProcessSignal -- | Signals to the node controller (see NCMsg) data ProcessSignal Link :: Identifier -> ProcessSignal Unlink :: Identifier -> ProcessSignal Monitor :: MonitorRef -> ProcessSignal Unmonitor :: MonitorRef -> ProcessSignal Died :: Identifier -> DiedReason -> ProcessSignal Spawn :: (Closure (Process ())) -> SpawnRef -> ProcessSignal WhereIs :: String -> ProcessSignal Register :: String -> (Maybe ProcessId) -> ProcessSignal NamedSend :: String -> Message -> ProcessSignal newtype MessageT m a MessageT :: StateT MessageState m a -> MessageT m a unMessageT :: MessageT m a -> StateT MessageState m a data MessageState MessageState :: LocalNode -> Map Identifier Connection -> MessageState messageLocalNode :: MessageState -> LocalNode _messageConnections :: MessageState -> Map Identifier Connection localProcesses :: Accessor LocalNodeState (Map LocalProcessId LocalProcess) localPidCounter :: Accessor LocalNodeState Int32 localPidUnique :: Accessor LocalNodeState Int32 localProcessWithId :: LocalProcessId -> Accessor LocalNodeState (Maybe LocalProcess) monitorCounter :: Accessor LocalProcessState Int32 spawnCounter :: Accessor LocalProcessState Int32 channelCounter :: Accessor LocalProcessState LocalSendPortId typedChannels :: Accessor LocalProcessState (Map LocalSendPortId TypedChannel) typedChannelWithId :: LocalSendPortId -> Accessor LocalProcessState (Maybe TypedChannel) remoteTableLabels :: Accessor RemoteTable (Map String Dynamic) remoteTableLabel :: String -> Accessor RemoteTable (Maybe Dynamic) instance Typeable LocalProcessId instance Typeable ProcessId instance Typeable1 SendPort instance Typeable1 ReceivePort instance Typeable StaticLabel instance Typeable1 Static instance Typeable1 Closure instance Typeable1 SerializableDict instance Typeable PortLinkException instance Typeable NodeLinkException instance Typeable ProcessLinkException instance Typeable PortMonitorNotification instance Typeable NodeMonitorNotification instance Typeable ProcessMonitorNotification instance Typeable DidUnmonitor instance Typeable DidUnlinkProcess instance Typeable DidUnlinkNode instance Typeable DidUnlinkPort instance Typeable SpawnRef instance Typeable DidSpawn instance Typeable WhereIsReply instance Typeable1 Process instance Eq NodeId instance Ord NodeId instance Binary NodeId instance Eq LocalProcessId instance Ord LocalProcessId instance Eq ProcessId instance Ord ProcessId instance Eq SendPortId instance Ord SendPortId instance Eq Identifier instance Ord Identifier instance Binary (SendPort a) instance Show (SendPort a) instance Eq (SendPort a) instance Ord (SendPort a) instance Show StaticLabel instance Show (Static a) instance Show (Closure a) instance Eq MonitorRef instance Ord MonitorRef instance Show MonitorRef instance Show DiedReason instance Eq DiedReason instance Show PortLinkException instance Show NodeLinkException instance Show ProcessLinkException instance Show PortMonitorNotification instance Show NodeMonitorNotification instance Show ProcessMonitorNotification instance Binary DidUnmonitor instance Binary DidUnlinkProcess instance Binary DidUnlinkNode instance Binary DidUnlinkPort instance Show SpawnRef instance Binary SpawnRef instance Eq SpawnRef instance Show DidSpawn instance Show WhereIsReply instance Show NCMsg instance Show ProcessSignal instance Functor Process instance Monad Process instance MonadIO Process instance MonadReader LocalProcess Process instance Applicative Process instance Functor m => Functor (MessageT m) instance Monad m => Monad (MessageT m) instance MonadIO m => MonadIO (MessageT m) instance Monad m => MonadState MessageState (MessageT m) instance (Monad m, Functor m) => Applicative (MessageT m) instance Binary WhereIsReply instance Binary StaticLabel instance Binary Identifier instance Binary SendPortId instance Binary DidSpawn instance Binary (Closure a) instance Binary DiedReason instance Binary ProcessSignal instance Binary MonitorRef instance Binary NCMsg instance Binary PortMonitorNotification instance Binary NodeMonitorNotification instance Binary ProcessMonitorNotification instance Binary ProcessId instance Binary LocalProcessId instance Exception PortLinkException instance Exception NodeLinkException instance Exception ProcessLinkException instance Show Message instance Show SendPortId instance Show Identifier instance Show ProcessId instance Show LocalProcessId instance Show NodeId -- | Add message sending capability to a monad -- -- NOTE: Not thread-safe (you should not do concurrent sends within the -- same monad). module Control.Distributed.Process.Internal.MessageT data MessageT m a runMessageT :: Monad m => LocalNode -> MessageT m a -> m a getLocalNode :: Monad m => MessageT m LocalNode sendPayload :: MonadIO m => Identifier -> [ByteString] -> MessageT m () sendBinary :: (MonadIO m, Binary a) => Identifier -> a -> MessageT m () sendMessage :: (MonadIO m, Serializable a) => Identifier -> a -> MessageT m () -- | Deserialize a message payloadToMessage :: [ByteString] -> Message -- | Turn any serialiable term into a message createMessage :: Serializable a => a -> Message -- | Template Haskell support -- -- (In a separate file for convenience) module Control.Distributed.Process.Internal.Closure.TH -- | Create the closure, decoder, and metadata definitions for the given -- list of functions remotable :: [Name] -> Q [Dec] -- | Construct a static value. -- -- If f : forall a1 .. an. T then $(mkStatic 'f) :: forall -- a1 .. an. Static T. Be sure to pass f to -- remotable. mkStatic :: Name -> Q Exp -- | Serialization dictionary for a function argument (see module header) functionSDict :: Name -> Q Exp -- | Combinators on static values module Control.Distributed.Process.Internal.Closure.Static -- | Static version of const staticConst :: (Typeable a, Typeable b) => Static (a -> b -> a) -- | Static version of flip staticFlip :: (Typeable a, Typeable b, Typeable c) => Static (a -> b -> c) -> Static (b -> a -> c) -- | Static version of fst staticFst :: (Typeable a, Typeable b) => Static ((a, b) -> a) -- | Static version of snd staticSnd :: (Typeable a, Typeable b) => Static ((a, b) -> b) -- | Static version of (.) staticCompose :: (Typeable a, Typeable b, Typeable c) => Static (b -> c) -> Static (a -> b) -> Static (a -> c) -- | Static version of first staticFirst :: (Typeable a, Typeable b, Typeable c) => Static ((a -> b) -> (a, c) -> (b, c)) -- | Static version of second staticSecond :: (Typeable a, Typeable b, Typeable c) => Static ((a -> b) -> (c, a) -> (c, b)) -- | Static version of (***) staticSplit :: (Typeable a, Typeable b, Typeable c, Typeable d) => Static (a -> c) -> Static (b -> d) -> Static ((a, b) -> (c, d)) -- | Static version of '()' staticUnit :: Static () -- | Static decoder, given a static serialization dictionary. -- -- See module documentation of Control.Distributed.Process.Closure -- for an example. staticDecode :: Typeable a => Static (SerializableDict a) -> Static (ByteString -> a) -- | Convert a static value into a closure. staticClosure :: Typeable a => Static a -> Closure a -- | Convert a serializable value into a closure. toClosure :: Serializable a => Static (SerializableDict a) -> a -> Closure a -- | Serialization dictionary for '()' sdictUnit :: Static (SerializableDict ()) -- | Serialization dictionary for ProcessId sdictProcessId :: Static (SerializableDict ProcessId) -- | Serialization dictionary for SendPort sdictSendPort :: Typeable a => Static (SerializableDict a) -> Static (SerializableDict (SendPort a)) __remoteTable :: RemoteTable -> RemoteTable module Control.Distributed.Process.Internal.Closure.MkClosure -- | Create a closure -- -- If f : T1 -> T2 is a monomorphic function then -- $(mkClosure 'f) :: T1 -> Closure T2. Be sure to pass -- f to remotable. mkClosure :: Name -> Q Exp module Control.Distributed.Process.Internal.Closure.Resolution resolveClosure :: RemoteTable -> StaticLabel -> ByteString -> Maybe Dynamic module Control.Distributed.Process.Internal.Node -- | Deconstructor for Process (not exported to the public API) runLocalProcess :: LocalNode -> Process a -> LocalProcess -> IO a -- | Cloud Haskell primitives -- -- We define these in a separate module so that we don't have to rely on -- the closure combinators module Control.Distributed.Process.Internal.Primitives -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- | Wait for a message of a specific type expect :: Serializable a => Process a -- | Create a new typed channel newChan :: Serializable a => Process (SendPort a, ReceivePort a) -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () -- | Wait for a message on a typed channel receiveChan :: Serializable a => ReceivePort a -> Process a -- | Merge a list of typed channels. -- -- The result port is left-biased: if there are messages available on -- more than one port, the first available message is returned. mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Like mergePortsBiased, but with a round-robin scheduler (rather -- than left-biased) mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Opaque type used in receiveWait and receiveTimeout data Match b -- | Test the matches in order against each message in the queue receiveWait :: [Match b] -> Process b -- | Like receiveWait but with a timeout. -- -- If the timeout is zero do a non-blocking check for matching messages. -- A non-zero timeout is applied only when waiting for incoming messages -- (that is, after we have checked the messages that are already -- in the mailbox). receiveTimeout :: Int -> [Match b] -> Process (Maybe b) -- | Match against any message of the right type match :: Serializable a => (a -> Process b) -> Match b -- | Match against any message of the right type that satisfies a predicate matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b -- | Remove any message from the queue matchUnknown :: Process b -> Match b -- | Terminate (throws a ProcessTerminationException) terminate :: Process a -- | Thrown by terminate data ProcessTerminationException ProcessTerminationException :: ProcessTerminationException -- | Our own process ID getSelfPid :: Process ProcessId -- | Get the node ID of our local node getSelfNode :: Process NodeId -- | Link to a remote process (asynchronous) -- -- Note that link provides unidirectional linking (see -- spawnSupervised). Linking makes no distinction between normal -- and abnormal termination of the remote process. link :: ProcessId -> Process () -- | Remove a link (synchronous) unlink :: ProcessId -> Process () -- | Monitor another process (asynchronous) monitor :: ProcessId -> Process MonitorRef -- | Remove a monitor (synchronous) unmonitor :: MonitorRef -> Process () -- | Log a string -- -- say message sends a message (time, pid of the current -- process, message) to the process registered as logger. By -- default, this process simply sends the string to stderr. -- Individual Cloud Haskell backends might replace this with a different -- logger process, however. say :: String -> Process () -- | Register a process with the local registry (asynchronous). -- -- The process to be registered does not have to be local itself. register :: String -> ProcessId -> Process () -- | Remove a process from the local registry (asynchronous). unregister :: String -> Process () -- | Query the local process registry (synchronous). whereis :: String -> Process (Maybe ProcessId) -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Register a process with a remote registry (asynchronous). -- -- The process to be registered does not have to live on the same remote -- node. registerRemote :: NodeId -> String -> ProcessId -> Process () -- | Remove a process from a remote registry (asynchronous). unregisterRemote :: NodeId -> String -> Process () -- | Query a remote process registry (synchronous) whereisRemote :: NodeId -> String -> Process (Maybe ProcessId) -- | Query a remote process registry (asynchronous) -- -- Reply will come in the form of a WhereIsReply message whereisRemoteAsync :: NodeId -> String -> Process () -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | Deserialize a closure unClosure :: Typeable a => Closure a -> Process a -- | Catch exceptions within a process catch :: Exception e => Process a -> (e -> Process a) -> Process a -- | Like expect but with a timeout expectTimeout :: Serializable a => Int -> Process (Maybe a) -- | Asynchronous version of spawn -- -- (spawn is defined in terms of spawnAsync and -- expect) spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef -- | Link to a node linkNode :: NodeId -> Process () -- | Link to a channel (send port) linkPort :: SendPort a -> Process () -- | Remove a node link (synchronous) unlinkNode :: NodeId -> Process () -- | Remove a channel (send port) link (synchronous) unlinkPort :: SendPort a -> Process () -- | Monitor a node monitorNode :: NodeId -> Process MonitorRef -- | Monitor a typed channel monitorPort :: Serializable a => SendPort a -> Process MonitorRef instance Typeable ProcessTerminationException instance Show ProcessTerminationException instance Exception ProcessTerminationException -- | Combinator for process closures module Control.Distributed.Process.Internal.Closure.CP -- | 'CP a b' represents the closure of a process parameterized by -- a and returning b. 'CP a b' forms a (restricted) -- generalized arrow (http://www.cs.berkeley.edu/~megacz/garrows/) type CP a b = Closure (a -> Process b) -- | CP introduction form cpIntro :: (Typeable a, Typeable b) => Closure (Process b) -> Closure (a -> Process b) -- | CP elimination form cpElim :: Typeable a => CP () a -> Closure (Process a) -- | Identity (Closure version of return) cpId :: Typeable a => CP a a -- | Left-to-right composition (Closure version of >=>) cpComp :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP b c -> CP a c -- | First cpFirst :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (a, c) (b, c) -- | Second cpSecond :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (c, a) (c, b) -- | Split (Like ***) cpSplit :: (Typeable a, Typeable b, Typeable c, Typeable d) => CP a c -> CP b d -> CP (a, b) (c, d) -- | Left cancellation cpCancelL :: Typeable a => CP ((), a) a -- | Right cancellation cpCancelR :: Typeable a => CP (a, ()) a -- | Closure version of link cpLink :: ProcessId -> Closure (Process ()) -- | Closure version of unlink cpUnlink :: ProcessId -> Closure (Process ()) -- | Closure version of send cpSend :: Typeable a => Static (SerializableDict a) -> ProcessId -> Closure (a -> Process ()) -- | Closure version of expect cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a) -- | Closure version of newChan cpNewChan :: Typeable a => Static (SerializableDict a) -> Closure (Process (SendPort a, ReceivePort a)) -- | Not-quite-monadic return cpReturn :: Serializable a => Static (SerializableDict a) -> a -> Closure (Process a) -- | Not-quite-monadic bind (>>=) cpBind :: (Typeable a, Typeable b) => Closure (Process a) -> Closure (a -> Process b) -> Closure (Process b) -- | Monadic sequencing (>>) cpSeq :: Closure (Process ()) -> Closure (Process ()) -> Closure (Process ()) __remoteTable :: RemoteTable -> RemoteTable -- | Static values and Closures -- -- -- -- Towards Haskell in the Cloud (Epstein et al., Haskell Symposium -- 2011) proposes a new type construct called static that -- characterizes values that are known statically. There is no support -- for static in ghc yet, however, so we emulate it using -- Template Haskell. Given a top-level definition -- --
--   f :: forall a1 .. an. T
--   f = ...
--   
-- -- you can use a Template Haskell splice to create a static version of -- f: -- --
--   $(mkStatic 'f) :: forall a1 .. an. Static T
--   
-- -- Every module that you write that contains calls to mkStatic -- needs to have a call to remotable: -- --
--   remotable [ 'f, 'g, ... ]
--   
-- -- where you must pass every function (or other value) that you pass as -- an argument to mkStatic. The call to remotable will -- create a definition -- --
--   __remoteTable :: RemoteTable -> RemoteTable
--   
-- -- which can be used to construct the RemoteTable used to -- initialize Cloud Haskell. You should have (at most) one call to -- remotable per module, and compose all created functions when -- initializing Cloud Haskell: -- --
--   let rtable :: RemoteTable 
--       rtable = M1.__remoteTable
--              . M2.__remoteTable
--              . ...
--              . Mn.__remoteTable
--              $ initRemoteTable 
--   
-- -- -- -- We generalize the notion of static as described in the paper, -- and also provide -- --
--   staticApply :: Static (a -> b) -> Static a -> Static b
--   
-- -- This makes it possible to define a rich set of combinators on -- static values, a number of which are provided in this module. -- -- -- -- Suppose you have a process -- --
--   factorial :: Int -> Process Int
--   
-- -- Then you can use the supplied Template Haskell function -- mkClosure to define -- --
--   factorialClosure :: Int -> Closure (Process Int)
--   factorialClosure = $(mkClosure 'factorial)
--   
-- -- You can then pass 'factorialClosure n' to spawn, for example, -- to have a remote node compute a factorial number. -- -- In general, if you have a monomorphic function -- --
--   f :: T1 -> T2
--   
-- -- then -- --
--   $(mkClosure 'f) :: T1 -> Closure T2
--   
-- -- provided that T1 is serializable (*). -- -- -- -- You don't need to use mkClosure, however. Closures are -- defined exactly as described in Towards Haskell in the Cloud: -- --
--   data Closure a = Closure (Static (ByteString -> a)) ByteString
--   
-- -- The splice $(mkClosure 'factorial) above expands to -- (prettified a bit): -- --
--   factorialClosure :: Int -> Closure (Process Int)
--   factorialClosure n = Closure decoder (encode n)
--     where
--       decoder :: Static (ByteString -> Process Int)
--       decoder = $(mkStatic 'factorial) 
--               `staticCompose`  
--                 staticDecode $(functionSDict 'factorial)
--   
-- -- mkStatic we have already seen: -- --
--   $(mkStatic 'factorial) :: Static (Int -> Process Int)
--   
-- -- staticCompose is function composition on static functions. -- staticDecode has type (**) -- --
--   staticDecode :: Typeable a 
--                => Static (SerializableDict a) -> Static (ByteString -> a)
--   
-- -- and gives you a static decoder, given a static Serializable -- dictionary. SerializableDict is a reified type class -- dictionary, and defined simply as -- --
--   data SerializableDict a where
--     SerializableDict :: Serializable a => SerializableDict a
--   
-- -- That means that for any serialziable type T, you can define -- --
--   sdictForMyType :: SerializableDict T
--   sdictForMyType = SerializableDict
--   
-- -- and then use -- --
--   $(mkStatic 'sdictForMyType) :: Static (SerializableDict T)
--   
-- -- to obtain a static serializable dictionary for T (make sure -- to pass sdictForMyType to remotable). -- -- However, since these serialization dictionaries are so frequently -- required, when you call remotable on a monomorphic function -- f : T1 -> T2 -- --
--   remotable ['f]
--   
-- -- then a serialization dictionary is automatically created for you, -- which you can access with -- --
--   $(functionSDict 'f) :: Static (SerializableDict T1)
--   
-- -- This is the dictionary that mkClosure uses. -- -- -- -- Support for staticApply (described above) also means that we -- can define combinators on Closures, and we provide a number of them in -- this module, the most important of which is cpBind. Have a look -- at the implementation of call for an example use. -- -- -- -- (*) If T1 is not serializable you will get a type error in -- the generated code. Unfortunately, the Template Haskell infrastructure -- cannot check a priori if T1 is serializable or not due to a -- bug in the Template Haskell libraries -- (http://hackage.haskell.org/trac/ghc/ticket/7066) -- -- (**) Even though staticDecode is passed an explicit -- serialization dictionary, we still need the Typeable -- constraint because Static is not the true static. If -- it was, we could unstatic the dictionary and pattern match on -- it to bring the Typeable instance into scope, but unless -- proper static support is added to ghc we need both the type -- class argument and the explicit dictionary. module Control.Distributed.Process.Closure -- | Create the closure, decoder, and metadata definitions for the given -- list of functions remotable :: [Name] -> Q [Dec] -- | Construct a static value. -- -- If f : forall a1 .. an. T then $(mkStatic 'f) :: forall -- a1 .. an. Static T. Be sure to pass f to -- remotable. mkStatic :: Name -> Q Exp -- | Create a closure -- -- If f : T1 -> T2 is a monomorphic function then -- $(mkClosure 'f) :: T1 -> Closure T2. Be sure to pass -- f to remotable. mkClosure :: Name -> Q Exp -- | Serialization dictionary for a function argument (see module header) functionSDict :: Name -> Q Exp -- | Apply two static values staticApply :: Static (a -> b) -> Static a -> Static b -- | Co-monadic duplicate for static values staticDuplicate :: Typeable a => Static a -> Static (Static a) -- | Static version of const staticConst :: (Typeable a, Typeable b) => Static (a -> b -> a) -- | Static version of flip staticFlip :: (Typeable a, Typeable b, Typeable c) => Static (a -> b -> c) -> Static (b -> a -> c) -- | Static version of fst staticFst :: (Typeable a, Typeable b) => Static ((a, b) -> a) -- | Static version of snd staticSnd :: (Typeable a, Typeable b) => Static ((a, b) -> b) -- | Static version of (.) staticCompose :: (Typeable a, Typeable b, Typeable c) => Static (b -> c) -> Static (a -> b) -> Static (a -> c) -- | Static version of first staticFirst :: (Typeable a, Typeable b, Typeable c) => Static ((a -> b) -> (a, c) -> (b, c)) -- | Static version of second staticSecond :: (Typeable a, Typeable b, Typeable c) => Static ((a -> b) -> (c, a) -> (c, b)) -- | Static version of (***) staticSplit :: (Typeable a, Typeable b, Typeable c, Typeable d) => Static (a -> c) -> Static (b -> d) -> Static ((a, b) -> (c, d)) -- | Static version of '()' staticUnit :: Static () -- | Static decoder, given a static serialization dictionary. -- -- See module documentation of Control.Distributed.Process.Closure -- for an example. staticDecode :: Typeable a => Static (SerializableDict a) -> Static (ByteString -> a) -- | Convert a static value into a closure. staticClosure :: Typeable a => Static a -> Closure a -- | Convert a serializable value into a closure. toClosure :: Serializable a => Static (SerializableDict a) -> a -> Closure a -- | Reification of Serializable (see -- Control.Distributed.Process.Closure) data SerializableDict a SerializableDict :: SerializableDict a -- | Serialization dictionary for '()' sdictUnit :: Static (SerializableDict ()) -- | Serialization dictionary for ProcessId sdictProcessId :: Static (SerializableDict ProcessId) -- | Serialization dictionary for SendPort sdictSendPort :: Typeable a => Static (SerializableDict a) -> Static (SerializableDict (SendPort a)) -- | 'CP a b' represents the closure of a process parameterized by -- a and returning b. 'CP a b' forms a (restricted) -- generalized arrow (http://www.cs.berkeley.edu/~megacz/garrows/) type CP a b = Closure (a -> Process b) -- | CP introduction form cpIntro :: (Typeable a, Typeable b) => Closure (Process b) -> Closure (a -> Process b) -- | CP elimination form cpElim :: Typeable a => CP () a -> Closure (Process a) -- | Identity (Closure version of return) cpId :: Typeable a => CP a a -- | Left-to-right composition (Closure version of >=>) cpComp :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP b c -> CP a c -- | First cpFirst :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (a, c) (b, c) -- | Second cpSecond :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (c, a) (c, b) -- | Split (Like ***) cpSplit :: (Typeable a, Typeable b, Typeable c, Typeable d) => CP a c -> CP b d -> CP (a, b) (c, d) -- | Left cancellation cpCancelL :: Typeable a => CP ((), a) a -- | Right cancellation cpCancelR :: Typeable a => CP (a, ()) a -- | Closure version of link cpLink :: ProcessId -> Closure (Process ()) -- | Closure version of unlink cpUnlink :: ProcessId -> Closure (Process ()) -- | Closure version of send cpSend :: Typeable a => Static (SerializableDict a) -> ProcessId -> Closure (a -> Process ()) -- | Closure version of expect cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a) -- | Closure version of newChan cpNewChan :: Typeable a => Static (SerializableDict a) -> Closure (Process (SendPort a, ReceivePort a)) -- | Not-quite-monadic return cpReturn :: Serializable a => Static (SerializableDict a) -> a -> Closure (Process a) -- | Not-quite-monadic bind (>>=) cpBind :: (Typeable a, Typeable b) => Closure (Process a) -> Closure (a -> Process b) -> Closure (Process b) -- | Monadic sequencing (>>) cpSeq :: Closure (Process ()) -> Closure (Process ()) -> Closure (Process ()) -- | Local nodes module Control.Distributed.Process.Node -- | Local nodes data LocalNode -- | Initialize a new local node. -- -- Note that proper Cloud Haskell initialization and configuration is -- still to do. newLocalNode :: Transport -> RemoteTable -> IO LocalNode -- | Force-close a local node -- -- TODO: for now we just close the associated endpoint closeLocalNode :: LocalNode -> IO () -- | Spawn a new process on a local node forkProcess :: LocalNode -> Process () -> IO ProcessId -- | Run a process on a local node and wait for it to finish runProcess :: LocalNode -> Process () -> IO () initRemoteTable :: RemoteTable -- | NodeId of the node localNodeId :: LocalNode -> NodeId instance Functor NC instance Monad NC instance MonadIO NC instance MonadState NCState NC -- | -- -- This is an implementation of Cloud Haskell, as described in Towards -- Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon -- Peyton Jones -- (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), -- although some of the details are different. The precise message -- passing semantics are based on A unified semantics for future -- Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle. module Control.Distributed.Process -- | Process identifier data ProcessId -- | Node identifier data NodeId -- | The Cloud Haskell Process type data Process a -- | A send port is identified by a SendPortId. -- -- You cannot send directly to a SendPortId; instead, use -- newChan to create a SendPort. data SendPortId -- | The ID of the node the process is running on processNodeId :: ProcessId -> NodeId -- | The ID of the process that will receive messages sent on this port sendPortProcessId :: SendPortId -> ProcessId -- | Lift a computation from the IO monad. liftIO :: MonadIO m => forall a. IO a -> m a -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- | Wait for a message of a specific type expect :: Serializable a => Process a -- | The receive end of a typed channel (not serializable) data ReceivePort a -- | The send send of a typed channel (serializable) data SendPort a -- | The (unique) ID of this send port sendPortId :: SendPort a -> SendPortId -- | Create a new typed channel newChan :: Serializable a => Process (SendPort a, ReceivePort a) -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () -- | Wait for a message on a typed channel receiveChan :: Serializable a => ReceivePort a -> Process a -- | Merge a list of typed channels. -- -- The result port is left-biased: if there are messages available on -- more than one port, the first available message is returned. mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Like mergePortsBiased, but with a round-robin scheduler (rather -- than left-biased) mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) -- | Opaque type used in receiveWait and receiveTimeout data Match b -- | Test the matches in order against each message in the queue receiveWait :: [Match b] -> Process b -- | Like receiveWait but with a timeout. -- -- If the timeout is zero do a non-blocking check for matching messages. -- A non-zero timeout is applied only when waiting for incoming messages -- (that is, after we have checked the messages that are already -- in the mailbox). receiveTimeout :: Int -> [Match b] -> Process (Maybe b) -- | Match against any message of the right type match :: Serializable a => (a -> Process b) -> Match b -- | Match against any message of the right type that satisfies a predicate matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b -- | Remove any message from the queue matchUnknown :: Process b -> Match b -- | Spawn a process -- -- For more information about Closure, see -- Control.Distributed.Process.Closure. -- -- See also call. spawn :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Run a process remotely and wait for it to reply -- -- We monitor the remote process: if it dies before it can send a reply, -- we die too. -- -- For more information about Static, SerializableDict, and -- Closure, see Control.Distributed.Process.Closure. -- -- See also spawn. call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a -- | Terminate (throws a ProcessTerminationException) terminate :: Process a -- | Thrown by terminate data ProcessTerminationException ProcessTerminationException :: ProcessTerminationException -- | SpawnRef are used to return pids of spawned processes data SpawnRef -- | Our own process ID getSelfPid :: Process ProcessId -- | Get the node ID of our local node getSelfNode :: Process NodeId -- | Link to a remote process (asynchronous) -- -- Note that link provides unidirectional linking (see -- spawnSupervised). Linking makes no distinction between normal -- and abnormal termination of the remote process. link :: ProcessId -> Process () -- | Link to a node linkNode :: NodeId -> Process () -- | Link to a channel (send port) linkPort :: SendPort a -> Process () -- | Remove a link (synchronous) unlink :: ProcessId -> Process () -- | Remove a node link (synchronous) unlinkNode :: NodeId -> Process () -- | Remove a channel (send port) link (synchronous) unlinkPort :: SendPort a -> Process () -- | Monitor another process (asynchronous) monitor :: ProcessId -> Process MonitorRef -- | Monitor a node monitorNode :: NodeId -> Process MonitorRef -- | Monitor a typed channel monitorPort :: Serializable a => SendPort a -> Process MonitorRef -- | Remove a monitor (synchronous) unmonitor :: MonitorRef -> Process () -- | Exceptions thrown when a linked process dies data ProcessLinkException ProcessLinkException :: ProcessId -> DiedReason -> ProcessLinkException -- | Exception thrown when a linked node dies data NodeLinkException NodeLinkException :: NodeId -> DiedReason -> NodeLinkException -- | Exception thrown when a linked channel (port) dies data PortLinkException PortLinkException :: SendPortId -> DiedReason -> PortLinkException -- | MonitorRef is opaque for regular Cloud Haskell processes data MonitorRef -- | Message sent by process monitors data ProcessMonitorNotification ProcessMonitorNotification :: MonitorRef -> ProcessId -> DiedReason -> ProcessMonitorNotification -- | Message sent by node monitors data NodeMonitorNotification NodeMonitorNotification :: MonitorRef -> NodeId -> DiedReason -> NodeMonitorNotification -- | Message sent by channel (port) monitors data PortMonitorNotification PortMonitorNotification :: MonitorRef -> SendPortId -> DiedReason -> PortMonitorNotification -- | Why did a process die? data DiedReason -- | Normal termination DiedNormal :: DiedReason -- | The process exited with an exception (provided as String -- because Exception does not implement Binary) DiedException :: String -> DiedReason -- | We got disconnected from the process node DiedDisconnect :: DiedReason -- | The process node died DiedNodeDown :: DiedReason -- | Invalid (processnodechannel) identifier DiedUnknownId :: DiedReason -- | A closure is a static value and an encoded environment data Closure a -- | A static value is top-level bound or the application of two static -- values data Static a -- | Deserialize a closure unClosure :: Typeable a => Closure a -> Process a -- | Runtime dictionary for unstatic lookups data RemoteTable -- | Log a string -- -- say message sends a message (time, pid of the current -- process, message) to the process registered as logger. By -- default, this process simply sends the string to stderr. -- Individual Cloud Haskell backends might replace this with a different -- logger process, however. say :: String -> Process () -- | Register a process with the local registry (asynchronous). -- -- The process to be registered does not have to be local itself. register :: String -> ProcessId -> Process () -- | Remove a process from the local registry (asynchronous). unregister :: String -> Process () -- | Query the local process registry (synchronous). whereis :: String -> Process (Maybe ProcessId) -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -- | Register a process with a remote registry (asynchronous). -- -- The process to be registered does not have to live on the same remote -- node. registerRemote :: NodeId -> String -> ProcessId -> Process () -- | Remove a process from a remote registry (asynchronous). unregisterRemote :: NodeId -> String -> Process () -- | Query a remote process registry (synchronous) whereisRemote :: NodeId -> String -> Process (Maybe ProcessId) -- | Query a remote process registry (asynchronous) -- -- Reply will come in the form of a WhereIsReply message whereisRemoteAsync :: NodeId -> String -> Process () -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () -- | (Asynchronous) reply from whereis data WhereIsReply WhereIsReply :: String -> (Maybe ProcessId) -> WhereIsReply -- | Catch exceptions within a process catch :: Exception e => Process a -> (e -> Process a) -> Process a -- | Like expect but with a timeout expectTimeout :: Serializable a => Int -> Process (Maybe a) -- | Asynchronous version of spawn -- -- (spawn is defined in terms of spawnAsync and -- expect) spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef -- | Spawn a child process, have the child link to the parent and the -- parent monitor the child spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a process and link to it -- -- Note that this is just the sequential composition of spawn and -- link. (The Unified semantics that underlies Cloud -- Haskell does not even support a synchronous link operation) spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId -- | Like spawnLink, but monitor the spawned process spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) -- | Spawn a new process, supplying it with a new ReceivePort and -- return the corresponding SendPort. spawnChannel :: Typeable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a) -- | (Asynchronius) reply from spawn data DidSpawn DidSpawn :: SpawnRef -> ProcessId -> DidSpawn -- | Spawn a process on the local node spawnLocal :: Process () -> Process ProcessId -- | Create a new typed channel, spawn a process on the local node, passing -- it the receive port, and return the send port spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a)