-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Cloud Haskell: Erlang-style concurrency in Haskell
--
-- This is an implementation of Cloud Haskell, as described in Towards
-- Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon
-- Peyton Jones
-- (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/),
-- although some of the details are different. The precise message
-- passing semantics are based on A unified semantics for future
-- Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle.
-- You will probably also want to install a Cloud Haskell backend such as
-- distributed-process-simplelocalnet.
@package distributed-process
@version 0.2.0
-- | Binary instances for TypeRep
module Control.Distributed.Process.Internal.TypeRep
instance Binary TyCon
instance Binary TypeRep
instance Binary Fingerprint
-- | Concurrent queue for single reader, single writer
module Control.Distributed.Process.Internal.CQueue
data CQueue a
data BlockSpec
NonBlocking :: BlockSpec
Blocking :: BlockSpec
Timeout :: Int -> BlockSpec
newCQueue :: IO (CQueue a)
enqueue :: CQueue a -> a -> IO ()
-- | Dequeue an element
--
-- The timeout (if any) is applied only to waiting for incoming messages,
-- not to checking messages that have already arrived
dequeue :: CQueue a -> BlockSpec -> [a -> Maybe b] -> IO (Maybe b)
module Control.Distributed.Process.Serializable
-- | Objects that can be sent across the network
class (Binary a, Typeable a) => Serializable a
-- | Encode type representation as a bytestring
encodeFingerprint :: Fingerprint -> ByteString
-- | Decode a bytestring into a fingerprint. Throws an IO exception on
-- failure
decodeFingerprint :: ByteString -> Fingerprint
-- | The fingerprint of the typeRep of the argument
fingerprint :: Typeable a => a -> Fingerprint
-- | Size of a fingerprint
sizeOfFingerprint :: Int
data Fingerprint :: *
-- | Show fingerprint (for debugging purposes)
showFingerprint :: Fingerprint -> ShowS
instance (Binary a, Typeable a) => Serializable a
-- | Reimplementation of Dynamic that supports dynBind
--
-- We don't have access to the internal representation of Dynamic,
-- otherwise we would not have to redefine it completely. Note that we
-- use this only internally, so the incompatibility with our
-- Dynamic from the standard Dynamic is not important.
module Control.Distributed.Process.Internal.Dynamic
data Dynamic
Dynamic :: TypeRep -> Any -> Dynamic
toDyn :: Typeable a => a -> Dynamic
fromDyn :: Typeable a => Dynamic -> a -> a
fromDynamic :: Typeable a => Dynamic -> Maybe a
dynTypeRep :: Dynamic -> TypeRep
dynApply :: Dynamic -> Dynamic -> Maybe Dynamic
dynApp :: Dynamic -> Dynamic -> Dynamic
-- | The function unsafeCoerce# allows you to side-step the
-- typechecker entirely. That is, it allows you to coerce any type into
-- any other type. If you use this function, you had better get it right,
-- otherwise segmentation faults await. It is generally used when you
-- want to write a program that you know is well-typed, but where
-- Haskell's type system is not expressive enough to prove that it is
-- well typed.
--
-- The following uses of unsafeCoerce# are supposed to work
-- (i.e. not lead to spurious compile-time or run-time crashes):
--
--
-- - Casting any lifted type to Any
-- - Casting Any back to the real type
-- - Casting an unboxed type to another unboxed type of the same size
-- (but not coercions between floating-point and integral types)
-- - Casting between two types that have the same runtime
-- representation. One case is when the two types differ only in
-- "phantom" type parameters, for example Ptr Int to Ptr
-- Float, or [Int] to [Float] when the list is
-- known to be empty. Also, a newtype of a type T has
-- the same representation at runtime as T.
--
--
-- Other uses of unsafeCoerce# are undefined. In particular, you
-- should not use unsafeCoerce# to cast a T to an algebraic data
-- type D, unless T is also an algebraic data type. For example, do not
-- cast Int->Int to Bool, even if you later cast
-- that Bool back to Int->Int before applying it.
-- The reasons have to do with GHC's internal representation details (for
-- the congnoscenti, data values can be entered but function closures
-- cannot). If you want a safe type to cast things to, use Any,
-- which is not an algebraic data type.
unsafeCoerce# :: a -> b
instance Show Dynamic
-- | Types used throughout the Cloud Haskell framework
--
-- We collect all types used internally in a single module because many
-- of these data types are mutually recursive and cannot be split across
-- modules.
module Control.Distributed.Process.Internal.Types
-- | Node identifier
newtype NodeId
NodeId :: EndPointAddress -> NodeId
nodeAddress :: NodeId -> EndPointAddress
-- | A local process ID consists of a seed which distinguishes processes
-- from different instances of the same local node and a counter
data LocalProcessId
LocalProcessId :: Int32 -> Int32 -> LocalProcessId
lpidUnique :: LocalProcessId -> Int32
lpidCounter :: LocalProcessId -> Int32
-- | Process identifier
data ProcessId
ProcessId :: NodeId -> LocalProcessId -> ProcessId
-- | The ID of the node the process is running on
processNodeId :: ProcessId -> NodeId
-- | Node-local identifier for the process
processLocalId :: ProcessId -> LocalProcessId
-- | Union of all kinds of identifiers
data Identifier
NodeIdentifier :: NodeId -> Identifier
ProcessIdentifier :: ProcessId -> Identifier
SendPortIdentifier :: SendPortId -> Identifier
nodeOf :: Identifier -> NodeId
-- | Local nodes
data LocalNode
LocalNode :: NodeId -> EndPoint -> MVar LocalNodeState -> Chan NCMsg -> RemoteTable -> LocalNode
-- | NodeId of the node
localNodeId :: LocalNode -> NodeId
-- | The network endpoint associated with this node
localEndPoint :: LocalNode -> EndPoint
-- | Local node state
localState :: LocalNode -> MVar LocalNodeState
-- | Channel for the node controller
localCtrlChan :: LocalNode -> Chan NCMsg
-- | Runtime lookup table for supporting closures TODO: this should be part
-- of the CH state, not the local endpoint state
remoteTable :: LocalNode -> RemoteTable
-- | Local node state
data LocalNodeState
LocalNodeState :: Map LocalProcessId LocalProcess -> Int32 -> Int32 -> LocalNodeState
_localProcesses :: LocalNodeState -> Map LocalProcessId LocalProcess
_localPidCounter :: LocalNodeState -> Int32
_localPidUnique :: LocalNodeState -> Int32
-- | Processes running on our local node
data LocalProcess
LocalProcess :: CQueue Message -> ProcessId -> MVar LocalProcessState -> ThreadId -> LocalProcess
processQueue :: LocalProcess -> CQueue Message
processId :: LocalProcess -> ProcessId
processState :: LocalProcess -> MVar LocalProcessState
processThread :: LocalProcess -> ThreadId
-- | Local process state
data LocalProcessState
LocalProcessState :: Int32 -> Int32 -> Int32 -> Map LocalSendPortId TypedChannel -> LocalProcessState
_monitorCounter :: LocalProcessState -> Int32
_spawnCounter :: LocalProcessState -> Int32
_channelCounter :: LocalProcessState -> Int32
_typedChannels :: LocalProcessState -> Map LocalSendPortId TypedChannel
-- | The Cloud Haskell Process type
newtype Process a
Process :: ReaderT LocalProcess (MessageT IO) a -> Process a
unProcess :: Process a -> ReaderT LocalProcess (MessageT IO) a
procMsg :: MessageT IO a -> Process a
type LocalSendPortId = Int32
-- | A send port is identified by a SendPortId.
--
-- You cannot send directly to a SendPortId; instead, use
-- newChan to create a SendPort.
data SendPortId
SendPortId :: ProcessId -> LocalSendPortId -> SendPortId
-- | The ID of the process that will receive messages sent on this port
sendPortProcessId :: SendPortId -> ProcessId
-- | Process-local ID of the channel
sendPortLocalId :: SendPortId -> LocalSendPortId
data TypedChannel
TypedChannel :: (TChan a) -> TypedChannel
-- | The send send of a typed channel (serializable)
newtype SendPort a
SendPort :: SendPortId -> SendPort a
-- | The (unique) ID of this send port
sendPortId :: SendPort a -> SendPortId
-- | The receive end of a typed channel (not serializable)
data ReceivePort a
-- | A single receive port
ReceivePortSingle :: (TChan a) -> ReceivePort a
-- | A left-biased combination of receive ports
ReceivePortBiased :: [ReceivePort a] -> ReceivePort a
-- | A round-robin combination of receive ports
ReceivePortRR :: (TVar [ReceivePort a]) -> ReceivePort a
data StaticLabel
UserStatic :: String -> StaticLabel
ClosureReturn :: StaticLabel
ClosureSend :: StaticLabel
ClosureExpect :: StaticLabel
ClosureApply :: StaticLabel
ClosureConst :: StaticLabel
ClosureUnit :: StaticLabel
CpId :: StaticLabel
CpComp :: StaticLabel
CpFirst :: StaticLabel
CpSwap :: StaticLabel
CpCopy :: StaticLabel
CpLeft :: StaticLabel
CpMirror :: StaticLabel
CpUntag :: StaticLabel
CpApply :: StaticLabel
-- | A static value is one that is bound at top-level.
newtype Static a
Static :: StaticLabel -> Static a
-- | A closure is a static value and an encoded environment
data Closure a
Closure :: (Static (ByteString -> a)) -> ByteString -> Closure a
-- | Used to fake static (see paper)
data RemoteTable
RemoteTable :: Map String Dynamic -> Map TypeRep RuntimeSerializableSupport -> RemoteTable
-- | If the user creates a closure of type a -> Closure b from
-- a function f : a -> b, then _remoteTableLabels
-- should have an entry for f of type ByteString -> b
-- (basically, f . encode)
_remoteTableLabels :: RemoteTable -> Map String Dynamic
-- | Runtime counterpart to SerializableDict
_remoteTableDicts :: RemoteTable -> Map TypeRep RuntimeSerializableSupport
-- | Reification of Serializable (see
-- Control.Distributed.Process.Closure)
data SerializableDict a
SerializableDict :: SerializableDict a
-- | Runtime support for implementing polymorphic functions with a
-- Serializable qualifier (sendClosure, returnClosure, ..).
--
-- We don't attempt to keep this minimal, but instead just add functions
-- as convenient. This will be replaced anyway once static has
-- been implemented.
data RuntimeSerializableSupport
RuntimeSerializableSupport :: Dynamic -> Dynamic -> Dynamic -> RuntimeSerializableSupport
rssSend :: RuntimeSerializableSupport -> Dynamic
rssReturn :: RuntimeSerializableSupport -> Dynamic
rssExpect :: RuntimeSerializableSupport -> Dynamic
-- | Messages consist of their typeRep fingerprint and their encoding
data Message
Message :: Fingerprint -> ByteString -> Message
messageFingerprint :: Message -> Fingerprint
messageEncoding :: Message -> ByteString
-- | Turn any serialiable term into a message
createMessage :: Serializable a => a -> Message
-- | Serialize a message
messageToPayload :: Message -> [ByteString]
-- | Deserialize a message
payloadToMessage :: [ByteString] -> Message
-- | MonitorRef is opaque for regular Cloud Haskell processes
data MonitorRef
MonitorRef :: Identifier -> Int32 -> MonitorRef
-- | ID of the entity to be monitored
monitorRefIdent :: MonitorRef -> Identifier
-- | Unique to distinguish multiple monitor requests by the same process
monitorRefCounter :: MonitorRef -> Int32
-- | Message sent by process monitors
data ProcessMonitorNotification
ProcessMonitorNotification :: MonitorRef -> ProcessId -> DiedReason -> ProcessMonitorNotification
-- | Message sent by node monitors
data NodeMonitorNotification
NodeMonitorNotification :: MonitorRef -> NodeId -> DiedReason -> NodeMonitorNotification
-- | Message sent by channel (port) monitors
data PortMonitorNotification
PortMonitorNotification :: MonitorRef -> SendPortId -> DiedReason -> PortMonitorNotification
-- | Exceptions thrown when a linked process dies
data ProcessLinkException
ProcessLinkException :: ProcessId -> DiedReason -> ProcessLinkException
-- | Exception thrown when a linked node dies
data NodeLinkException
NodeLinkException :: NodeId -> DiedReason -> NodeLinkException
-- | Exception thrown when a linked channel (port) dies
data PortLinkException
PortLinkException :: SendPortId -> DiedReason -> PortLinkException
-- | Why did a process die?
data DiedReason
-- | Normal termination
DiedNormal :: DiedReason
-- | The process exited with an exception (provided as String
-- because Exception does not implement Binary)
DiedException :: String -> DiedReason
-- | We got disconnected from the process node
DiedDisconnect :: DiedReason
-- | The process node died
DiedNodeDown :: DiedReason
-- | Invalid (processnodechannel) identifier
DiedUnknownId :: DiedReason
-- | (Asynchronous) reply from unmonitor
newtype DidUnmonitor
DidUnmonitor :: MonitorRef -> DidUnmonitor
-- | (Asynchronous) reply from unlink
newtype DidUnlinkProcess
DidUnlinkProcess :: ProcessId -> DidUnlinkProcess
-- | (Asynchronous) reply from unlinkNode
newtype DidUnlinkNode
DidUnlinkNode :: NodeId -> DidUnlinkNode
-- | (Asynchronous) reply from unlinkPort
newtype DidUnlinkPort
DidUnlinkPort :: SendPortId -> DidUnlinkPort
-- | SpawnRef are used to return pids of spawned processes
newtype SpawnRef
SpawnRef :: Int32 -> SpawnRef
-- | (Asynchronius) reply from spawn
data DidSpawn
DidSpawn :: SpawnRef -> ProcessId -> DidSpawn
-- | (Asynchronous) reply from whereis
data WhereIsReply
WhereIsReply :: String -> (Maybe ProcessId) -> WhereIsReply
-- | Messages to the node controller
data NCMsg
NCMsg :: Identifier -> ProcessSignal -> NCMsg
ctrlMsgSender :: NCMsg -> Identifier
ctrlMsgSignal :: NCMsg -> ProcessSignal
-- | Signals to the node controller (see NCMsg)
data ProcessSignal
Link :: Identifier -> ProcessSignal
Unlink :: Identifier -> ProcessSignal
Monitor :: MonitorRef -> ProcessSignal
Unmonitor :: MonitorRef -> ProcessSignal
Died :: Identifier -> DiedReason -> ProcessSignal
Spawn :: (Closure (Process ())) -> SpawnRef -> ProcessSignal
WhereIs :: String -> ProcessSignal
Register :: String -> (Maybe ProcessId) -> ProcessSignal
NamedSend :: String -> Message -> ProcessSignal
newtype MessageT m a
MessageT :: StateT MessageState m a -> MessageT m a
unMessageT :: MessageT m a -> StateT MessageState m a
data MessageState
MessageState :: LocalNode -> Map Identifier Connection -> MessageState
messageLocalNode :: MessageState -> LocalNode
_messageConnections :: MessageState -> Map Identifier Connection
localProcesses :: Accessor LocalNodeState (Map LocalProcessId LocalProcess)
localPidCounter :: Accessor LocalNodeState Int32
localPidUnique :: Accessor LocalNodeState Int32
localProcessWithId :: LocalProcessId -> Accessor LocalNodeState (Maybe LocalProcess)
monitorCounter :: Accessor LocalProcessState Int32
spawnCounter :: Accessor LocalProcessState Int32
channelCounter :: Accessor LocalProcessState LocalSendPortId
typedChannels :: Accessor LocalProcessState (Map LocalSendPortId TypedChannel)
typedChannelWithId :: LocalSendPortId -> Accessor LocalProcessState (Maybe TypedChannel)
remoteTableLabels :: Accessor RemoteTable (Map String Dynamic)
remoteTableDicts :: Accessor RemoteTable (Map TypeRep RuntimeSerializableSupport)
remoteTableLabel :: String -> Accessor RemoteTable (Maybe Dynamic)
remoteTableDict :: TypeRep -> Accessor RemoteTable (Maybe RuntimeSerializableSupport)
instance Typeable LocalProcessId
instance Typeable ProcessId
instance Typeable1 SendPort
instance Typeable StaticLabel
instance Typeable1 Static
instance Typeable1 Closure
instance Typeable1 SerializableDict
instance Typeable PortLinkException
instance Typeable NodeLinkException
instance Typeable ProcessLinkException
instance Typeable PortMonitorNotification
instance Typeable NodeMonitorNotification
instance Typeable ProcessMonitorNotification
instance Typeable DidUnmonitor
instance Typeable DidUnlinkProcess
instance Typeable DidUnlinkNode
instance Typeable DidUnlinkPort
instance Typeable SpawnRef
instance Typeable DidSpawn
instance Typeable WhereIsReply
instance Typeable1 Process
instance Eq NodeId
instance Ord NodeId
instance Binary NodeId
instance Eq LocalProcessId
instance Ord LocalProcessId
instance Eq ProcessId
instance Ord ProcessId
instance Eq SendPortId
instance Ord SendPortId
instance Eq Identifier
instance Ord Identifier
instance Binary (SendPort a)
instance Show (SendPort a)
instance Eq (SendPort a)
instance Ord (SendPort a)
instance Show StaticLabel
instance Show (Static a)
instance Show (Closure a)
instance Eq MonitorRef
instance Ord MonitorRef
instance Show MonitorRef
instance Show DiedReason
instance Eq DiedReason
instance Show PortLinkException
instance Show NodeLinkException
instance Show ProcessLinkException
instance Show PortMonitorNotification
instance Show NodeMonitorNotification
instance Show ProcessMonitorNotification
instance Binary DidUnmonitor
instance Binary DidUnlinkProcess
instance Binary DidUnlinkNode
instance Binary DidUnlinkPort
instance Show SpawnRef
instance Binary SpawnRef
instance Eq SpawnRef
instance Show DidSpawn
instance Show WhereIsReply
instance Show NCMsg
instance Show ProcessSignal
instance Functor Process
instance Monad Process
instance MonadIO Process
instance MonadReader LocalProcess Process
instance Applicative Process
instance Functor m => Functor (MessageT m)
instance Monad m => Monad (MessageT m)
instance MonadIO m => MonadIO (MessageT m)
instance Monad m => MonadState MessageState (MessageT m)
instance (Monad m, Functor m) => Applicative (MessageT m)
instance Binary WhereIsReply
instance Binary StaticLabel
instance Binary Identifier
instance Binary SendPortId
instance Binary DidSpawn
instance Binary (Closure a)
instance Binary DiedReason
instance Binary ProcessSignal
instance Binary MonitorRef
instance Binary NCMsg
instance Binary PortMonitorNotification
instance Binary NodeMonitorNotification
instance Binary ProcessMonitorNotification
instance Binary ProcessId
instance Binary LocalProcessId
instance Exception PortLinkException
instance Exception NodeLinkException
instance Exception ProcessLinkException
instance Show Message
instance Show SendPortId
instance Show Identifier
instance Show ProcessId
instance Show LocalProcessId
instance Show NodeId
module Control.Distributed.Process.Internal.Closure.Combinators
closureApply :: Closure (a -> b) -> Closure a -> Closure b
closureConst :: (Typeable a, Typeable b) => Closure (a -> b -> a)
closureUnit :: Closure ()
type CP a b = Closure (a -> Process b)
cpIntro :: (Typeable a, Typeable b) => Closure (Process b) -> CP a b
cpElim :: Typeable a => CP () a -> Closure (Process a)
cpId :: Typeable a => CP a a
cpComp :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP b c -> CP a c
cpFirst :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (a, c) (b, c)
cpSwap :: (Typeable a, Typeable b) => CP (a, b) (b, a)
cpSecond :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (c, a) (c, b)
cpPair :: (Typeable a, Typeable a', Typeable b, Typeable b') => CP a b -> CP a' b' -> CP (a, a') (b, b')
cpCopy :: Typeable a => CP a (a, a)
cpFanOut :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP a c -> CP a (b, c)
cpLeft :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (Either a c) (Either b c)
cpMirror :: (Typeable a, Typeable b) => CP (Either a b) (Either b a)
cpRight :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (Either c a) (Either c b)
cpEither :: (Typeable a, Typeable a', Typeable b, Typeable b') => CP a b -> CP a' b' -> CP (Either a a') (Either b b')
cpUntag :: Typeable a => CP (Either a a) a
cpFanIn :: (Typeable a, Typeable b, Typeable c) => CP a c -> CP b c -> CP (Either a b) c
cpApply :: (Typeable a, Typeable b) => CP (CP a b, a) b
cpBind :: (Typeable a, Typeable b) => Closure (Process a) -> Closure (a -> Process b) -> Closure (Process b)
cpSeq :: Closure (Process ()) -> Closure (Process ()) -> Closure (Process ())
-- | Add message sending capability to a monad
--
-- NOTE: Not thread-safe (you should not do concurrent sends within the
-- same monad).
module Control.Distributed.Process.Internal.MessageT
data MessageT m a
runMessageT :: Monad m => LocalNode -> MessageT m a -> m a
getLocalNode :: Monad m => MessageT m LocalNode
sendPayload :: MonadIO m => Identifier -> [ByteString] -> MessageT m ()
sendBinary :: (MonadIO m, Binary a) => Identifier -> a -> MessageT m ()
sendMessage :: (MonadIO m, Serializable a) => Identifier -> a -> MessageT m ()
-- | Deserialize a message
payloadToMessage :: [ByteString] -> Message
-- | Turn any serialiable term into a message
createMessage :: Serializable a => a -> Message
module Control.Distributed.Process.Internal.Node
-- | Deconstructor for Process (not exported to the public API)
runLocalProcess :: LocalNode -> Process a -> LocalProcess -> IO a
-- | Cloud Haskell primitives
--
-- We define these in a separate module so that we don't have to rely on
-- the closure combinators
module Control.Distributed.Process.Internal.Primitives
-- | Send a message
send :: Serializable a => ProcessId -> a -> Process ()
-- | Wait for a message of a specific type
expect :: Serializable a => Process a
-- | Create a new typed channel
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
-- | Send a message on a typed channel
sendChan :: Serializable a => SendPort a -> a -> Process ()
-- | Wait for a message on a typed channel
receiveChan :: Serializable a => ReceivePort a -> Process a
-- | Merge a list of typed channels.
--
-- The result port is left-biased: if there are messages available on
-- more than one port, the first available message is returned.
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
-- | Like mergePortsBiased, but with a round-robin scheduler (rather
-- than left-biased)
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
-- | Opaque type used in receiveWait and receiveTimeout
data Match b
-- | Test the matches in order against each message in the queue
receiveWait :: [Match b] -> Process b
-- | Like receiveWait but with a timeout.
--
-- If the timeout is zero do a non-blocking check for matching messages.
-- A non-zero timeout is applied only when waiting for incoming messages
-- (that is, after we have checked the messages that are already
-- in the mailbox).
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
-- | Match against any message of the right type
match :: Serializable a => (a -> Process b) -> Match b
-- | Match against any message of the right type that satisfies a predicate
matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
-- | Remove any message from the queue
matchUnknown :: Process b -> Match b
-- | Terminate (throws a ProcessTerminationException)
terminate :: Process a
-- | Thrown by terminate
data ProcessTerminationException
ProcessTerminationException :: ProcessTerminationException
-- | Our own process ID
getSelfPid :: Process ProcessId
-- | Get the node ID of our local node
getSelfNode :: Process NodeId
-- | Link to a remote process (asynchronous)
--
-- Note that link provides unidirectional linking (see
-- spawnSupervised). Linking makes no distinction between normal
-- and abnormal termination of the remote process.
link :: ProcessId -> Process ()
-- | Remove a link (synchronous)
unlink :: ProcessId -> Process ()
-- | Monitor another process (asynchronous)
monitor :: ProcessId -> Process MonitorRef
-- | Remove a monitor (synchronous)
unmonitor :: MonitorRef -> Process ()
-- | Log a string
--
-- say message sends a message (time, pid of the current
-- process, message) to the process registered as logger. By
-- default, this process simply sends the string to stderr.
-- Individual Cloud Haskell backends might replace this with a different
-- logger process, however.
say :: String -> Process ()
-- | Register a process with the local registry (asynchronous).
--
-- The process to be registered does not have to be local itself.
register :: String -> ProcessId -> Process ()
-- | Remove a process from the local registry (asynchronous).
unregister :: String -> Process ()
-- | Query the local process registry (synchronous).
whereis :: String -> Process (Maybe ProcessId)
-- | Named send to a process in the local registry (asynchronous)
nsend :: Serializable a => String -> a -> Process ()
-- | Register a process with a remote registry (asynchronous).
--
-- The process to be registered does not have to live on the same remote
-- node.
registerRemote :: NodeId -> String -> ProcessId -> Process ()
-- | Remove a process from a remote registry (asynchronous).
unregisterRemote :: NodeId -> String -> Process ()
-- | Query a remote process registry (synchronous)
whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)
-- | Query a remote process registry (asynchronous)
--
-- Reply will come in the form of a WhereIsReply message
whereisRemoteAsync :: NodeId -> String -> Process ()
-- | Named send to a process in a remote registry (asynchronous)
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
-- | Catch exceptions within a process
catch :: Exception e => Process a -> (e -> Process a) -> Process a
-- | Like expect but with a timeout
expectTimeout :: Serializable a => Int -> Process (Maybe a)
-- | Asynchronous version of spawn
--
-- (spawn is defined in terms of spawnAsync and
-- expect)
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
-- | Link to a node
linkNode :: NodeId -> Process ()
-- | Link to a channel (send port)
linkPort :: SendPort a -> Process ()
-- | Remove a node link (synchronous)
unlinkNode :: NodeId -> Process ()
-- | Remove a channel (send port) link (synchronous)
unlinkPort :: SendPort a -> Process ()
-- | Monitor a node
monitorNode :: NodeId -> Process MonitorRef
-- | Monitor a typed channel
monitorPort :: Serializable a => SendPort a -> Process MonitorRef
instance Typeable ProcessTerminationException
instance Show ProcessTerminationException
instance Exception ProcessTerminationException
-- | Template Haskell support
--
-- (In a separate file for convenience)
module Control.Distributed.Process.Internal.Closure.TH
-- | Create the closure, decoder, and metadata definitions for the given
-- list of functions
remotable :: [Name] -> Q [Dec]
-- | Create a closure
--
-- If f :: a -> b then mkClosure :: a -> Closure
-- b. Make sure to pass f as an argument to
-- remotable too.
mkClosure :: Name -> Q Exp
module Control.Distributed.Process.Internal.Closure.BuiltIn
remoteTable :: RemoteTable -> RemoteTable
serializableDictUnit :: SerializableDict ()
-- | Closure version of link
linkClosure :: ProcessId -> Closure (Process ())
-- | Closure version of unlink
unlinkClosure :: ProcessId -> Closure (Process ())
-- | Closure version of send
sendClosure :: SerializableDict a -> ProcessId -> Closure (a -> Process ())
-- | Return any value
returnClosure :: SerializableDict a -> a -> Closure (Process a)
-- | Closure version of expect
expectClosure :: SerializableDict a -> Closure (Process a)
module Control.Distributed.Process.Internal.Closure
-- | Initial (empty) remote-call meta data
initRemoteTable :: RemoteTable
resolveClosure :: RemoteTable -> StaticLabel -> ByteString -> Maybe Dynamic
-- | Implementation of Closure that works around the absence of
-- static.
--
--
--
-- We offer a number of standard commonly useful closures.
--
--
-- - Closure combinators
--
--
-- Closures combinators allow to create closures from other closures. For
-- example, spawnSupervised is defined as follows:
--
--
-- spawnSupervised :: NodeId
-- -> Closure (Process ())
-- -> Process (ProcessId, MonitorRef)
-- spawnSupervised nid proc = do
-- us <- getSelfPid
-- them <- spawn nid (linkClosure us `cpSeq` proc)
-- ref <- monitor them
-- return (them, ref)
--
--
--
-- - User-defined closures
--
--
-- Suppose we have a monomorphic function
--
--
-- addInt :: Int -> Int -> Int
-- addInt x y = x + y
--
--
-- Then the Template Haskell splice
--
--
-- remotable ['addInt]
--
--
-- creates a function
--
--
-- $(mkClosure 'addInt) :: Int -> Closure (Int -> Int)
--
--
-- which can be used to partially apply addInt and turn it into
-- a Closure, which can be sent across the network. Closures can
-- be deserialized with
--
--
-- unClosure :: Typeable a => Closure a -> Process a
--
--
-- In general, given a monomorphic function f :: a -> b the
-- corresponding function $(mkClosure 'f) will have type a
-- -> Closure b.
--
-- The call to remotable will also generate a function
--
--
-- __remoteTable :: RemoteTable -> RemoteTable
--
--
-- which can be used to construct the RemoteTable used to
-- initialize Cloud Haskell. You should have (at most) one call to
-- remotable per module, and compose all created functions when
-- initializing Cloud Haskell:
--
--
-- let rtable = M1.__remoteTable
-- . M2.__remoteTable
-- . ...
-- . Mn.__remoteTable
-- $ initRemoteTable
--
--
-- See Section 6, Faking It, of Towards Haskell in the
-- Cloud for more info.
--
--
-- - Serializable Dictionaries
--
--
-- Some functions (such as sendClosure or returnClosure)
-- require an explicit (reified) serializable dictionary. To create such
-- a dictionary do
--
--
-- serializableDictInt :: SerializableDict Int
-- serializableDictInt = SerializableDict
--
--
-- and then pass 'serializableDictInt to remotable. This
-- will fail if the type is not serializable.
module Control.Distributed.Process.Closure
-- | Create the closure, decoder, and metadata definitions for the given
-- list of functions
remotable :: [Name] -> Q [Dec]
-- | Create a closure
--
-- If f :: a -> b then mkClosure :: a -> Closure
-- b. Make sure to pass f as an argument to
-- remotable too.
mkClosure :: Name -> Q Exp
-- | Reification of Serializable (see
-- Control.Distributed.Process.Closure)
data SerializableDict a
SerializableDict :: SerializableDict a
-- | Closure version of link
linkClosure :: ProcessId -> Closure (Process ())
-- | Closure version of unlink
unlinkClosure :: ProcessId -> Closure (Process ())
-- | Closure version of send
sendClosure :: SerializableDict a -> ProcessId -> Closure (a -> Process ())
-- | Return any value
returnClosure :: SerializableDict a -> a -> Closure (Process a)
-- | Closure version of expect
expectClosure :: SerializableDict a -> Closure (Process a)
closureApply :: Closure (a -> b) -> Closure a -> Closure b
closureConst :: (Typeable a, Typeable b) => Closure (a -> b -> a)
closureUnit :: Closure ()
type CP a b = Closure (a -> Process b)
cpIntro :: (Typeable a, Typeable b) => Closure (Process b) -> CP a b
cpElim :: Typeable a => CP () a -> Closure (Process a)
cpId :: Typeable a => CP a a
cpComp :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP b c -> CP a c
cpFirst :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (a, c) (b, c)
cpSwap :: (Typeable a, Typeable b) => CP (a, b) (b, a)
cpSecond :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (c, a) (c, b)
cpPair :: (Typeable a, Typeable a', Typeable b, Typeable b') => CP a b -> CP a' b' -> CP (a, a') (b, b')
cpCopy :: Typeable a => CP a (a, a)
cpFanOut :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP a c -> CP a (b, c)
cpLeft :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (Either a c) (Either b c)
cpMirror :: (Typeable a, Typeable b) => CP (Either a b) (Either b a)
cpRight :: (Typeable a, Typeable b, Typeable c) => CP a b -> CP (Either c a) (Either c b)
cpEither :: (Typeable a, Typeable a', Typeable b, Typeable b') => CP a b -> CP a' b' -> CP (Either a a') (Either b b')
cpUntag :: Typeable a => CP (Either a a) a
cpFanIn :: (Typeable a, Typeable b, Typeable c) => CP a c -> CP b c -> CP (Either a b) c
cpApply :: (Typeable a, Typeable b) => CP (CP a b, a) b
cpBind :: (Typeable a, Typeable b) => Closure (Process a) -> Closure (a -> Process b) -> Closure (Process b)
cpSeq :: Closure (Process ()) -> Closure (Process ()) -> Closure (Process ())
-- | Local nodes
module Control.Distributed.Process.Node
-- | Local nodes
data LocalNode
-- | Initialize a new local node.
--
-- Note that proper Cloud Haskell initialization and configuration is
-- still to do.
newLocalNode :: Transport -> RemoteTable -> IO LocalNode
-- | Force-close a local node
--
-- TODO: for now we just close the associated endpoint
closeLocalNode :: LocalNode -> IO ()
-- | Spawn a new process on a local node
forkProcess :: LocalNode -> Process () -> IO ProcessId
-- | Run a process on a local node and wait for it to finish
runProcess :: LocalNode -> Process () -> IO ()
-- | Initial (empty) remote-call meta data
initRemoteTable :: RemoteTable
-- | NodeId of the node
localNodeId :: LocalNode -> NodeId
instance Functor NC
instance Monad NC
instance MonadIO NC
instance MonadState NCState NC
-- |
--
-- This is an implementation of Cloud Haskell, as described in Towards
-- Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon
-- Peyton Jones
-- (http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/),
-- although some of the details are different. The precise message
-- passing semantics are based on A unified semantics for future
-- Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle.
module Control.Distributed.Process
-- | Process identifier
data ProcessId
-- | Node identifier
data NodeId
-- | The Cloud Haskell Process type
data Process a
-- | A send port is identified by a SendPortId.
--
-- You cannot send directly to a SendPortId; instead, use
-- newChan to create a SendPort.
data SendPortId
-- | The ID of the node the process is running on
processNodeId :: ProcessId -> NodeId
-- | The ID of the process that will receive messages sent on this port
sendPortProcessId :: SendPortId -> ProcessId
-- | Lift a computation from the IO monad.
liftIO :: MonadIO m => forall a. IO a -> m a
-- | Send a message
send :: Serializable a => ProcessId -> a -> Process ()
-- | Wait for a message of a specific type
expect :: Serializable a => Process a
-- | The receive end of a typed channel (not serializable)
data ReceivePort a
-- | The send send of a typed channel (serializable)
data SendPort a
-- | The (unique) ID of this send port
sendPortId :: SendPort a -> SendPortId
-- | Create a new typed channel
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
-- | Send a message on a typed channel
sendChan :: Serializable a => SendPort a -> a -> Process ()
-- | Wait for a message on a typed channel
receiveChan :: Serializable a => ReceivePort a -> Process a
-- | Merge a list of typed channels.
--
-- The result port is left-biased: if there are messages available on
-- more than one port, the first available message is returned.
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
-- | Like mergePortsBiased, but with a round-robin scheduler (rather
-- than left-biased)
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
-- | Opaque type used in receiveWait and receiveTimeout
data Match b
-- | Test the matches in order against each message in the queue
receiveWait :: [Match b] -> Process b
-- | Like receiveWait but with a timeout.
--
-- If the timeout is zero do a non-blocking check for matching messages.
-- A non-zero timeout is applied only when waiting for incoming messages
-- (that is, after we have checked the messages that are already
-- in the mailbox).
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
-- | Match against any message of the right type
match :: Serializable a => (a -> Process b) -> Match b
-- | Match against any message of the right type that satisfies a predicate
matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
-- | Remove any message from the queue
matchUnknown :: Process b -> Match b
-- | Spawn a process
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
-- | Run a process remotely and wait for it to reply
--
-- We monitor the remote process; if it dies before it can send a reply,
-- we die too
call :: SerializableDict a -> NodeId -> Closure (Process a) -> Process a
-- | Terminate (throws a ProcessTerminationException)
terminate :: Process a
-- | Thrown by terminate
data ProcessTerminationException
ProcessTerminationException :: ProcessTerminationException
-- | SpawnRef are used to return pids of spawned processes
data SpawnRef
-- | Our own process ID
getSelfPid :: Process ProcessId
-- | Get the node ID of our local node
getSelfNode :: Process NodeId
-- | Link to a remote process (asynchronous)
--
-- Note that link provides unidirectional linking (see
-- spawnSupervised). Linking makes no distinction between normal
-- and abnormal termination of the remote process.
link :: ProcessId -> Process ()
-- | Link to a node
linkNode :: NodeId -> Process ()
-- | Link to a channel (send port)
linkPort :: SendPort a -> Process ()
-- | Remove a link (synchronous)
unlink :: ProcessId -> Process ()
-- | Remove a node link (synchronous)
unlinkNode :: NodeId -> Process ()
-- | Remove a channel (send port) link (synchronous)
unlinkPort :: SendPort a -> Process ()
-- | Monitor another process (asynchronous)
monitor :: ProcessId -> Process MonitorRef
-- | Monitor a node
monitorNode :: NodeId -> Process MonitorRef
-- | Monitor a typed channel
monitorPort :: Serializable a => SendPort a -> Process MonitorRef
-- | Remove a monitor (synchronous)
unmonitor :: MonitorRef -> Process ()
-- | Exceptions thrown when a linked process dies
data ProcessLinkException
ProcessLinkException :: ProcessId -> DiedReason -> ProcessLinkException
-- | Exception thrown when a linked node dies
data NodeLinkException
NodeLinkException :: NodeId -> DiedReason -> NodeLinkException
-- | Exception thrown when a linked channel (port) dies
data PortLinkException
PortLinkException :: SendPortId -> DiedReason -> PortLinkException
-- | MonitorRef is opaque for regular Cloud Haskell processes
data MonitorRef
-- | Message sent by process monitors
data ProcessMonitorNotification
ProcessMonitorNotification :: MonitorRef -> ProcessId -> DiedReason -> ProcessMonitorNotification
-- | Message sent by node monitors
data NodeMonitorNotification
NodeMonitorNotification :: MonitorRef -> NodeId -> DiedReason -> NodeMonitorNotification
-- | Message sent by channel (port) monitors
data PortMonitorNotification
PortMonitorNotification :: MonitorRef -> SendPortId -> DiedReason -> PortMonitorNotification
-- | Why did a process die?
data DiedReason
-- | Normal termination
DiedNormal :: DiedReason
-- | The process exited with an exception (provided as String
-- because Exception does not implement Binary)
DiedException :: String -> DiedReason
-- | We got disconnected from the process node
DiedDisconnect :: DiedReason
-- | The process node died
DiedNodeDown :: DiedReason
-- | Invalid (processnodechannel) identifier
DiedUnknownId :: DiedReason
-- | A closure is a static value and an encoded environment
data Closure a
-- | A static value is one that is bound at top-level.
data Static a
-- | Deserialize a closure
unClosure :: Typeable a => Closure a -> Process a
-- | Used to fake static (see paper)
data RemoteTable
-- | Log a string
--
-- say message sends a message (time, pid of the current
-- process, message) to the process registered as logger. By
-- default, this process simply sends the string to stderr.
-- Individual Cloud Haskell backends might replace this with a different
-- logger process, however.
say :: String -> Process ()
-- | Register a process with the local registry (asynchronous).
--
-- The process to be registered does not have to be local itself.
register :: String -> ProcessId -> Process ()
-- | Remove a process from the local registry (asynchronous).
unregister :: String -> Process ()
-- | Query the local process registry (synchronous).
whereis :: String -> Process (Maybe ProcessId)
-- | Named send to a process in the local registry (asynchronous)
nsend :: Serializable a => String -> a -> Process ()
-- | Register a process with a remote registry (asynchronous).
--
-- The process to be registered does not have to live on the same remote
-- node.
registerRemote :: NodeId -> String -> ProcessId -> Process ()
-- | Remove a process from a remote registry (asynchronous).
unregisterRemote :: NodeId -> String -> Process ()
-- | Query a remote process registry (synchronous)
whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)
-- | Query a remote process registry (asynchronous)
--
-- Reply will come in the form of a WhereIsReply message
whereisRemoteAsync :: NodeId -> String -> Process ()
-- | Named send to a process in a remote registry (asynchronous)
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
-- | (Asynchronous) reply from whereis
data WhereIsReply
WhereIsReply :: String -> (Maybe ProcessId) -> WhereIsReply
-- | Catch exceptions within a process
catch :: Exception e => Process a -> (e -> Process a) -> Process a
-- | Like expect but with a timeout
expectTimeout :: Serializable a => Int -> Process (Maybe a)
-- | Asynchronous version of spawn
--
-- (spawn is defined in terms of spawnAsync and
-- expect)
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
-- | Spawn a child process, have the child link to the parent and the
-- parent monitor the child
spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
-- | Spawn a process and link to it
--
-- Note that this is just the sequential composition of spawn and
-- link. (The Unified semantics that underlies Cloud
-- Haskell does not even support a synchronous link operation)
spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
-- | Like spawnLink, but monitor the spawned process
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
-- | (Asynchronius) reply from spawn
data DidSpawn
DidSpawn :: SpawnRef -> ProcessId -> DidSpawn