{-# LANGUAGE CPP #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE BangPatterns #-} -- | Cloud Haskell primitives -- -- We define these in a separate module so that we don't have to rely on -- the closure combinators module Control.Distributed.Process.Internal.Primitives ( -- * Basic messaging send , expect -- * Channels , newChan , sendChan , receiveChan , mergePortsBiased , mergePortsRR -- * Unsafe messaging variants , unsafeSend , unsafeSendChan , unsafeNSend -- * Advanced messaging , Match , receiveWait , receiveTimeout , match , matchIf , matchUnknown , matchAny , matchAnyIf , matchChan , matchSTM , matchMessage , matchMessageIf , isEncoded , wrapMessage , unsafeWrapMessage , unwrapMessage , handleMessage , handleMessageIf , handleMessage_ , handleMessageIf_ , forward , delegate , relay , proxy -- * Process management , terminate , ProcessTerminationException(..) , die , kill , exit , catchExit , catchesExit -- keep the exception constructor hidden, so that handling exit -- reasons /must/ take place via the 'catchExit' family of primitives , ProcessExitException() , getSelfPid , getSelfNode , ProcessInfo(..) , getProcessInfo , NodeStats(..) , getNodeStats , getLocalNodeStats -- * Monitoring and linking , link , unlink , monitor , unmonitor , withMonitor -- * Logging , say -- * Registry , register , reregister , unregister , whereis , nsend , registerRemoteAsync , reregisterRemoteAsync , unregisterRemoteAsync , whereisRemoteAsync , nsendRemote -- * Closures , unClosure , unStatic -- * Exception handling , catch , Handler(..) , catches , try , mask , onException , bracket , bracket_ , finally -- * Auxiliary API , expectTimeout , receiveChanTimeout , spawnAsync , linkNode , linkPort , unlinkNode , unlinkPort , monitorNode , monitorPort -- * Reconnecting , reconnect , reconnectPort -- * Internal Exports , sendCtrlMsg ) where #if ! MIN_VERSION_base(4,6,0) import Prelude hiding (catch) #endif import Data.Binary (decode) import Data.Time.Clock (getCurrentTime) import Data.Time.Format (formatTime) import System.Locale (defaultTimeLocale) import System.Timeout (timeout) import Control.Monad (when) import Control.Monad.Reader (ask) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Applicative ((<$>)) import Control.Exception (Exception(..), throw, throwIO, SomeException) import qualified Control.Exception as Ex (catch, mask, try) import Control.Distributed.Process.Internal.StrictMVar ( StrictMVar , modifyMVar , modifyMVar_ ) import Control.Concurrent.STM ( STM , TVar , atomically , orElse , newTVar , readTVar , writeTVar ) import Control.Distributed.Process.Internal.CQueue ( dequeue , BlockSpec(..) , MatchOn(..) ) import Control.Distributed.Process.Serializable (Serializable, fingerprint) import Data.Accessor ((^.), (^:), (^=)) import Control.Distributed.Static ( Static , Closure ) import Data.Rank1Typeable (Typeable) import qualified Control.Distributed.Static as Static (unstatic, unclosure) import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe import Control.Distributed.Process.Internal.Types ( NodeId(..) , ProcessId(..) , LocalNode(..) , LocalProcess(..) , Process(..) , Message(..) , MonitorRef(..) , SpawnRef(..) , ProcessSignal(..) , NodeMonitorNotification(..) , monitorCounter , spawnCounter , SendPort(..) , ReceivePort(..) , channelCounter , typedChannelWithId , TypedChannel(..) , SendPortId(..) , Identifier(..) , ProcessExitException(..) , DiedReason(..) , DidUnmonitor(..) , DidUnlinkProcess(..) , DidUnlinkNode(..) , DidUnlinkPort(..) , WhereIsReply(..) , RegisterReply(..) , ProcessRegistrationException(..) , ProcessInfo(..) , ProcessInfoNone(..) , NodeStats(..) , isEncoded , createMessage , createUnencodedMessage , runLocalProcess , ImplicitReconnect( NoImplicitReconnect) , LocalProcessState , LocalSendPortId , messageToPayload ) import Control.Distributed.Process.Internal.Messaging ( sendMessage , sendBinary , sendPayload , disconnect , sendCtrlMsg ) import Control.Distributed.Process.Management.Internal.Types ( MxEvent(..) ) import Control.Distributed.Process.Management.Internal.Trace.Types ( traceEvent ) import Control.Distributed.Process.Internal.WeakTQueue ( newTQueueIO , readTQueue , mkWeakTQueue ) import Unsafe.Coerce -------------------------------------------------------------------------------- -- Basic messaging -- -------------------------------------------------------------------------------- -- | Send a message send :: Serializable a => ProcessId -> a -> Process () -- This requires a lookup on every send. If we want to avoid that we need to -- modify serializable to allow for stateful (IO) deserialization. send them msg = do proc <- ask let us = processId proc node = processNode proc nodeId = localNodeId node destNode = (processNodeId them) in do case destNode == nodeId of True -> sendLocal them msg False -> liftIO $ sendMessage (processNode proc) (ProcessIdentifier (processId proc)) (ProcessIdentifier them) NoImplicitReconnect msg -- We do not fire the trace event until after the sending is complete; -- In the remote case, 'sendMessage' can block in the networking stack. liftIO $ traceEvent (localEventBus node) (MxSent them us (createUnencodedMessage msg)) -- | /Unsafe/ variant of 'send'. This function makes /no/ attempt to serialize -- and (in the case when the destination process resides on the same local -- node) therefore ensure that the payload is fully evaluated before it is -- delivered. unsafeSend :: Serializable a => ProcessId -> a -> Process () unsafeSend = Unsafe.send -- | Wait for a message of a specific type expect :: forall a. Serializable a => Process a expect = receiveWait [match return] -------------------------------------------------------------------------------- -- Channels -- -------------------------------------------------------------------------------- -- | Create a new typed channel newChan :: Serializable a => Process (SendPort a, ReceivePort a) newChan = do proc <- ask liftIO . modifyMVar (processState proc) $ \st -> do let lcid = st ^. channelCounter let cid = SendPortId { sendPortProcessId = processId proc , sendPortLocalId = lcid } let sport = SendPort cid chan <- liftIO newTQueueIO chan' <- mkWeakTQueue chan $ finalizer (processState proc) lcid let rport = ReceivePort $ readTQueue chan let tch = TypedChannel chan' return ( (channelCounter ^: (+ 1)) . (typedChannelWithId lcid ^= Just tch) $ st , (sport, rport) ) where finalizer :: StrictMVar LocalProcessState -> LocalSendPortId -> IO () finalizer st lcid = modifyMVar_ st $ return . (typedChannelWithId lcid ^= Nothing) -- | Send a message on a typed channel sendChan :: Serializable a => SendPort a -> a -> Process () sendChan (SendPort cid) msg = do proc <- ask let node = localNodeId (processNode proc) destNode = processNodeId (sendPortProcessId cid) in do case destNode == node of True -> sendChanLocal cid msg False -> do liftIO $ sendBinary (processNode proc) (ProcessIdentifier (processId proc)) (SendPortIdentifier cid) NoImplicitReconnect msg -- | Send a message on a typed channel. This function makes /no/ attempt to -- serialize and (in the case when the @ReceivePort@ resides on the same local -- node) therefore ensure that the payload is fully evaluated before it is -- delivered. unsafeSendChan :: Serializable a => SendPort a -> a -> Process () unsafeSendChan = Unsafe.sendChan -- | Wait for a message on a typed channel receiveChan :: Serializable a => ReceivePort a -> Process a receiveChan = liftIO . atomically . receiveSTM -- | Like 'receiveChan' but with a timeout. If the timeout is 0, do a -- non-blocking check for a message. receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a) receiveChanTimeout 0 ch = liftIO . atomically $ (Just <$> receiveSTM ch) `orElse` return Nothing receiveChanTimeout n ch = liftIO . timeout n . atomically $ receiveSTM ch -- | Merge a list of typed channels. -- -- The result port is left-biased: if there are messages available on more -- than one port, the first available message is returned. mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) mergePortsBiased = return . ReceivePort. foldr1 orElse . map receiveSTM -- | Like 'mergePortsBiased', but with a round-robin scheduler (rather than -- left-biased) mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) mergePortsRR = \ps -> do psVar <- liftIO . atomically $ newTVar (map receiveSTM ps) return $ ReceivePort (rr psVar) where rotate :: [a] -> [a] rotate [] = [] rotate (x:xs) = xs ++ [x] rr :: TVar [STM a] -> STM a rr psVar = do ps <- readTVar psVar a <- foldr1 orElse ps writeTVar psVar (rotate ps) return a -------------------------------------------------------------------------------- -- Advanced messaging -- -------------------------------------------------------------------------------- -- | Opaque type used in 'receiveWait' and 'receiveTimeout' newtype Match b = Match { unMatch :: MatchOn Message (Process b) } -- | Test the matches in order against each message in the queue receiveWait :: [Match b] -> Process b receiveWait ms = do queue <- processQueue <$> ask Just proc <- liftIO $ dequeue queue Blocking (map unMatch ms) proc -- | Like 'receiveWait' but with a timeout. -- -- If the timeout is zero do a non-blocking check for matching messages. A -- non-zero timeout is applied only when waiting for incoming messages (that is, -- /after/ we have checked the messages that are already in the mailbox). receiveTimeout :: Int -> [Match b] -> Process (Maybe b) receiveTimeout t ms = do queue <- processQueue <$> ask let blockSpec = if t == 0 then NonBlocking else Timeout t mProc <- liftIO $ dequeue queue blockSpec (map unMatch ms) case mProc of Nothing -> return Nothing Just proc -> Just <$> proc -- | Match on a typed channel matchChan :: ReceivePort a -> (a -> Process b) -> Match b matchChan p fn = Match $ MatchChan (fmap fn (receiveSTM p)) -- | Match on an arbitrary STM action. -- -- This rather unusaul /match primitive/ allows us to compose arbitrary STM -- actions with checks against our process' mailbox and/or any typed channel -- @ReceivePort@s we may hold. -- -- This allows us to process multiple input streams /along with our mailbox/, -- in just the same way that 'matchChan' supports checking both the mailbox -- /and/ an arbitrary set of typed channels in one atomic transaction. -- -- Note there are no ordering guarnatees with respect to these disparate input -- sources. -- matchSTM :: STM a -> (a -> Process b) -> Match b matchSTM stm fn = Match $ MatchChan (fmap fn stm) -- | Match against any message of the right type match :: forall a b. Serializable a => (a -> Process b) -> Match b match = matchIf (const True) -- | Match against any message of the right type that satisfies a predicate matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b matchIf c p = Match $ MatchMsg $ \msg -> case messageFingerprint msg == fingerprint (undefined :: a) of False -> Nothing True -> case msg of (UnencodedMessage _ m) -> let m' = unsafeCoerce m :: a in case (c m') of True -> Just (p m') False -> Nothing (EncodedMessage _ _) -> if (c decoded) then Just (p decoded) else Nothing where decoded :: a -- Make sure the value is fully decoded so that we don't hang to -- bytestrings when the process calling 'matchIf' doesn't process -- the values immediately !decoded = decode (messageEncoding msg) -- | Match against any message, regardless of the underlying (contained) type matchMessage :: (Message -> Process Message) -> Match Message matchMessage p = Match $ MatchMsg $ \msg -> Just (p msg) -- | Match against any message (regardless of underlying type) that satisfies a predicate matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message matchMessageIf c p = Match $ MatchMsg $ \msg -> case (c msg) of True -> Just (p msg) False -> Nothing -- | Forward a raw 'Message' to the given 'ProcessId'. forward :: Message -> ProcessId -> Process () forward msg them = do proc <- ask let node = processNode proc us = processId proc nid = localNodeId node destNode = (processNodeId them) in do case destNode == nid of True -> sendCtrlMsg Nothing (LocalSend them msg) False -> liftIO $ sendPayload (processNode proc) (ProcessIdentifier (processId proc)) (ProcessIdentifier them) NoImplicitReconnect (messageToPayload msg) -- We do not fire the trace event until after the sending is complete; -- In the remote case, 'sendMessage' can block in the networking stack. liftIO $ traceEvent (localEventBus node) (MxSent them us msg) -- | Wrap a 'Serializable' value in a 'Message'. Note that 'Message's are -- 'Serializable' - like the datum they contain - but also note, deserialising -- such a 'Message' will yield a 'Message', not the type within it! To obtain the -- wrapped datum, use 'unwrapMessage' or 'handleMessage' with a specific type. -- -- @ -- do -- self <- getSelfPid -- send self (wrapMessage "blah") -- Nothing <- expectTimeout 1000000 :: Process (Maybe String) -- (Just m) <- expectTimeout 1000000 :: Process (Maybe Message) -- (Just "blah") <- unwrapMessage m :: Process (Maybe String) -- @ -- wrapMessage :: Serializable a => a -> Message wrapMessage = createUnencodedMessage -- see [note Serializable UnencodedMessage] -- | This is the /unsafe/ variant of 'wrapMessage'. See -- "Control.Distributed.Process.UnsafePrimitives" for details. unsafeWrapMessage :: Serializable a => a -> Message unsafeWrapMessage = Unsafe.wrapMessage -- [note Serializable UnencodedMessage] -- if we attempt to serialise an UnencodedMessage, it will be converted to an -- EncodedMessage first, so we're ok here. See 'messageToPayload' in -- Primitives.hs for the gory details -- | Attempt to unwrap a raw 'Message'. -- If the type of the decoded message payload matches the expected type, the -- value will be returned with @Just@, otherwise @Nothing@ indicates the types -- do not match. -- -- This expression, for example, will evaluate to @Nothing@ -- > unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int) -- -- Whereas this expression, will yield @Just "foo"@ -- > unwrapMessage (wrapMessage "foo") :: Process (Maybe String) -- unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a) unwrapMessage msg = case messageFingerprint msg == fingerprint (undefined :: a) of False -> return Nothing :: m (Maybe a) True -> case msg of (UnencodedMessage _ ms) -> let ms' = unsafeCoerce ms :: a in return (Just ms') (EncodedMessage _ _) -> return (Just (decoded)) where decoded :: a -- note [decoding] !decoded = decode (messageEncoding msg) -- | Attempt to handle a raw 'Message'. -- If the type of the message matches the type of the first argument to -- the supplied expression, then the message will be decoded and the expression -- evaluated against its value. If this runtime type checking fails however, -- @Nothing@ will be returned to indicate the fact. If the check succeeds and -- evaluation proceeds, the resulting value with be wrapped with @Just@. -- -- Intended for use in `catchesExit` and `matchAny` primitives. -- handleMessage :: forall m a b. (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b) handleMessage msg proc = handleMessageIf msg (const True) proc -- | Conditionally handle a raw 'Message'. -- If the predicate @(a -> Bool)@ evaluates to @True@, invokes the supplied -- handler, other returns @Nothing@ to indicate failure. See 'handleMessage' -- for further information about runtime type checking. -- handleMessageIf :: forall m a b . (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b) handleMessageIf msg c proc = do case messageFingerprint msg == fingerprint (undefined :: a) of False -> return Nothing :: m (Maybe b) True -> case msg of (UnencodedMessage _ ms) -> let ms' = unsafeCoerce ms :: a in case (c ms') of True -> do { r <- proc ms'; return (Just r) } False -> return Nothing :: m (Maybe b) (EncodedMessage _ _) -> case (c decoded) of True -> do { r <- proc (decoded :: a); return (Just r) } False -> return Nothing :: m (Maybe b) where decoded :: a -- note [decoding] !decoded = decode (messageEncoding msg) -- | As 'handleMessage' but ignores result, which is useful if you don't -- care whether or not the handler succeeded. -- handleMessage_ :: forall m a . (Monad m, Serializable a) => Message -> (a -> m ()) -> m () handleMessage_ msg proc = handleMessageIf_ msg (const True) proc -- | Conditional version of 'handleMessage_'. -- handleMessageIf_ :: forall m a . (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m () handleMessageIf_ msg c proc = handleMessageIf msg c proc >> return () -- | Match against an arbitrary message. 'matchAny' removes the first available -- message from the process mailbox. To handle arbitrary /raw/ messages once -- removed from the mailbox, see 'handleMessage' and 'unwrapMessage'. -- matchAny :: forall b. (Message -> Process b) -> Match b matchAny p = Match $ MatchMsg $ \msg -> Just (p msg) -- | Match against an arbitrary message. Intended for use with 'handleMessage' -- and 'unwrapMessage', this function /only/ removes a message from the process -- mailbox, /if/ the supplied condition matches. The success (or failure) of -- runtime type checks deferred to @handleMessage@ and friends is irrelevant -- here, i.e., if the condition evaluates to @True@ then the message will -- be removed from the process mailbox and decoded, but that does /not/ -- guarantee that an expression passed to @handleMessage@ will pass the -- runtime type checks and therefore be evaluated. -- matchAnyIf :: forall a b. (Serializable a) => (a -> Bool) -> (Message -> Process b) -> Match b matchAnyIf c p = Match $ MatchMsg $ \msg -> case messageFingerprint msg == fingerprint (undefined :: a) of True | check -> Just (p msg) where check :: Bool !check = case msg of (EncodedMessage _ _) -> c decoded (UnencodedMessage _ m') -> c (unsafeCoerce m') decoded :: a -- note [decoding] !decoded = decode (messageEncoding msg) _ -> Nothing {- note [decoding] For an EncodedMessage, we need to ensure the value is fully decoded so that we don't hang to bytestrings if the calling process doesn't evaluate immediately. For UnencodedMessage we know (because the fingerprint comparison succeeds) that unsafeCoerce will not fail. -} -- | Remove any message from the queue matchUnknown :: Process b -> Match b matchUnknown p = Match $ MatchMsg (const (Just p)) -- | Receives messages and forwards them to @pid@ if @p msg == True@. delegate :: ProcessId -> (Message -> Bool) -> Process () delegate pid p = do receiveWait [ matchAny (\m -> case (p m) of True -> forward m pid False -> return ()) ] delegate pid p -- | A straight relay that forwards all messages to the supplied pid. relay :: ProcessId -> Process () relay !pid = receiveWait [ matchAny (\m -> forward m pid) ] >> relay pid -- | Proxies @pid@ and forwards messages whenever @proc@ evaluates to @True@. -- Unlike 'delegate' the predicate @proc@ runs in the 'Process' monad, allowing -- for richer proxy behaviour. If @proc@ returns @False@ or the /runtime type check/ -- fails, no action is taken and the proxy process will continue running. proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process () proxy pid proc = do receiveWait [ matchAny (\m -> do next <- handleMessage m proc case next of Just True -> forward m pid Just False -> return () -- explicitly ignored Nothing -> return ()) -- un-routable ] proxy pid proc -------------------------------------------------------------------------------- -- Process management -- -------------------------------------------------------------------------------- -- | Thrown by 'terminate' data ProcessTerminationException = ProcessTerminationException deriving (Show, Typeable) instance Exception ProcessTerminationException -- | Terminate immediately (throws a ProcessTerminationException) terminate :: Process a terminate = liftIO $ throwIO ProcessTerminationException -- [Issue #110] -- | Die immediately - throws a 'ProcessExitException' with the given @reason@. die :: Serializable a => a -> Process b die reason = do -- NOTE: We do /not/ want to use UnencodedMessage here, as the exception -- could be decoded by a handler passed to 'catchExit', re-thrown or even -- passed to another process via the tracing mechanism. pid <- getSelfPid liftIO $ throwIO (ProcessExitException pid (createMessage reason)) -- | Forceful request to kill a process. Where 'exit' provides an exception -- that can be caught and handled, 'kill' throws an unexposed exception type -- which cannot be handled explicitly (by type). kill :: ProcessId -> String -> Process () -- NOTE: We send the message to our local node controller, which will then -- forward it to a remote node controller (if applicable). Sending it directly -- to a remote node controller means that that the message may overtake a -- 'monitor' or 'link' request. kill them reason = sendCtrlMsg Nothing (Kill them reason) -- | Graceful request to exit a process. Throws 'ProcessExitException' with the -- supplied @reason@ encoded as a message. Any /exit signal/ raised in this -- manner can be handled using the 'catchExit' family of functions. exit :: Serializable a => ProcessId -> a -> Process () -- NOTE: We send the message to our local node controller, which will then -- forward it to a remote node controller (if applicable). Sending it directly -- to a remote node controller means that that the message may overtake a -- 'monitor' or 'link' request. exit them reason = sendCtrlMsg Nothing (Exit them (createMessage reason)) -- | Catches 'ProcessExitException'. The handler will not be applied unless its -- type matches the encoded data stored in the exception (see the /reason/ -- argument given to the 'exit' primitive). If the handler cannot be applied, -- the exception will be re-thrown. -- -- To handle 'ProcessExitException' without regard for /reason/, see 'catch'. -- To handle multiple /reasons/ of differing types, see 'catchesExit'. catchExit :: forall a b . (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b catchExit act exitHandler = catch act handleExit where handleExit ex@(ProcessExitException from msg) = if messageFingerprint msg == fingerprint (undefined :: a) then exitHandler from decoded else liftIO $ throwIO ex where decoded :: a -- Make sure the value is fully decoded so that we don't hang to -- bytestrings if the caller doesn't use the value immediately !decoded = decode (messageEncoding msg) -- | Lift 'Control.Exception.catches' (almost). -- -- As 'ProcessExitException' stores the exit @reason@ as a typed, encoded -- message, a handler must accept inputs of the expected type. In order to -- handle a list of potentially different handlers (and therefore input types), -- a handler passed to 'catchesExit' must accept 'Message' and return -- @Maybe@ (i.e., @Just p@ if it handled the exit reason, otherwise @Nothing@). -- -- See 'maybeHandleMessage' and 'Message' for more details. catchesExit :: Process b -> [(ProcessId -> Message -> (Process (Maybe b)))] -> Process b catchesExit act handlers = catch act ((flip handleExit) handlers) where handleExit :: ProcessExitException -> [(ProcessId -> Message -> Process (Maybe b))] -> Process b handleExit ex [] = liftIO $ throwIO ex handleExit ex@(ProcessExitException from msg) (h:hs) = do r <- h from msg case r of Nothing -> handleExit ex hs Just p -> return p -- | Our own process ID getSelfPid :: Process ProcessId getSelfPid = processId <$> ask -- | Get the node ID of our local node getSelfNode :: Process NodeId getSelfNode = localNodeId . processNode <$> ask -- | Get statistics about the specified node getNodeStats :: NodeId -> Process (Either DiedReason NodeStats) getNodeStats nid = do selfNode <- getSelfNode if nid == selfNode then Right `fmap` getLocalNodeStats -- optimisation else getNodeStatsRemote nid where getNodeStatsRemote :: NodeId -> Process (Either DiedReason NodeStats) getNodeStatsRemote nid = do sendCtrlMsg (Just nid) $ GetNodeStats nid bracket (monitorNode nid) unmonitor $ \mRef -> receiveWait [ match (\(stats :: NodeStats) -> return $ Right stats) , matchIf (\(NodeMonitorNotification ref _ _) -> ref == mRef) (\(NodeMonitorNotification _ _ dr) -> return $ Left dr) ] -- | Get statistics about our local node getLocalNodeStats :: Process NodeStats getLocalNodeStats = do self <- getSelfNode sendCtrlMsg Nothing $ GetNodeStats self receiveWait [ match (\(stats :: NodeStats) -> return stats) ] -- | Get information about the specified process getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo) getProcessInfo pid = let them = processNodeId pid in do us <- getSelfNode dest <- mkNode them us sendCtrlMsg dest $ GetInfo pid receiveWait [ match (\(p :: ProcessInfo) -> return $ Just p) , match (\(_ :: ProcessInfoNone) -> return Nothing) ] where mkNode :: NodeId -> NodeId -> Process (Maybe NodeId) mkNode them us = case them == us of True -> return Nothing _ -> return $ Just them -------------------------------------------------------------------------------- -- Monitoring and linking -- -------------------------------------------------------------------------------- -- | Link to a remote process (asynchronous) -- -- When process A links to process B (that is, process A calls -- @link pidB@) then an asynchronous exception will be thrown to process A -- when process B terminates (normally or abnormally), or when process A gets -- disconnected from process B. Although it is /technically/ possible to catch -- these exceptions, chances are if you find yourself trying to do so you should -- probably be using 'monitor' rather than 'link'. In particular, code such as -- -- > link pidB -- Link to process B -- > expect -- Wait for a message from process B -- > unlink pidB -- Unlink again -- -- doesn't quite do what one might expect: if process B sends a message to -- process A, and /subsequently terminates/, then process A might or might not -- be terminated too, depending on whether the exception is thrown before or -- after the 'unlink' (i.e., this code has a race condition). -- -- Linking is all-or-nothing: A is either linked to B, or it's not. A second -- call to 'link' has no effect. -- -- Note that 'link' provides unidirectional linking (see 'spawnSupervised'). -- Linking makes no distinction between normal and abnormal termination of -- the remote process. link :: ProcessId -> Process () link = sendCtrlMsg Nothing . Link . ProcessIdentifier -- | Monitor another process (asynchronous) -- -- When process A monitors process B (that is, process A calls -- @monitor pidB@) then process A will receive a 'ProcessMonitorNotification' -- when process B terminates (normally or abnormally), or when process A gets -- disconnected from process B. You receive this message like any other (using -- 'expect'); the notification includes a reason ('DiedNormal', 'DiedException', -- 'DiedDisconnect', etc.). -- -- Every call to 'monitor' returns a new monitor reference 'MonitorRef'; if -- multiple monitors are set up, multiple notifications will be delivered -- and monitors can be disabled individually using 'unmonitor'. monitor :: ProcessId -> Process MonitorRef monitor = monitor' . ProcessIdentifier -- | Establishes temporary monitoring of another process. -- -- @withMonitor pid code@ sets up monitoring of @pid@ for the duration -- of @code@. Note: although monitoring is no longer active when -- @withMonitor@ returns, there might still be unreceived monitor -- messages in the queue. -- withMonitor :: ProcessId -> Process a -> Process a withMonitor pid code = bracket (monitor pid) unmonitor (\_ -> code) -- unmonitor blocks waiting for the response, so there's a possibility -- that an exception might interrupt withMonitor before the unmonitor -- has completed. I think that's better than making the unmonitor -- uninterruptible. -- | Remove a link -- -- This is synchronous in the sense that once it returns you are guaranteed -- that no exception will be raised if the remote process dies. However, it is -- asynchronous in the sense that we do not wait for a response from the remote -- node. unlink :: ProcessId -> Process () unlink pid = do unlinkAsync pid receiveWait [ matchIf (\(DidUnlinkProcess pid') -> pid' == pid) (\_ -> return ()) ] -- | Remove a node link -- -- This has the same synchronous/asynchronous nature as 'unlink'. unlinkNode :: NodeId -> Process () unlinkNode nid = do unlinkNodeAsync nid receiveWait [ matchIf (\(DidUnlinkNode nid') -> nid' == nid) (\_ -> return ()) ] -- | Remove a channel (send port) link -- -- This has the same synchronous/asynchronous nature as 'unlink'. unlinkPort :: SendPort a -> Process () unlinkPort sport = do unlinkPortAsync sport receiveWait [ matchIf (\(DidUnlinkPort cid) -> cid == sendPortId sport) (\_ -> return ()) ] -- | Remove a monitor -- -- This has the same synchronous/asynchronous nature as 'unlink'. unmonitor :: MonitorRef -> Process () unmonitor ref = do unmonitorAsync ref receiveWait [ matchIf (\(DidUnmonitor ref') -> ref' == ref) (\_ -> return ()) ] -------------------------------------------------------------------------------- -- Exception handling -- -------------------------------------------------------------------------------- -- | Lift 'Control.Exception.catch' catch :: Exception e => Process a -> (e -> Process a) -> Process a catch p h = do lproc <- ask liftIO $ Ex.catch (runLocalProcess lproc p) (runLocalProcess lproc . h) -- | Lift 'Control.Exception.try' try :: Exception e => Process a -> Process (Either e a) try p = do lproc <- ask liftIO $ Ex.try (runLocalProcess lproc p) -- | Lift 'Control.Exception.mask' mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b mask p = do lproc <- ask liftIO $ Ex.mask $ \restore -> runLocalProcess lproc (p (liftRestore restore)) where liftRestore :: (forall a. IO a -> IO a) -> (forall a. Process a -> Process a) liftRestore restoreIO = \p2 -> do ourLocalProc <- ask liftIO $ restoreIO $ runLocalProcess ourLocalProc p2 -- | Lift 'Control.Exception.onException' onException :: Process a -> Process b -> Process a onException p what = p `catch` \e -> do _ <- what liftIO $ throwIO (e :: SomeException) -- | Lift 'Control.Exception.bracket' bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c bracket before after thing = mask $ \restore -> do a <- before r <- restore (thing a) `onException` after a _ <- after a return r -- | Lift 'Control.Exception.bracket_' bracket_ :: Process a -> Process b -> Process c -> Process c bracket_ before after thing = bracket before (const after) (const thing) -- | Lift 'Control.Exception.finally' finally :: Process a -> Process b -> Process a finally a sequel = bracket_ (return ()) sequel a -- | You need this when using 'catches' data Handler a = forall e . Exception e => Handler (e -> Process a) instance Functor Handler where fmap f (Handler h) = Handler (fmap f . h) -- | Lift 'Control.Exception.catches' catches :: Process a -> [Handler a] -> Process a catches proc handlers = proc `catch` catchesHandler handlers catchesHandler :: [Handler a] -> SomeException -> Process a catchesHandler handlers e = foldr tryHandler (throw e) handlers where tryHandler (Handler handler) res = case fromException e of Just e' -> handler e' Nothing -> res -------------------------------------------------------------------------------- -- Auxiliary API -- -------------------------------------------------------------------------------- -- | Like 'expect' but with a timeout expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a) expectTimeout n = receiveTimeout n [match return] -- | Asynchronous version of 'spawn' -- -- ('spawn' is defined in terms of 'spawnAsync' and 'expect') spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef spawnAsync nid proc = do spawnRef <- getSpawnRef sendCtrlMsg (Just nid) $ Spawn proc spawnRef return spawnRef -- | Monitor a node (asynchronous) monitorNode :: NodeId -> Process MonitorRef monitorNode = monitor' . NodeIdentifier -- | Monitor a typed channel (asynchronous) monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef monitorPort (SendPort cid) = monitor' (SendPortIdentifier cid) -- | Remove a monitor (asynchronous) unmonitorAsync :: MonitorRef -> Process () unmonitorAsync = sendCtrlMsg Nothing . Unmonitor -- | Link to a node (asynchronous) linkNode :: NodeId -> Process () linkNode = link' . NodeIdentifier -- | Link to a channel (asynchronous) linkPort :: SendPort a -> Process () linkPort (SendPort cid) = link' (SendPortIdentifier cid) -- | Remove a process link (asynchronous) unlinkAsync :: ProcessId -> Process () unlinkAsync = sendCtrlMsg Nothing . Unlink . ProcessIdentifier -- | Remove a node link (asynchronous) unlinkNodeAsync :: NodeId -> Process () unlinkNodeAsync = sendCtrlMsg Nothing . Unlink . NodeIdentifier -- | Remove a channel (send port) link (asynchronous) unlinkPortAsync :: SendPort a -> Process () unlinkPortAsync (SendPort cid) = sendCtrlMsg Nothing . Unlink $ SendPortIdentifier cid -------------------------------------------------------------------------------- -- Logging -- -------------------------------------------------------------------------------- -- | Log a string -- -- @say message@ sends a message (time, pid of the current process, message) -- to the process registered as 'logger'. By default, this process simply -- sends the string to 'stderr'. Individual Cloud Haskell backends might -- replace this with a different logger process, however. say :: String -> Process () say string = do now <- liftIO getCurrentTime us <- getSelfPid nsend "logger" (formatTime defaultTimeLocale "%c" now, us, string) -------------------------------------------------------------------------------- -- Registry -- -------------------------------------------------------------------------------- -- | Register a process with the local registry (asynchronous). -- This version will wait until a response is gotten from the -- management process. The name must not already be registered. -- The process need not be on this node. -- A bad registration will result in a 'ProcessRegistrationException' -- -- The process to be registered does not have to be local itself. register :: String -> ProcessId -> Process () register = registerImpl False -- | Like 'register', but will replace an existing registration. -- The name must already be registered. reregister :: String -> ProcessId -> Process () reregister = registerImpl True registerImpl :: Bool -> String -> ProcessId -> Process () registerImpl force label pid = do mynid <- getSelfNode sendCtrlMsg Nothing (Register label mynid (Just pid) force) receiveWait [ matchIf (\(RegisterReply label' _) -> label == label') (\(RegisterReply _ ok) -> handleRegistrationReply label ok) ] -- | Register a process with a remote registry (asynchronous). -- -- The process to be registered does not have to live on the same remote node. -- Reply wil come in the form of a 'RegisterReply' message -- -- See comments in 'whereisRemoteAsync' registerRemoteAsync :: NodeId -> String -> ProcessId -> Process () registerRemoteAsync nid label pid = sendCtrlMsg (Just nid) (Register label nid (Just pid) False) reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process () reregisterRemoteAsync nid label pid = sendCtrlMsg (Just nid) (Register label nid (Just pid) True) -- | Remove a process from the local registry (asynchronous). -- This version will wait until a response is gotten from the -- management process. The name must already be registered. unregister :: String -> Process () unregister label = do mynid <- getSelfNode sendCtrlMsg Nothing (Register label mynid Nothing False) receiveWait [ matchIf (\(RegisterReply label' _) -> label == label') (\(RegisterReply _ ok) -> handleRegistrationReply label ok) ] -- | Deal with the result from an attempted registration or unregistration -- by throwing an exception if necessary handleRegistrationReply :: String -> Bool -> Process () handleRegistrationReply label ok = when (not ok) $ liftIO $ throwIO $ ProcessRegistrationException label -- | Remove a process from a remote registry (asynchronous). -- -- Reply wil come in the form of a 'RegisterReply' message -- -- See comments in 'whereisRemoteAsync' unregisterRemoteAsync :: NodeId -> String -> Process () unregisterRemoteAsync nid label = sendCtrlMsg (Just nid) (Register label nid Nothing False) -- | Query the local process registry whereis :: String -> Process (Maybe ProcessId) whereis label = do sendCtrlMsg Nothing (WhereIs label) receiveWait [ matchIf (\(WhereIsReply label' _) -> label == label') (\(WhereIsReply _ mPid) -> return mPid) ] -- | Query a remote process registry (asynchronous) -- -- Reply will come in the form of a 'WhereIsReply' message. -- -- There is currently no synchronous version of 'whereisRemoteAsync': if -- you implement one yourself, be sure to take into account that the remote -- node might die or get disconnect before it can respond (i.e. you should -- use 'monitorNode' and take appropriate action when you receive a -- 'NodeMonitorNotification'). whereisRemoteAsync :: NodeId -> String -> Process () whereisRemoteAsync nid label = sendCtrlMsg (Just nid) (WhereIs label) -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () nsend label msg = sendCtrlMsg Nothing (NamedSend label (createUnencodedMessage msg)) -- | Named send to a process in the local registry (asynchronous). -- This function makes /no/ attempt to serialize and (in the case when the -- destination process resides on the same local node) therefore ensure that -- the payload is fully evaluated before it is delivered. unsafeNSend :: Serializable a => String -> a -> Process () unsafeNSend = Unsafe.nsend -- | Named send to a process in a remote registry (asynchronous) nsendRemote :: Serializable a => NodeId -> String -> a -> Process () nsendRemote nid label msg = sendCtrlMsg (Just nid) (NamedSend label (createMessage msg)) -------------------------------------------------------------------------------- -- Closures -- -------------------------------------------------------------------------------- -- | Resolve a static value unStatic :: Typeable a => Static a -> Process a unStatic static = do rtable <- remoteTable . processNode <$> ask case Static.unstatic rtable static of Left err -> fail $ "Could not resolve static value: " ++ err Right x -> return x -- | Resolve a closure unClosure :: Typeable a => Closure a -> Process a unClosure closure = do rtable <- remoteTable . processNode <$> ask case Static.unclosure rtable closure of Left err -> fail $ "Could not resolve closure: " ++ err Right x -> return x -------------------------------------------------------------------------------- -- Reconnecting -- -------------------------------------------------------------------------------- -- | Cloud Haskell provides the illusion of connection-less, reliable, ordered -- message passing. However, when network connections get disrupted this -- illusion cannot always be maintained. Once a network connection breaks (even -- temporarily) no further communication on that connection will be possible. -- For example, if process A sends a message to process B, and A is then -- notified (by monitor notification) that it got disconnected from B, A will -- not be able to send any further messages to B, /unless/ A explicitly -- indicates that it is acceptable to attempt to reconnect to B using the -- Cloud Haskell 'reconnect' primitive. -- -- Importantly, when A calls 'reconnect' it acknowledges that some messages to -- B might have been lost. For instance, if A sends messages m1 and m2 to B, -- then receives a monitor notification that its connection to B has been lost, -- calls 'reconnect' and then sends m3, it is possible that B will receive m1 -- and m3 but not m2. -- -- Note that 'reconnect' does not mean /reconnect now/ but rather /it is okay -- to attempt to reconnect on the next send/. In particular, if no further -- communication attempts are made to B then A can use reconnect to clean up -- its connection to B. reconnect :: ProcessId -> Process () reconnect them = do us <- getSelfPid node <- processNode <$> ask liftIO $ disconnect node (ProcessIdentifier us) (ProcessIdentifier them) -- | Reconnect to a sendport. See 'reconnect' for more information. reconnectPort :: SendPort a -> Process () reconnectPort them = do us <- getSelfPid node <- processNode <$> ask liftIO $ disconnect node (ProcessIdentifier us) (SendPortIdentifier (sendPortId them)) -------------------------------------------------------------------------------- -- Auxiliary functions -- -------------------------------------------------------------------------------- sendLocal :: (Serializable a) => ProcessId -> a -> Process () sendLocal to msg = sendCtrlMsg Nothing $ LocalSend to (createUnencodedMessage msg) sendChanLocal :: (Serializable a) => SendPortId -> a -> Process () sendChanLocal spId msg = -- we *must* fully serialize/encode the message here, because -- attempting to use `unsafeCoerce' in the node controller -- won't work since we know nothing about the required type sendCtrlMsg Nothing $ LocalPortSend spId (createUnencodedMessage msg) getMonitorRefFor :: Identifier -> Process MonitorRef getMonitorRefFor ident = do proc <- ask liftIO $ modifyMVar (processState proc) $ \st -> do let counter = st ^. monitorCounter return ( monitorCounter ^: (+ 1) $ st , MonitorRef ident counter ) getSpawnRef :: Process SpawnRef getSpawnRef = do proc <- ask liftIO $ modifyMVar (processState proc) $ \st -> do let counter = st ^. spawnCounter return ( spawnCounter ^: (+ 1) $ st , SpawnRef counter ) -- | Monitor a process/node/channel monitor' :: Identifier -> Process MonitorRef monitor' ident = do monitorRef <- getMonitorRefFor ident sendCtrlMsg Nothing $ Monitor monitorRef return monitorRef -- | Link to a process/node/channel link' :: Identifier -> Process () link' = sendCtrlMsg Nothing . Link