{-# LANGUAGE DeriveDataTypeable #-} -- | This module is the core of Cloud Haskell. It provides -- processes, messages, monitoring, and configuration. module Remote.Process ( -- * The Process monad ProcessM, NodeId,ProcessId, PeerInfo, nullPid,getSelfPid,getSelfNode,isPidLocal, -- * Message receiving expect, MatchM,receive,receiveWait,receiveTimeout, match,matchIf,matchCond,matchUnknown,matchUnknownThrow,matchProcessDown, -- * Message sending send,sendQuiet, -- * Logging functions logS,say, LogSphere,LogLevel(..),LogTarget(..),LogFilter(..),LogConfig(..), setLogConfig,getLogConfig,setNodeLogConfig,setRemoteNodeLogConfig,defaultLogConfig, -- * Exception handling ptry,ptimeout,pbracket,pfinally, UnknownMessageException(..),ServiceException(..), TransmitException(..),TransmitStatus(..), -- * Process naming nameSet, nameQuery, nameQueryOrStart, -- * Process spawning and monitoring spawnLocal,spawnLocalAnd,forkProcess,spawn,spawnAnd,spawnLink,unpause, AmSpawnOptions(..),defaultSpawnOptions,MonitorAction(..),SignalReason(..), ProcessMonitorException(..),linkProcess,monitorProcess,unmonitorProcess,withMonitor,pingNode, callRemote,callRemotePure,callRemoteIO, terminate, -- * Config file readConfig,emptyConfig,Config(..),getConfig, getCfgArgs, -- * Initialization initNode,roleDispatch,setDaemonic, waitForThreads,performFinalization,forkAndListenAndDeliver,runLocalProcess, -- * Closures makeClosure,invokeClosure,evaluateClosure, -- * Debugging aids getQueueLength,nodeFromPid,localFromPid,hostFromNid, -- * Various internals, not for general use PortId,LocalProcessId, localRegistryHello,localRegistryRegisterNode, localRegistryQueryNodes,localRegistryUnregisterNode, sendSimple,makeNodeFromHost,getNewMessageLocal, getProcess,Message,Process,prNodeRef, roundtripResponse,roundtripResponseAsync,roundtripQuery,roundtripQueryMulti, makePayloadClosure,getLookup,diffTime,roundtripQueryImpl,roundtripQueryUnsafe, PayloadDisposition(..),suppressTransmitException,Node,getMessagePayload,getMessageType, -- * System service processes, not for general use startSpawnerService, startLoggingService, startProcessMonitorService, startLocalRegistry, startFinalizerService, startNodeMonitorService, startProcessRegistryService, standaloneLocalRegistry ) where import qualified Prelude as Prelude import Prelude hiding (catch, id, init, last, lookup, pi) import Control.Concurrent (forkIO,ThreadId,threadDelay) import Control.Concurrent.MVar (MVar,newMVar, newEmptyMVar,takeMVar,putMVar,modifyMVar,modifyMVar_,readMVar) import Control.Exception (ErrorCall(..),throwTo,bracket,try,Exception,throw,evaluate,finally,SomeException,catch) import Control.Monad (foldM,when,liftM,forever) import Control.Monad.Trans (MonadIO,liftIO) import Data.Binary (Binary,put,get,putWord8,getWord8) import Data.Char (isSpace,isDigit) import Data.List (isSuffixOf,foldl', isPrefixOf) import Data.Maybe (catMaybes,isNothing) import Data.Typeable (Typeable) import Data.Data (Data) import Data.Unique (newUnique,hashUnique) import System.IO (Handle,hClose,hSetBuffering,hGetChar,hPutChar,BufferMode(..),hFlush) import System.IO.Error (isEOFError,isDoesNotExistError,isUserError) import Network.BSD (getHostName) import Network (HostName,PortID(..),listenOn,accept,sClose,connectTo) import Network.Socket (setSocketOption,SocketOption(..),socketPort,aNY_PORT ) import qualified Data.Map as Map (Map,keys,fromList,unionWith,elems,singleton,member,update,empty,adjust,alter,insert,delete,lookup,toList,size,insertWith') import Remote.Reg (getEntryByIdent,Lookup,empty) import Remote.Encoding (serialEncode,serialDecode,serialEncodePure,serialDecodePure,dynamicEncodePure,dynamicDecodePure,DynamicPayload,Payload,Serializable,hPutPayload,hGetPayload,getPayloadType,getDynamicPayloadType) import System.Environment (getArgs) import qualified System.Timeout (timeout) import Data.Time (toModifiedJulianDay,Day(..),picosecondsToDiffTime,getCurrentTime,diffUTCTime,UTCTime(..),utcToLocalZonedTime) import Remote.Closure (Closure (..)) import Control.Concurrent.STM (STM,atomically,retry,orElse) import Control.Concurrent.STM.TChan (TChan,isEmptyTChan,readTChan,newTChanIO,writeTChan) import Control.Concurrent.Chan (newChan,readChan,writeChan) import Control.Concurrent.STM.TVar (TVar,newTVarIO,readTVar,writeTVar) import Control.Concurrent.QSem (QSem,newQSem,waitQSem,signalQSem) import Data.IORef (IORef,newIORef,readIORef,writeIORef) import Data.Fixed (Pico) ---------------------------------------------- -- * Process monad ---------------------------------------------- type PortId = Int type LocalProcessId = Int -- | The Config structure encapsulates the user-settable configuration options for each node. -- This settings are usually read in from a configuration file or from the executable's -- command line; in either case, see 'Remote.Init.remoteInit' and 'readConfig' data Config = Config { cfgRole :: !String, -- ^ The user-assigned role of this node determines what its initial behavior is and how it presents itself to its peers. Default to NODE cfgHostName :: !HostName, -- ^ The hostname, used as a basis for creating the name of the node. If unspecified, the OS will be queried. Since the hostname is part of the nodename, the computer must be accessible to other nodes using this name. cfgListenPort :: !PortId, -- ^ The TCP port on which to listen to for new connections. If unassigned or 0, the OS will assign a free port. cfgLocalRegistryListenPort :: !PortId, -- ^ The TCP port on which to communicate with the local node registry, or to start the local node registry if it isn't already running. This defaults to 38813 and shouldn't be changed unless you have prohibitive firewall rules cfgPeerDiscoveryPort :: !PortId, -- ^ The UDP port on which local peer discovery broadcasts are sent. Defaults to 38813, and only matters if you rely on dynamic peer discovery cfgNetworkMagic :: !String, -- ^ The unique identifying string for this network or application. Must not contain spaces. The uniqueness of this string ensures that multiple applications running on the same physical network won't accidentally communicate with each other. All nodes of your application should have the same network magic. Defaults to MAGIC cfgKnownHosts :: ![String], -- ^ A list of hosts where nodes may be running. When 'Remote.Peer.getPeers' or 'Remote.Peer.getPeerStatic' is called, each host on this list will be queried for its nodes. Only matters if you rely on static peer discovery. cfgRoundtripTimeout :: !Int, -- ^ Microseconds to wait for a response from a system service on a remote node. If your network has high latency or congestion, you may need to increase this to avoid incorrect reports of node inaccessibility. 0 to wait indefinitely (not recommended). cfgMaxOutgoing :: !Int, -- ^ A limit on the number of simultaneous outgoing connections per node cfgPromiseFlushDelay :: !Int, -- ^ Time in microseconds before an in-memory promise is flushed to disk. 0 to disable disk flush entirely. cfgPromisePrefix :: !String, -- ^ Prepended to the filename of flushed promises. cfgArgs :: [String] -- ^ Command-line arguments that are not part of the node configuration are placed here and can be examined by your application -- logConfig :: LogConfig } deriving (Show) type ProcessTable = Map.Map LocalProcessId ProcessTableEntry type AdminProcessTable = Map.Map ServiceId LocalProcessId data Node = Node { -- ndNodeId :: NodeId ndProcessTable :: ProcessTable, ndAdminProcessTable :: AdminProcessTable, -- connectionTable :: Map.Map HostName Handle -- outgoing connections only -- ndHostEntries :: [HostEntry], ndHostName :: HostName, ndListenPort :: PortId, ndConfig :: Config, ndLookup :: Lookup, ndLogConfig :: LogConfig, ndNodeFinalizer :: MVar (), ndNodeFinalized :: MVar (), ndOutgoing :: QSem, ndNextProcessId :: LocalProcessId --conncetion table -- each one is MVared -- also contains time info about last contact } -- | Identifies a node somewhere on the network. These -- can be queried from 'getPeers'. See also 'getSelfNode' data NodeId = NodeId !HostName !PortId deriving (Typeable,Eq,Ord,Data) -- | Identifies a process somewhere on the network. These -- are produced by the 'spawn' family of functions and -- consumed by 'send'. When a process ends, its process ID -- ceases to be valid. See also 'getSelfPid' data ProcessId = ProcessId !NodeId !LocalProcessId deriving (Typeable,Eq,Ord,Data) instance Binary NodeId where put (NodeId h p) = put h >> put p get = get >>= \h -> get >>= \p -> return (NodeId h p) instance Show NodeId where show (NodeId hostname portid) = concat ["nid://",hostname,":",show portid,"/"] instance Read NodeId where readsPrec _ s = if isPrefixOf "nid://" s then let a = drop 6 s hostname = takeWhile ((/=) ':') a b = drop (length hostname+1) a port= takeWhile ((/=) '/') b c = drop (length port+1) b result = NodeId hostname (read port) in if (not.null) hostname && (not.null) port then [(result,c)] else error "Bad parse looking for NodeId" else error "Bad parse looking for NodeId" instance Binary ProcessId where put (ProcessId n p) = put n >> put p get = get >>= \n -> get >>= \p -> return (ProcessId n p) instance Show ProcessId where show (ProcessId (NodeId hostname portid) localprocessid) = concat ["pid://",hostname,":",show portid,"/",show localprocessid,"/"] instance Read ProcessId where readsPrec _ s = if isPrefixOf "pid://" s then let a = drop 6 s hostname = takeWhile ((/=) ':') a b = drop (length hostname+1) a port= takeWhile ((/=) '/') b c = drop (length port+1) b pid = takeWhile ((/=) '/') c d = drop (length pid+1) c result = ProcessId (NodeId hostname (read port)) (read pid) in if (not.null) hostname && (not.null) pid && (not.null) port then [(result,d)] else error "Bad parse looking for ProcessId" else error "Bad parse looking for ProcessId" data PayloadDisposition = PldUser | PldAdmin deriving (Typeable,Read,Show,Eq) data Message = EncodedMessage { msgEDisposition :: PayloadDisposition, msgEHeader :: Maybe Payload, msgEPayload :: !Payload } | DynamicMessage { msgDDisposition :: PayloadDisposition, msgDHeader :: Maybe DynamicPayload, msgDPayload :: DynamicPayload } makeMessage :: (Serializable msg, Serializable hdr) => Bool -> PayloadDisposition -> msg -> Maybe hdr -> Message makeMessage True pld msg hdr = DynamicMessage {msgDDisposition=pld,msgDHeader=maybe Nothing (Just . dynamicEncodePure) hdr, msgDPayload=dynamicEncodePure msg} makeMessage False pld msg hdr = EncodedMessage {msgEDisposition=pld,msgEHeader=maybe Nothing (Just . serialEncodePure) hdr,msgEPayload=serialEncodePure msg} getMessageType :: Message -> String getMessageType (EncodedMessage _ _ a) = getPayloadType a getMessageType (DynamicMessage _ _ a) = getDynamicPayloadType a getMessagePayload :: (Serializable a) => Message -> Maybe a getMessagePayload (EncodedMessage _ _ a) = serialDecodePure a getMessagePayload (DynamicMessage _ _ a) = dynamicDecodePure a {- UNUSED messageHasHeader :: Message -> Bool messageHasHeader (EncodedMessage _ (Just _) _) = True messageHasHeader (DynamicMessage _ (Just _) _) = True messageHasHeader _ = False -} getMessageHeader :: (Serializable a) => Message -> Maybe a getMessageHeader (EncodedMessage _ a _) = maybe Nothing serialDecodePure a getMessageHeader (DynamicMessage _ a _) = maybe Nothing dynamicDecodePure a getMessageDisposition :: Message -> PayloadDisposition getMessageDisposition (EncodedMessage a _ _) = a getMessageDisposition (DynamicMessage a _ _) = a data ProcessTableEntry = ProcessTableEntry { pteThread :: ThreadId, pteChannel :: TChan Message, pteDeathT :: TVar Bool, pteDeath :: MVar (), pteDaemonic :: Bool } data ProcessState = ProcessState { prQueue :: Queue Message } data Process = Process { prPid :: ProcessId, prSelf :: LocalProcessId, prNodeRef :: MVar Node, prThread :: ThreadId, prChannel :: TChan Message, prState :: TVar ProcessState, prPool :: IORef (Map.Map NodeId Handle), prLogConfig :: Maybe LogConfig } -- | The monad ProcessM is the core of the process layer. Functions -- in the ProcessM monad may participate in messaging and create -- additional concurrent processes. You can create -- a ProcessM context from an 'IO' context with the 'remoteInit' function. data ProcessM a = ProcessM {runProcessM :: Process -> IO (Process,a)} deriving Typeable instance Monad ProcessM where m >>= k = ProcessM $ (\p -> (runProcessM m) p >>= (\(news,newa) -> runProcessM (k newa) news)) return x = ProcessM $ \s -> return (s,x) instance Functor ProcessM where fmap f v = ProcessM $ (\p -> (runProcessM v) p >>= (\x -> return $ fmap f x)) instance MonadIO ProcessM where liftIO arg = ProcessM $ \pr -> (arg >>= (\x -> return (pr,x))) getProcess :: ProcessM (Process) getProcess = ProcessM $ \x -> return (x,x) -- | Returns command-line arguments provided to the -- executable, excluding any command line arguments -- that were processed by the framework. getCfgArgs :: ProcessM [String] getCfgArgs = do cfg <- getConfig return $ cfgArgs cfg getConfig :: ProcessM (Config) getConfig = do p <- getProcess node <- liftIO $ readMVar (prNodeRef p) return $ ndConfig node getConfigI :: MVar Node -> IO Config getConfigI mnode = do node <- readMVar mnode return $ ndConfig node getLookup :: ProcessM (Lookup) getLookup = do p <- getProcess node <- liftIO $ readMVar (prNodeRef p) return $ ndLookup node putProcess :: Process -> ProcessM () putProcess p = ProcessM $ \_ -> return (p,()) ---------------------------------------------- -- * Message pattern matching ---------------------------------------------- -- | This monad provides the state and structure for -- matching received messages from the incoming message queue. -- It's the interface between the 'receive' family of functions, -- and the 'match' family, which together can express which -- messages can be accepted. data MatchM q a = MatchM { runMatchM :: MatchBlock -> STM ((MatchBlock,Maybe (ProcessM q)),a) } instance Monad (MatchM q) where m >>= k = MatchM $ \mbi -> do (mb,a) <- runMatchM m mbi (mb',a2) <- runMatchM (k a) (fst mb) return (mb',a2) return x = MatchM $ \mb -> return $ ((mb,Nothing),x) -- fail _ = MatchM $ \_ -> return (False,Nothing) returnHalt :: a -> ProcessM q -> MatchM q a returnHalt x invoker = MatchM $ \mb -> return $ ((mb,Just (invoker)),x) liftSTM :: STM a -> MatchM q a liftSTM arg = MatchM $ \mb -> do a <- arg return ((mb,Nothing),a) data MatchBlock = MatchBlock { mbMessage :: Message } getMatch :: MatchM q MatchBlock getMatch = MatchM $ \x -> return ((x,Nothing), x) getNewMessage :: Process -> STM Message getNewMessage p = readTChan $ prChannel p getNewMessageLocal :: Node -> LocalProcessId -> STM (Maybe Message) getNewMessageLocal node lpid = do mpte <- getProcessTableEntry (node) lpid case mpte of Just pte -> do msg <- readTChan (pteChannel pte) return $ Just msg Nothing -> return Nothing getQueueLength :: ProcessM Int getQueueLength = do p <- getProcess liftIO $ atomically $ do q <- getCurrentMessages p return $ length q getCurrentMessages :: Process -> STM [Message] getCurrentMessages p = do msgs <- cleanChannel (prChannel p) [] ps <- readTVar (prState p) let q = (prQueue ps) let newq = queueInsertMulti q msgs writeTVar (prState p) ps {prQueue = newq} return $ queueToList newq where cleanChannel c m = do isEmpty <- isEmptyTChan c if isEmpty then return m else do item <- readTChan c cleanChannel c (item:m) matchMessage :: [MatchM q ()] -> Message -> STM (Maybe (ProcessM q)) matchMessage matchers msg = do (_mb,r) <- (foldl orElse (retry) (map executor matchers)) `orElse` (return (theMatchBlock,Nothing)) return r where executor x = do (ok@(_mb,matchfound),_) <- runMatchM x theMatchBlock case matchfound of Nothing -> retry _ -> return ok theMatchBlock = MatchBlock {mbMessage = msg} matchMessages :: [MatchM q ()] -> [(Message,STM ())] -> STM (Maybe (ProcessM q)) matchMessages matchers msgs = (foldl orElse (retry) (map executor msgs)) `orElse` (return Nothing) where executor (msg,acceptor) = do res <- matchMessage matchers msg case res of Nothing -> retry Just pmq -> acceptor >> return (Just pmq) -- | Examines the message queue of the current process, matching each message against each of the -- provided message pattern clauses (typically provided by a function from the 'match' family). If -- a message matches, the corresponding handler is invoked and its result is returned. If no -- message matches, Nothing is returned. receive :: [MatchM q ()] -> ProcessM (Maybe q) receive m = do p <- getProcess res <- convertErrorCall $ liftIO $ atomically $ do msgs <- getCurrentMessages p matchMessages m (messageHandlerGenerator (prState p) msgs) case res of Nothing -> return Nothing Just n -> do q <- n return $ Just q -- | A simple way to receive messages. -- This will return the first message received -- of the specified type; if no such message -- is available, the function will block. -- Unlike the 'receive' family of functions, -- this function does not allow the notion -- of choice in message extraction. expect :: (Serializable a) => ProcessM a expect = receiveWait [match return] -- | Examines the message queue of the current process, matching each message against each of the -- provided message pattern clauses (typically provided by a function from the 'match' family). If -- a message matches, the corresponding handler is invoked and its result is returned. If no -- message matches, the function blocks until a matching message is received. receiveWait :: [MatchM q ()] -> ProcessM q receiveWait m = do f <- receiveWaitImpl m f -- | Examines the message queue of the current process, matching each message against each of the -- provided message pattern clauses (typically provided by a function from the 'match' family). If -- a message matches, the corresponding handler is invoked and its result is returned. If no -- message matches, the function blocks until a matching message is received, or until the -- specified time in microseconds has elapsed, at which point it will return Nothing. -- If the specified time is 0, this function is equivalent to 'receive'. receiveTimeout :: Int -> [MatchM q ()] -> ProcessM (Maybe q) receiveTimeout 0 m = receive m receiveTimeout to m | to > 0 = do res <- ptimeout to $ receiveWaitImpl m case res of Nothing -> return Nothing Just f -> do q <- f return $ Just q messageHandlerGenerator :: TVar ProcessState -> [Message] -> [(Message, STM ())] messageHandlerGenerator prSt msgs = map (\(m,q) -> (m,do ps <- readTVar prSt writeTVar prSt ps {prQueue = queueFromList q})) (exclusionList msgs) receiveWaitImpl :: [MatchM q ()] -> ProcessM (ProcessM q) receiveWaitImpl m = do p <- getProcess v <- attempt1 p attempt2 p v where attempt2 p v = case v of Just n -> return n Nothing -> do ret <- convertErrorCall $ liftIO $ atomically $ do msg <- getNewMessage p ps <- readTVar (prState p) let oldq = prQueue ps writeTVar (prState p) ps {prQueue = queueInsert oldq msg} matchMessages m [(msg,writeTVar (prState p) ps)] attempt2 p ret attempt1 p = convertErrorCall $ liftIO $ atomically $ do msgs <- getCurrentMessages p matchMessages m (messageHandlerGenerator (prState p) msgs) convertErrorCall :: ProcessM a -> ProcessM a convertErrorCall f = do a <- ptry ff case a of Right c -> return c Left b -> throw $ TransmitException $ QteOther $ show (b::ErrorCall) where ff = do q <- f q `seq` return q {- UNUSED matchDebug :: (Message -> ProcessM q) -> MatchM q () matchDebug f = do mb <- getMatch returnHalt () (f (mbMessage mb)) -} -- | A catch-all variant of 'match' that invokes user-provided code and -- will extact any message from the queue. This is useful for matching -- against messages that are not recognized. Since message matching patterns -- are evaluated in order, this function, if used, should be the last element -- in the list of matchers given to 'receiveWait' and similar functions. matchUnknown :: ProcessM q -> MatchM q () matchUnknown body = returnHalt () body -- | A variant of 'matchUnknown' that throws a 'UnknownMessageException' -- if the process receives a message that isn't extracted by another message matcher. -- Equivalent to: -- -- > matchUnknown (throw (UnknownMessageException "...")) matchUnknownThrow :: MatchM q () matchUnknownThrow = do mb <- getMatch returnHalt () (throw $ UnknownMessageException (getMessageType (mbMessage mb))) -- | Used to specify a message pattern in 'receiveWait' and related functions. -- Only messages containing data of type /a/, where /a/ is the argument to the user-provided -- function in the first parameter of 'match', will be removed from the queue, at which point -- the user-provided function will be invoked. match :: (Serializable a) => (a -> ProcessM q) -> MatchM q () match = matchCoreHeaderless (const True) -- | Similar to 'match', but allows for additional criteria to be checked prior to message acceptance. -- Here, the first user-provided function operates as a filter, and the message will be accepted -- only if it returns True. Once it's been accepted, the second user-defined function is invoked, -- as in 'match' matchIf :: (Serializable a) => (a -> Bool) -> (a -> ProcessM q) -> MatchM q () matchIf = matchCoreHeaderless matchCond :: (Serializable a) => (a -> Maybe (ProcessM q)) -> MatchM q () matchCond f = matchIf (not . isNothing . f) run where run a = case f a of Nothing -> throw $ TransmitException $ QteOther $ "Indecesive predicate in matchCond" Just q -> q matchCoreHeaderless :: (Serializable a) => (a -> Bool) -> (a -> ProcessM q) -> MatchM q () matchCoreHeaderless f g = matchCore (\(a,b) -> b==(Nothing::Maybe ()) && f a) (\(a,_) -> g a) matchCore :: (Serializable a,Serializable b) => ((a,Maybe b) -> Bool) -> ((a,Maybe b) -> ProcessM q) -> MatchM q () matchCore cond body = do mb <- getMatch doit mb where doit mb = let decodified = getMessagePayload (mbMessage mb) decodifiedh = getMessageHeader (mbMessage mb) in -- decodified `seq` decodifiedh `seq` TODO is this good or bad? case decodified of Just x -> if cond (x,decodifiedh) then returnHalt () (body (x,decodifiedh)) else liftSTM retry Nothing -> liftSTM retry ---------------------------------------------- -- * Exceptions and return values ---------------------------------------------- -- | Thrown in response to a bad configuration -- file or command line option, most likely -- in 'Config' data ConfigException = ConfigException String deriving (Show,Typeable) instance Exception ConfigException -- | Thrown by various network-related functions when -- communication with a host has failed data TransmitException = TransmitException TransmitStatus deriving (Show,Typeable) instance Exception TransmitException -- | Thrown by 'matchUnknownThrow' in response to a message -- of a wrong type being received by a process data UnknownMessageException = UnknownMessageException String deriving (Show,Typeable) instance Exception UnknownMessageException -- | Thrown by "Remote.Process" system services in response -- to some problem data ServiceException = ServiceException String deriving (Show,Typeable) instance Exception ServiceException -- | Used internally by 'terminate' to mark the orderly termination -- of a process data ProcessTerminationException = ProcessTerminationException deriving (Show,Typeable) instance Exception ProcessTerminationException data TransmitStatus = QteOK | QteUnknownPid | QteBadFormat | QteOther String | QtePleaseSendBody | QteBadNetworkMagic | QteNetworkError String | QteEncodingError String | QteDispositionFailed | QteLoggingError | QteConnectionTimeout | QteUnknownCommand | QteThrottle Int deriving (Show,Read,Typeable) instance Binary TransmitStatus where put QteOK = putWord8 0 put QteUnknownPid = putWord8 1 put QteBadFormat = putWord8 2 put (QteOther s) = putWord8 3 >> put s put QtePleaseSendBody = putWord8 4 put QteBadNetworkMagic = putWord8 5 put (QteNetworkError s) = putWord8 6 >> put s put (QteEncodingError s) = putWord8 7 >> put s put QteDispositionFailed = putWord8 8 put QteLoggingError = putWord8 9 put QteConnectionTimeout = putWord8 10 put QteUnknownCommand = putWord8 11 put (QteThrottle s) = putWord8 12 get = do ch <- getWord8 case ch of 0 -> return QteOK 1 -> return QteUnknownPid 2 -> return QteBadFormat 3 -> get >>= return . QteOther 4 -> return QtePleaseSendBody 5 -> return QteBadNetworkMagic 6 -> get >>= return . QteNetworkError 7 -> get >>= return . QteEncodingError 8 -> return QteDispositionFailed 9 -> return QteLoggingError 10 -> return QteConnectionTimeout 11 -> return QteUnknownCommand 12 -> get >>= return . QteThrottle ---------------------------------------------- -- * Node and process spawning ---------------------------------------------- -- | Creates a new 'Node' object, given the specified configuration (usually created by 'readConfig') and -- function metadata table (usually create by 'Remote.Call.registerCalls'). You probably want to use -- 'Remote.Init.remoteInit' instead of this lower-level function. initNode :: Config -> Lookup -> IO (MVar Node) initNode cfg lookup = do defaultHostName <- getHostName let newHostName = if null (cfgHostName cfg) then defaultHostName else cfgHostName cfg -- TODO it would be nice to check that is name actually makes sense, e.g. try to contact ourselves theNewHostName <- evaluate newHostName finalizer <- newEmptyMVar finalized <- newEmptyMVar outgoing <- newQSem (cfgMaxOutgoing cfg) mvar <- newMVar Node {ndHostName=theNewHostName, ndProcessTable=Map.empty, ndAdminProcessTable=Map.empty, ndConfig=cfg, ndLookup=lookup, ndListenPort=0, ndNextProcessId=0, ndNodeFinalizer=finalizer, ndNodeFinalized=finalized, ndLogConfig=defaultLogConfig, ndOutgoing=outgoing} return mvar -- | Given a Node (created by 'initNode'), start execution of user-provided code -- by invoking the given function with the node's 'cfgRole' string. roleDispatch :: MVar Node -> (String -> ProcessM ()) -> IO () roleDispatch mnode func = do cfg <- getConfigI mnode runLocalProcess mnode (func (cfgRole cfg)) >> return () -- | Start executing a process on the current node. This is a variation of 'spawnLocal' -- which accepts two blocks of user-defined code. The first block -- is the main body of the code to run concurrently. The second block is a "prefix" -- which is run in the new process, prior to the main body, but its completion -- is guaranteed before spawnAnd returns. Thus, the prefix code is useful for -- initializing the new process synchronously. spawnLocalAnd :: ProcessM () -> ProcessM () -> ProcessM ProcessId spawnLocalAnd fun prefix = do p <- getProcess v <- liftIO $ newEmptyMVar pid <- liftIO $ runLocalProcess (prNodeRef p) (myFun v) liftIO $ takeMVar v return pid where myFun mv = (prefix `pfinally` liftIO (putMVar mv ())) >> fun -- | A synonym for 'spawnLocal' forkProcess :: ProcessM () -> ProcessM ProcessId forkProcess = spawnLocal -- | Create a parallel process sharing the same message queue and PID. -- Not safe for export, as doing any message receive operation could -- result in a munged message queue. forkProcessWeak :: ProcessM () -> ProcessM () forkProcessWeak f = do p <- getProcess _res <- liftIO $ forkIO (runProcessM f p >> return ()) return () -- | Create a new process on the current node. Returns the new process's identifier. -- Unlike 'spawn', this function does not need a 'Closure' or a 'NodeId'. spawnLocal :: ProcessM () -> ProcessM ProcessId spawnLocal fun = do p <- getProcess liftIO $ runLocalProcess (prNodeRef p) fun runLocalProcess :: MVar Node -> ProcessM () -> IO ProcessId runLocalProcess node fun = do localprocessid <- modifyMVar node (\thenode -> return $ (thenode {ndNextProcessId=1+ndNextProcessId thenode}, ndNextProcessId thenode)) channel <- newTChanIO passProcess <- newEmptyMVar okay <- newEmptyMVar thread <- forkIO (runner okay node passProcess) thenodeid <- getNodeId node let pid = thenodeid `seq` ProcessId thenodeid localprocessid state <- newTVarIO $ ProcessState {prQueue = queueMake} pool <- newIORef Map.empty putMVar passProcess (mkProcess channel node localprocessid thread pid state pool) takeMVar okay return pid where notifyProcessDown p r = do nid <- getNodeId node let pp = adminGetPid nid ServiceProcessMonitor let msg = GlProcessDown (prPid p) r try $ sendBasic node pp (msg) (Nothing::Maybe ()) PldAdmin Nothing :: IO (Either SomeException TransmitStatus)--ignore result ok notifyProcessUp _p = return () closePool p = do c <- readIORef (prPool p) mapM hClose (Map.elems c) exceptionHandler e p = let shown = show e in notifyProcessDown (p) (SrException shown) >> (try (logI node (prPid p) "SYS" LoCritical (concat ["Process got unhandled exception ",shown]))::IO(Either SomeException ())) >> return () --ignore error exceptionCatcher p fun = do notifyProcessUp (p) res <- try (fun `catch` (\ProcessTerminationException -> return ())) case res of Left e -> exceptionHandler (e::SomeException) p Right a -> notifyProcessDown (p) SrNormal >> closePool p >> return a runner okay node passProcess = do p <- takeMVar passProcess let init = do death <- newEmptyMVar death2 <- newTVarIO False let pte = (mkProcessTableEntry (prChannel p) (prThread p) death death2 ) insertProcessTableEntry node (prSelf p) pte return pte let action = runProcessM fun p bracket (init) (\pte -> atomically (writeTVar (pteDeathT pte) True) >> putMVar (pteDeath pte) ()) (\_ -> putMVar okay () >> (exceptionCatcher p (action>>=return . snd)) >> return ()) insertProcessTableEntry node processid entry = modifyMVar_ node (\n -> return $ n {ndProcessTable = Map.insert processid entry (ndProcessTable n)} ) mkProcessTableEntry channel thread death death2 = ProcessTableEntry { pteChannel = channel, pteThread = thread, pteDeath = death, pteDeathT = death2, pteDaemonic = False } mkProcess channel noderef localprocessid thread pid state pool = Process { prPid = pid, prSelf = localprocessid, prChannel = channel, prNodeRef = noderef, prThread = thread, prState = state, prPool = pool, prLogConfig = Nothing } ---------------------------------------------- -- * Roundtrip conversations ---------------------------------------------- -- TODO this needs withMonitor a safe variant -- To make this work with a ptimeout but still be able to return martial results, -- they need to be stored in an MVar. Also, like roundtipQuery, this should have -- two variants: a flavor that establishes monitors, and a timeout-based flavor. -- This also needs an ASYNC variant, that will send data simultaneously, from multiple subprocesses -- and finally, it needs a POLY variant, of type Pld -> [(ProcessId,a)] -> ProcessM [Either TransmitStatus b] roundtripQueryMulti :: (Serializable a,Serializable b) => PayloadDisposition -> [ProcessId] -> a -> ProcessM [Either TransmitStatus b] roundtripQueryMulti pld pids dat = -- TODO timeout let convidsM = mapM (\_ -> liftIO $ newConversationId) pids in do convids <- convidsM sender <- getSelfPid let convopids = zip convids pids sending (convid,pid) = let hdr = RoundtripHeader {msgheaderConversationId = convid, msgheaderSender = sender, msgheaderDestination = pid} in do res <- sendTry pid dat (Just hdr) pld case res of QteOK -> return (convid,Nothing) n -> return (convid,Just (Left n)) res <- mapM sending convopids let receiving c = if (any isNothing (Map.elems c)) then do newmap <- receiveWait [matcher c] receiving newmap else return c matcher c = matchCore (\(_,h) -> case h of Just a -> (msgheaderConversationId a,msgheaderSender a) `elem` convopids Nothing -> False) (\(b,Just h) -> let newmap = Map.adjust (\_ -> (Just (Right b))) (msgheaderConversationId h) c in return (newmap) ) m <- receiving (Map.fromList res) return $ catMaybes (Map.elems m) generalPid :: ProcessId -> ProcessId generalPid (ProcessId n _p) = ProcessId n (-1) roundtripQuery :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> ProcessM (Either TransmitStatus b) roundtripQuery pld pid dat = do res <- ptry $ withMonitor apid $ roundtripQueryImpl 0 pld pid dat Prelude.id [] case res of Left (ServiceException s) -> return $ Left $ QteOther s Right (Left a) -> return (Left a) Right (Right a) -> return (Right a) where apid = case pld of PldAdmin -> generalPid pid _ -> pid roundtripQueryLocal :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> ProcessM (Either TransmitStatus b) roundtripQueryLocal pld pid dat = roundtripQueryImpl 0 pld pid dat Prelude.id [] roundtripQueryUnsafe :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> ProcessM (Either TransmitStatus b) roundtripQueryUnsafe pld pid dat = do cfg <- getConfig roundtripQueryImpl (cfgRoundtripTimeout cfg) pld pid dat Prelude.id [] roundtripQueryImpl :: (Serializable a, Serializable b) => Int -> PayloadDisposition -> ProcessId -> a -> (b -> c) -> [MatchM (Either TransmitStatus c) ()] -> ProcessM (Either TransmitStatus c) roundtripQueryImpl time pld pid dat converter additional = do mmatcher <- roundtripQueryImplSub pld pid dat (\_ y -> (return . Right . converter) y) case mmatcher of Left err -> return $ Left err Right matcher -> receiver $ [ matcher (), matchProcessDown pid ((return . Left . QteNetworkError) "Remote partner unavailable"), matchProcessDown (generalPid pid) ((return . Left . QteNetworkError) "Remote partner unavailable")] ++ additional where receiver matchers = case time of 0 -> do receiveWait matchers n -> do res <- receiveTimeout n matchers case res of Nothing -> return (Left QteConnectionTimeout) Just a -> return a roundtripQueryImplSub :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> (c -> b -> ProcessM q) -> ProcessM (Either TransmitStatus (c -> MatchM q ())) roundtripQueryImplSub pld pid dat act = do convId <- liftIO $ newConversationId sender <- getSelfPid res <- mysend pid dat (Just RoundtripHeader {msgheaderConversationId = convId,msgheaderSender = sender,msgheaderDestination = pid}) case res of QteOK -> return $ Right $ \c -> (matchCore (\(_,h) -> case h of Just a -> msgheaderConversationId a == convId && msgheaderSender a == pid Nothing -> False) (\(x,_) -> do vv <- act c x return $ vv)) err -> return (Left err) where mysend p d mh = sendTry p d mh pld roundtripResponse :: (Serializable a, Serializable b) => (a -> ProcessM (b,q)) -> MatchM q () roundtripResponse f = roundtripResponseAsync myf False where myf inp verf = do (resp,ret) <- f inp _ <- verf resp return ret roundtripResponseAsync :: (Serializable a, Serializable b) => (a -> (b -> ProcessM ()) -> ProcessM q) -> Bool -> MatchM q () roundtripResponseAsync f throwing = matchCore (\(_,h)->conditional h) transformer where conditional :: Maybe RoundtripHeader -> Bool conditional a = case a of Just _ -> True Nothing -> False transformer (m,Just h) = let sender b = do res <- sendTry (msgheaderSender h) b (Just RoundtripHeader {msgheaderSender=msgheaderDestination h,msgheaderDestination = msgheaderSender h,msgheaderConversationId=msgheaderConversationId h}) PldUser case res of QteOK -> return () _ -> if throwing then throw $ TransmitException res else logS "SYS" LoImportant ("roundtripResponse couldn't send response to "++show (msgheaderSender h)++" because "++show res) in f m sender data RoundtripHeader = RoundtripHeader { msgheaderConversationId :: Int, msgheaderSender :: ProcessId, msgheaderDestination :: ProcessId } deriving (Typeable) instance Binary RoundtripHeader where put (RoundtripHeader a b c) = put a >> put b >> put c get = get >>= \a -> get >>= \b -> get >>= \c -> return $ RoundtripHeader a b c ---------------------------------------------- -- * Message sending ---------------------------------------------- -- | Sends a message to the given process. If the -- process isn't running or can't be accessed, -- this function will throw a 'TransmitException'. -- The message must implement the 'Serializable' interface. send :: (Serializable a) => ProcessId -> a -> ProcessM () send pid msg = sendSimple pid msg PldUser >>= (\x -> case x of QteOK -> return () _ -> throw $ TransmitException x ) -- | Like 'send', but in case of error returns a value rather than throw -- an exception. sendQuiet :: (Serializable a) => ProcessId -> a -> ProcessM TransmitStatus sendQuiet p m = sendSimple p m PldUser sendSimple :: (Serializable a) => ProcessId -> a -> PayloadDisposition -> ProcessM TransmitStatus sendSimple pid dat pld = sendTry pid dat (Nothing :: Maybe ()) pld sendTry :: (Serializable a,Serializable b) => ProcessId -> a -> Maybe b -> PayloadDisposition -> ProcessM TransmitStatus sendTry pid msg msghdr pld = getProcess >>= (\_p -> let timeoutFilter a = do cfg <- getConfig -- TODO This is problematic. We should give up on transmitting after a certain period -- of time if the remote host is just not responsive, but delays in message delivery, -- even to legitimate peers, are unpredictable. Timeouts are currently disabled -- but should be enabled if I can think of a good way to set them. This would -- also be a good place to put automatic retries or to limit the number -- of outgoing connections per node. //!! q <- case 0 of -- case cfgRoundtripTimeout cfg of 0 -> a n -> do ff <- ptimeout n $ a case ff of Nothing -> return QteConnectionTimeout Just r -> return r case q of QteThrottle n -> liftIO (threadDelay n) >> timeoutFilter a n -> return n tryAction = ptry action >>= (\x -> case x of Left l -> return $ QteEncodingError $ show (l::ErrorCall) -- catch errors from encoding Right r -> return r) where action = do p <- getProcess timeoutFilter $ liftIO $ sendBasic (prNodeRef p) pid msg msghdr pld (Just $ prPool p) in tryAction ) sendBasic :: (Serializable a,Serializable b) => MVar Node -> ProcessId -> a -> Maybe b -> PayloadDisposition -> Maybe (IORef (Map.Map NodeId Handle)) -> IO TransmitStatus sendBasic mnode pid msg msghdr pld pool = do nid <- getNodeId mnode let islocal = nodeFromPid pid == nid -- TODO It's important that the semantics of messaging are preserved -- regardles of the location of particular processes. Messages are -- fully evaluated when sent to a remote node as part of the serialization -- but they don't need to be when sending to a local process; the message -- is just plopped into a TChan, basically. Unfortunately, this means -- that exceptions associated with the serialization of data will be -- handled by the receiving process, rather than the sender, which -- it may not be prepared for. An easy way to crash a system process, -- then, is to send it a message contained undefined or divide by zero. -- To prevent this, we now serialize all messages, even those bound for -- local destinations. -- FORMERLY: let themsg = makeMessage islocal pld msg msghdr let themsg = makeMessage False pld msg msghdr (if islocal then sendRawLocal else sendRawRemote) mnode pid nid themsg pool sendRawLocal :: MVar Node -> ProcessId -> NodeId -> Message -> Maybe (IORef (Map.Map NodeId Handle)) -> IO TransmitStatus sendRawLocal noderef thepid _nodeid msg _ | thepid == nullPid = return QteUnknownPid | otherwise = do cfg <- getConfigI noderef messageHandler cfg noderef (getMessageDisposition msg) msg (cfgNetworkMagic cfg) (localFromPid thepid) sendRawRemote :: MVar Node -> ProcessId -> NodeId -> Message -> Maybe (IORef (Map.Map NodeId Handle)) -> IO TransmitStatus sendRawRemote noderef thepid nodeid msg (Just pool) = do apool <- readIORef pool ppool <- if Map.size apool > 10 -- trim pool then mapM hClose (Map.elems apool) >> return Map.empty else return apool let finded = (Map.lookup thenode ppool) (ret,h) <- sendRawRemoteImpl noderef thepid nodeid msg finded case ret of QteOK -> cleanup h ppool _ -> case finded of Nothing -> cleanup h ppool _ -> do (_ret2,newh) <- sendRawRemoteImpl noderef thepid nodeid msg Nothing cleanup newh ppool return ret where thenode = (nodeFromPid thepid) cleanup h ppool = case h of Just hh -> writeIORef pool (Map.insert thenode hh ppool) Nothing -> writeIORef pool (Map.delete thenode ppool) sendRawRemote noderef thepid nodeid msg Nothing = liftM fst $ sendRawRemoteImpl noderef thepid nodeid msg Nothing sendRawRemoteImpl :: MVar Node -> ProcessId -> NodeId -> Message -> Maybe Handle -> IO (TransmitStatus,Maybe Handle) sendRawRemoteImpl noderef thepid@(ProcessId (NodeId hostname portid) localpid) nodeid msg bigh | thepid == nullPid = return (QteUnknownPid,Nothing) | otherwise = do node <- readMVar noderef res <- withSem (ndOutgoing node) (try setup) case res of Right n -> return n Left l | isEOFError l -> return (QteNetworkError (show l),Nothing) | isUserError l -> return (QteBadFormat,Nothing) | isDoesNotExistError l -> return (QteNetworkError (show l),Nothing) | otherwise -> return (QteOther $ show l,Nothing) where setup = bracket (acquireConnection) (\_ -> return ()) (sender) acquireConnection = case bigh of Nothing -> do h <- connectTo hostname (PortNumber $ toEnum portid) hSetBuffering h (BlockBuffering Nothing) return h Just h -> return h sender h = do cfg <- getConfigI noderef ret <- writeMessage h (cfgNetworkMagic cfg,localpid,nodeid,msg) return (ret,Just h) writeMessage :: Handle -> (String,LocalProcessId,NodeId,Message)-> IO TransmitStatus writeMessage h (magic,dest,nodeid,(EncodedMessage msgDisp msgHdr msgMsg)) = do hPutStrZ h $ unwords ["Rmt!!",magic,show dest,show msgDisp,show fmt,show nodeid] hFlush h response <- hGetLineZ h resp <- readIO response :: IO TransmitStatus case resp of QtePleaseSendBody -> do maybe (return ()) (hPutPayload h) (msgHdr) hPutPayload h msgMsg hFlush h response2 <- hGetLineZ h resp2 <- readIO response2 :: IO TransmitStatus return resp2 QteOK -> return QteBadFormat n -> return n where fmt = case msgHdr of Nothing -> (0::Int) Just _ -> 2 writeMessage _ _ = throw $ ServiceException "writeMessage went down wrong pipe" ---------------------------------------------- -- * Message delivery ---------------------------------------------- -- | Starts a message-receive loop on the given node. You probably don't want to call this function yourself. forkAndListenAndDeliver :: MVar Node -> Config -> IO () forkAndListenAndDeliver node cfg = do coord <- newEmptyMVar _tid <- forkIO $ listenAndDeliver node cfg (coord) result <- takeMVar coord maybe (return ()) throw result writeResult :: Handle -> TransmitStatus -> IO () writeResult h er = hPutStrZ h (show er) >> hFlush h readMessage :: Handle -> IO (String,LocalProcessId,NodeId,Message) readMessage h = do line <- hGetLineZ h case words line of ["Rmt!!",magic,destp,disp,format,nodeid] -> do adestp <- readIO destp :: IO LocalProcessId adisp <- readIO disp :: IO PayloadDisposition aformat <- readIO format :: IO Int anodeid <- readIO nodeid :: IO NodeId writeResult h QtePleaseSendBody hFlush h header <- case aformat of 2 -> do hdr <- hGetPayload h return $ Just hdr _ -> return Nothing body <- hGetPayload h return (magic,adestp,anodeid,EncodedMessage { msgEHeader = header, msgEDisposition = adisp, msgEPayload = body }) _ -> throw $ userError "Bad message format" getProcessTableEntry :: Node -> LocalProcessId -> STM (Maybe ProcessTableEntry) getProcessTableEntry tbl adestp = let res = Map.lookup adestp (ndProcessTable tbl) in case res of Just x -> do isdead <- readTVar (pteDeathT x) if isdead then return Nothing else return (Just x) Nothing -> return Nothing deliver :: LocalProcessId -> MVar Node -> Message -> IO (Maybe ProcessTableEntry) deliver adestp tbl msg = do node <- readMVar tbl atomically $ do pte <- getProcessTableEntry node adestp maybe (return Nothing) (\x -> writeTChan (pteChannel x) msg >> return (Just x)) pte listenAndDeliver :: MVar Node -> Config -> MVar (Maybe IOError) -> IO () listenAndDeliver node cfg coord = -- this listenon should be replaced with a lower-level listen call that binds -- to the interface corresponding to the name specified in cfgNodeName do res <- try $ bracket (setupSocket) (sClose) (\s -> finishStartup >> sockBody s) case res of Left e -> putMVar coord (Just e) Right _ -> return () where finishStartup = putMVar coord Nothing setupSocket = do sock <- listenOn whichPort setSocketOption sock KeepAlive 1 realPort <- socketPort sock modifyMVar_ node (\a -> return $ a {ndListenPort=fromEnum realPort}) return sock whichPort = if cfgListenPort cfg /= 0 then PortNumber $ toEnum $ cfgListenPort cfg else PortNumber aNY_PORT handleCommSafe h = (try $ handleComm h :: IO (Either IOError ())) >> return () {- UNUSED logNetworkError :: IOError -> IO () logNetworkError n = return () writeResultTry h q = do res <- try (writeResult h q) case res of Left n -> logNetworkError n Right q -> return () -} handleComm h = do (magic,adestp,_nodeid,msg) <- readMessage h res <- messageHandler cfg node (getMessageDisposition msg) msg magic adestp writeResult h res case res of QteOK -> handleComm h _ -> return () sockBody s = do hchan <- newChan _tid <- forkIO $ forever $ do h <- readChan hchan hSetBuffering h (BlockBuffering Nothing) forkIO $ (handleCommSafe h `finally` hClose h) forever $ do (newh,_,_) <- accept s writeChan hchan newh messageHandler :: Config -> MVar Node -> PayloadDisposition -> Message -> String -> LocalProcessId -> IO TransmitStatus messageHandler cfg node pld = case pld of PldAdmin -> dispositionAdminHandler PldUser -> dispositionUserHandler where validMagic magic = magic == cfgNetworkMagic cfg -- same network || magic == localRegistryMagicMagic -- message from local magic-neutral registry || cfgRole cfg == localRegistryMagicRole-- message to local magic-neutral registry dispositionAdminHandler msg magic adestp | validMagic magic = do realPid <- adminLookupN (toEnum adestp) node case realPid of Left er -> return er Right p -> do res <- deliver p node msg case res of Just _ -> return QteOK Nothing -> return QteUnknownPid | otherwise = return QteBadNetworkMagic dispositionUserHandler msg magic adestp | validMagic magic = do res <- deliver adestp node msg case res of Nothing -> return QteUnknownPid Just _ -> return QteOK | otherwise = return QteBadNetworkMagic -- | Blocks until all non-daemonic processes of the given -- node have ended. Usually called on the main thread of a program. waitForThreads :: MVar Node -> IO () waitForThreads mnode = do node <- takeMVar mnode waitFor (Map.toList (ndProcessTable node)) node where waitFor lst node= case lst of [] -> putMVar mnode node (pn,pte):rest -> if (pteDaemonic pte) then waitFor rest node else do putMVar mnode node -- this take and modify should be atomic to ensure no one squeezes in a message to a zombie process takeMVar (pteDeath pte) modifyMVar_ mnode (\x -> return $ x {ndProcessTable=Map.delete pn (ndProcessTable x)}) waitForThreads mnode ---------------------------------------------- -- * Miscellaneous utilities ---------------------------------------------- paddedString :: Int -> String -> String paddedString i s = s ++ (replicate (i-length s) ' ') newConversationId :: IO Int newConversationId = do d <- newUnique return $ hashUnique d -- TODO this is a huge performance bottleneck. Why? How to fix? exclusionList :: [a] -> [(a,[a])] exclusionList [] = [] exclusionList (a:rest) = each [] a rest where each before it [] = [(it,before)] each before it following@(next:after) = (it,before++following):(each (before++[it]) next after) {- dumpMessageQueue :: ProcessM [String] dumpMessageQueue = do p <- getProcess msgs <- cleanChannel ps <- liftIO $ modifyIORef (prState p) (\x -> x {prQueue = queueInsertMulti (prQueue x) msgs}) buf <- liftIO $ readIORef (prState p) return $ map msgPayload (queueToList $ prQueue buf) printDumpMessageQueue :: ProcessM () printDumpMessageQueue = do liftIO $ putStrLn "----BEGINDUMP------" dump <- dumpMessageQueue liftIO $ mapM putStrLn dump liftIO $ putStrLn "----ENDNDUMP-------" -} {- UNUSED duration :: Int -> ProcessM a -> ProcessM (Int,a) duration t a = do time1 <- liftIO $ getCurrentTime result <- a time2 <- liftIO $ getCurrentTime return (t - diffTime time2 time1,result) -} diffTime :: UTCTime -> UTCTime -> Int diffTime time2 time1 = picosecondsToMicroseconds (fromEnum (diffUTCTime time2 time1)) where picosecondsToMicroseconds a = a `div` 1000000 hGetLineZ :: Handle -> IO String hGetLineZ h = loop [] >>= return . reverse where loop l = do c <- hGetChar h case c of '\0' -> return l _ -> loop (c:l) hPutStrZ :: Handle -> String -> IO () hPutStrZ h [] = hPutChar h '\0' hPutStrZ h (c:l) = hPutChar h c >> hPutStrZ h l buildPid :: ProcessM () buildPid = do p <- getProcess node <- liftIO $ readMVar (prNodeRef p) let pid=ProcessId (NodeId (ndHostName node) (ndListenPort node)) (prSelf p) putProcess $ p {prPid = pid} nullPid :: ProcessId nullPid = ProcessId (NodeId "0.0.0.0" 0) 0 -- | Returns the node ID of the node that the current process is running on. getSelfNode :: ProcessM NodeId getSelfNode = do (ProcessId n _p) <- getSelfPid return n getNodeId :: MVar Node -> IO NodeId getNodeId mnode = do node <- readMVar mnode return (NodeId (ndHostName node) (ndListenPort node)) -- | Returns the process ID of the current process. getSelfPid :: ProcessM ProcessId getSelfPid = do p <- getProcess case prPid p of (ProcessId (NodeId _ 0) _) -> (buildPid >> getProcess >>= return.prPid) _ -> return $ prPid p nodeFromPid :: ProcessId -> NodeId nodeFromPid (ProcessId nid _) = nid makeNodeFromHost :: String -> PortId -> NodeId makeNodeFromHost = NodeId localFromPid :: ProcessId -> LocalProcessId localFromPid (ProcessId _ lid) = lid hostFromNid :: NodeId -> HostName hostFromNid (NodeId hn _p) = hn buildPidFromNodeId :: NodeId -> LocalProcessId -> ProcessId buildPidFromNodeId n lp = ProcessId n lp {- UNUSED localServiceToPid :: LocalProcessId -> ProcessM ProcessId localServiceToPid sid = do (ProcessId nid _lid) <- getSelfPid return $ ProcessId nid sid -} -- | Returns true if the given process ID is associated with the current node. -- Does not examine if the process is currently running. isPidLocal :: ProcessId -> ProcessM Bool isPidLocal pid = do mine <- getSelfPid return (nodeFromPid mine == nodeFromPid pid) ---------------------------------------------- -- * Exception handling ---------------------------------------------- suppressTransmitException :: ProcessM a -> ProcessM (Maybe a) suppressTransmitException a = do res <- ptry a case res of Left (TransmitException _) -> return Nothing Right r -> return $ Just r -- | A 'ProcessM'-flavoured variant of 'Control.Exception.try' ptry :: (Exception e) => ProcessM a -> ProcessM (Either e a) ptry f = do p <- getProcess res <- liftIO $ try (runProcessM f p) case res of Left e -> return $ Left e Right (newp,newanswer) -> ProcessM (\_ -> return (newp,Right newanswer)) {- UNUSED -- | A 'ProcessM'-flavoured variant of 'Control.Exception.catch' pcatch :: Exception e => ProcessM a -> (e -> ProcessM a) -> ProcessM a pcatch code handler = do p <- getProcess liftIO $ catch (liftM snd $ runProcessM code p) (\e -> liftM snd $ runProcessM (handler e) p) -} -- | A 'ProcessM'-flavoured variant of 'System.Timeout.timeout' ptimeout :: Int -> ProcessM a -> ProcessM (Maybe a) ptimeout t f = do p <- getProcess res <- liftIO $ System.Timeout.timeout t (runProcessM f p) case res of Nothing -> return Nothing Just (newp,newanswer) -> ProcessM (\_ -> return (newp,Just newanswer)) -- | A 'ProcessM'-flavoured variant of 'Control.Exception.bracket' pbracket :: (ProcessM a) -> (a -> ProcessM b) -> (a -> ProcessM c) -> ProcessM c pbracket before after fun = do p <- getProcess (newp2,newanswer2) <- liftIO $ bracket (runProcessM before p) (\(newp,newanswer) -> runProcessM (after newanswer) newp) (\(newp,newanswer) -> runProcessM (fun newanswer) newp) ProcessM (\_ -> return (newp2, newanswer2)) -- | A 'ProcessM'-flavoured variant of 'Control.Exception.finally' pfinally :: ProcessM a -> ProcessM b -> ProcessM a pfinally fun after = pbracket (return ()) (\_ -> after) (const fun) ---------------------------------------------- -- * Configuration file ---------------------------------------------- emptyConfig :: Config emptyConfig = Config { cfgRole = "NODE", cfgHostName = "", cfgListenPort = 0, cfgPeerDiscoveryPort = 38813, cfgLocalRegistryListenPort = 38813, cfgNetworkMagic = "MAGIC", cfgKnownHosts = [], cfgRoundtripTimeout = 10000000, cfgMaxOutgoing = 50, cfgPromiseFlushDelay = 5000000, cfgPromisePrefix = "rpromise-", cfgArgs = [] } -- | Reads in configuration data from external sources, specifically from the command line arguments -- and a configuration file. -- The first parameter to this function determines whether command-line arguments are consulted. -- If the second parameter is not 'Nothing' then it should be the name of the configuration file; -- an exception will be thrown if the specified file does not exist. -- Usually, this function shouldn't be called directly, but rather from 'Remote.Init.remoteInit', -- which also takes into account environment variables. -- Options set by command-line parameters have the highest precedence, -- followed by options read from a configuration file; if a configuration option is not explicitly -- specified anywhere, a reasonable default is used. The configuration file has a format, wherein -- one configuration option is specified on each line; the first token on each line is the name -- of the configuration option, followed by whitespace, followed by its value. Lines beginning with # -- are comments. Thus: -- -- > # This is a sample configuration file -- > cfgHostName host3 -- > cfgKnownHosts host1 host2 host3 host4 -- -- Options may be specified on the command line similarly. Note that command-line arguments containing spaces must be quoted. -- -- > ./MyProgram -cfgHostName=host3 -cfgKnownHosts='host1 host2 host3 host4' readConfig :: Bool -> Maybe FilePath -> IO Config readConfig useargs fp = do a <- safety (processConfigFile emptyConfig) ("while reading config file ") b <- safety (processArgs a) "while parsing command line " return b where processConfigFile from = maybe (return from) (readConfigFile from) fp processArgs from = if useargs then readConfigArgs from else return from safety o s = do res <- try o :: IO (Either SomeException Config) either (\e -> throw $ ConfigException $ s ++ show e) (return) res readConfigFile from afile = do contents <- readFile afile evaluate $ processConfig (lines contents) from readConfigArgs from = do args <- getArgs c <- evaluate $ processConfig (fst $ collectArgs args) from return $ c {cfgArgs = reverse $ snd $ collectArgs args} collectArgs the = foldl findArg ([],[]) the findArg (last,n) ('-':this) | isPrefixOf "cfg" this = ((map (\x -> if x=='=' then ' ' else x) this):last,n) findArg (last,n) a = (last,a:n) processConfig :: [String] -> Config -> Config processConfig rawLines from = foldl processLine from rawLines where processLine cfg line = case words line of ('#':_):_ -> cfg [] -> cfg [option,val] -> updateCfg cfg option val option:rest | (not . null) rest -> updateCfg cfg option (unwords rest) _ -> error $ "Bad configuration syntax: " ++ line updateCfg cfg "cfgRole" role = cfg {cfgRole=clean role} updateCfg cfg "cfgHostName" hn = cfg {cfgHostName=clean hn} updateCfg cfg "cfgListenPort" p = cfg {cfgListenPort=(read.isInt) p} updateCfg cfg "cfgPeerDiscoveryPort" p = cfg {cfgPeerDiscoveryPort=(read.isInt) p} updateCfg cfg "cfgRoundtripTimeout" p = cfg {cfgRoundtripTimeout=(read.isInt) p} updateCfg cfg "cfgPromiseFlushDelay" p = cfg {cfgPromiseFlushDelay=(read.isInt) p} updateCfg cfg "cfgMaxOutgoing" p = cfg {cfgMaxOutgoing=(read.isInt) p} updateCfg cfg "cfgPromisePrefix" p = cfg {cfgPromisePrefix=p} updateCfg cfg "cfgLocalRegistryListenPort" p = cfg {cfgLocalRegistryListenPort=(read.isInt) p} updateCfg cfg "cfgKnownHosts" m = cfg {cfgKnownHosts=words m} updateCfg cfg "cfgNetworkMagic" m = cfg {cfgNetworkMagic=clean m} updateCfg _ opt _ = error ("Unknown configuration option: "++opt) isInt s | all isDigit s = s isInt s = error ("Not a good number: "++s) {- UNUSED nonempty s | (not.null) s = s nonempty b = error ("Unexpected empty item: " ++ b) -} clean = filter (not.isSpace) ---------------------------------------------- -- * Logging ---------------------------------------------- -- | Specifies the importance of a particular log entry. -- Can also be used to filter log output. data LogLevel = LoSay -- ^ Non-suppressible application-level emission | LoFatal | LoCritical | LoImportant | LoStandard -- ^ The default log level | LoInformation | LoTrivial deriving (Eq,Ord,Enum,Show) instance Binary LogLevel where put n = put $ fromEnum n get = get >>= return . toEnum -- | Specifies the subsystem or region that is responsible for -- generating a given log entry. This is useful in conjunction -- with 'LogFilter' to limit displayed log output to the -- particular area of your program that you are currently debugging. -- The SYS, TSK, and SAY spheres are used by the framework -- for messages relating to the Process layer, the Task layer, -- and the 'say' function. -- The remainder of values are free for use at the application level. type LogSphere = String -- | A preference as to what is done with log messages data LogTarget = LtStdout -- ^ Messages will be output to the console; the default | LtForward NodeId -- ^ Log messages will be forwarded to the given node; please don't set up a loop | LtFile FilePath -- ^ Log messages will be appended to the given file | LtForwarded -- ^ Special value -- don't set this in your LogConfig! deriving (Typeable) instance Binary LogTarget where put LtStdout = putWord8 0 put (LtForward nid) = putWord8 1 >> put nid put (LtFile fp) = putWord8 2 >> put fp put LtForwarded = putWord8 3 get = do a <- getWord8 case a of 0 -> return LtStdout 1 -> get >>= return . LtForward 2 -> get >>= return . LtFile 3 -> return LtForwarded -- | Specifies which log messages will be output. -- All log messages of importance below the current -- log level or not among the criterea given here -- will be suppressed. This type lets you limit -- displayed log messages to certain components. data LogFilter = LfAll | LfOnly [LogSphere] | LfExclude [LogSphere] deriving (Typeable) instance Binary LogFilter where put LfAll = putWord8 0 put (LfOnly ls) = putWord8 1 >> put ls put (LfExclude le) = putWord8 2 >> put le get = do a <- getWord8 case a of 0 -> return LfAll 1 -> get >>= return . LfOnly 2 -> get >>= return . LfExclude -- | Expresses a current configuration of the logging -- subsystem, which determines which log messages to -- be output and where to send them when they are. -- Both processes and nodes have log configurations, -- set with 'setLogConfig' and 'setNodeLogConfig' -- respectively. The node log configuration is -- used for all processes that have not explicitly -- set their log configuration. Otherwise, the -- process log configuration takes priority. data LogConfig = LogConfig { logLevel :: LogLevel, -- ^ The lowest message priority that will be displayed logTarget :: LogTarget, -- ^ Where to send messages logFilter :: LogFilter -- ^ Other filtering } deriving (Typeable) instance Binary LogConfig where put (LogConfig ll lt lf) = put ll >> put lt >> put lf get = do ll <- get lt <- get lf <- get return $ LogConfig ll lt lf data LogMessage = LogMessage UTCTime LogLevel LogSphere String ProcessId LogTarget | LogUpdateConfig LogConfig deriving (Typeable) -- This correct instance of UTCTime -- works on 32-bit systems and is -- courtesy of Warren Harris instance Binary UTCTime where put t = do let d = toModifiedJulianDay (utctDay t) s = realToFrac $ utctDayTime t :: Pico ps = truncate $ s * 1e12 :: Integer put d >> put ps get = do d <- get ps <- get return $ UTCTime (ModifiedJulianDay d) (picosecondsToDiffTime ps) instance Binary LogMessage where put (LogMessage utc ll ls s pid target) = putWord8 0 >> put utc >> put ll >> put ls >> put s >> put pid >> put target put (LogUpdateConfig lc) = putWord8 1 >> put lc get = do a <- getWord8 case a of 0 -> do utc <- get ll <- get ls <- get s <- get pid <- get target <- get return $ LogMessage utc ll ls s pid target 1 -> do lc <- get return $ LogUpdateConfig lc instance Show LogMessage where show (LogMessage utc ll ls s pid _) = concat [paddedString 30 (show utc)," ",(show $ fromEnum ll)," ",paddedString 28 (show pid)," ",ls," ",s] show (LogUpdateConfig _) = "LogUpdateConfig" showLogMessage :: LogMessage -> IO String showLogMessage (LogMessage utc ll ls s pid _) = do gmt <- utcToLocalZonedTime utc return $ concat [paddedString 30 (show gmt)," ", (show $ fromEnum ll)," ", paddedString 28 (show pid)," ",ls," ",s] -- | The default log configuration represents -- a starting point for setting your own -- configuration. It is: -- -- > logLevel = LoStandard -- > logTarget = LtStdout -- > logFilter = LfAll defaultLogConfig :: LogConfig defaultLogConfig = LogConfig { logLevel = LoStandard, logTarget = LtStdout, logFilter = LfAll } logApplyFilter :: LogConfig -> LogSphere -> LogLevel -> Bool logApplyFilter cfg sph lev = filterSphere (logFilter cfg) && filterLevel (logLevel cfg) where filterSphere LfAll = True filterSphere (LfOnly lst) = elem sph lst filterSphere (LfExclude lst) = not $ elem sph lst filterLevel ll = lev <= ll -- | Uses the logging facility to produce non-filterable, programmatic output. Shouldn't be used -- for informational logging, but rather for application-level output. say :: String -> ProcessM () say v = v `seq` logS "SAY" LoSay v -- | Gets the currently active log configuration -- for the current process; if the current process -- doesn't have a log configuration set, the process's -- log configuration will be returned getLogConfig :: ProcessM LogConfig getLogConfig = do p <- getProcess case (prLogConfig p) of Just lc -> return lc Nothing -> do node <- liftIO $ readMVar (prNodeRef p) return (ndLogConfig node) -- | Set the process's log configuration. This overrides -- any node-level log configuration setLogConfig :: LogConfig -> ProcessM () setLogConfig lc = do p <- getProcess putProcess (p {prLogConfig = Just lc}) -- | Sets the node's log configuration setNodeLogConfig :: LogConfig -> ProcessM () setNodeLogConfig lc = do p <- getProcess liftIO $ modifyMVar_ (prNodeRef p) (\x -> return $ x {ndLogConfig = lc}) -- | Sets the log configuration of a remote node. -- May throw TransmitException setRemoteNodeLogConfig :: NodeId -> LogConfig -> ProcessM () setRemoteNodeLogConfig nid lc = do res <- sendSimple (adminGetPid nid ServiceLog) (LogUpdateConfig lc) PldAdmin case res of QteOK -> return () _n -> throw $ TransmitException $ QteLoggingError logI :: MVar Node -> ProcessId -> LogSphere -> LogLevel -> String -> IO () logI mnode pid sph ll txt = do node <- readMVar mnode when (filtered (ndLogConfig node)) (sendMsg (ndLogConfig node)) where filtered cfg = logApplyFilter cfg sph ll makeMsg cfg = do time <- getCurrentTime return $ LogMessage time ll sph txt pid (logTarget cfg) sendMsg cfg = do msg <- makeMsg cfg res <- let svc = (adminGetPid nid ServiceLog) nid = nodeFromPid pid in sendBasic mnode svc msg (Nothing::Maybe()) PldAdmin Nothing case res of _ -> return () -- ignore error -- what can I do? -- | Generates a log entry, using the process's current logging configuration. -- -- * 'LogSphere' indicates the subsystem generating this message. SYS in the case of componentes of the framework. -- -- * 'LogLevel' indicates the importance of the message. -- -- * The third parameter is the log message. -- -- Both of the first two parameters may be used to filter log output. logS :: LogSphere -> LogLevel -> String -> ProcessM () logS sph ll txt = do lc <- txt `seq` getLogConfig when (filtered lc) (sendMsg lc) where filtered cfg = logApplyFilter cfg sph ll makeMsg cfg = do time <- liftIO $ getCurrentTime pid <- getSelfPid return $ LogMessage time ll sph txt pid (logTarget cfg) sendMsg cfg = do msg <- makeMsg cfg nid <- getSelfNode res <- let svc = (adminGetPid nid ServiceLog) in sendSimple svc msg PldAdmin case res of QteOK -> return () _n -> throw $ TransmitException $ QteLoggingError startLoggingService :: ProcessM () startLoggingService = serviceThread ServiceLog logger where logger = receiveWait [matchLogMessage,matchUnknownThrow] >> logger matchLogMessage = match (\msg -> do mylc <- getLogConfig case msg of (LogMessage _ _ _ _ _ LtForwarded) -> processMessage (logTarget mylc) msg (LogMessage _ _ _ _ _ _) -> processMessage (targetPreference msg) msg (LogUpdateConfig lc) -> setNodeLogConfig lc) targetPreference (LogMessage _ _ _ _ _ a) = a forwardify (LogMessage a b c d e _) = LogMessage a b c d e LtForwarded processMessage whereto txt = do smsg <- liftIO $ showLogMessage txt case whereto of LtStdout -> liftIO $ putStrLn smsg LtFile fp -> (ptry (liftIO (appendFile fp (smsg ++ "\n"))) :: ProcessM (Either IOError ()) ) >> return () -- ignore error - what can we do? LtForward nid -> do self <- getSelfNode when (self /= nid) (sendSimple (adminGetPid nid ServiceLog) (forwardify txt) PldAdmin >> return ()) -- ignore error -- what can we do? _n -> throw $ ConfigException $ "Invalid message forwarded setting" ---------------------------------------------- -- * Node monitoring ---------------------------------------------- data NodeMonitorCommand = NodeMonitorStart NodeId | NodeMonitorPing deriving (Typeable) instance Binary NodeMonitorCommand where put (NodeMonitorStart nid) = putWord8 0 >> put nid put (NodeMonitorPing) = putWord8 1 get = do a <- getWord8 case a of 0 -> do b <- get return $ NodeMonitorStart b 1 -> return NodeMonitorPing data NodeMonitorSignal = NodeMonitorNodeFailure NodeId deriving (Typeable) instance Binary NodeMonitorSignal where put (NodeMonitorNodeFailure nid) = put nid get = do a <- get return $ NodeMonitorNodeFailure a data NodeMonitorInformation = NodeMonitorNodeDown NodeId deriving (Typeable) instance Binary NodeMonitorInformation where put (NodeMonitorNodeDown nid) = put nid get = do a <- get return $ NodeMonitorNodeDown a monitorNode :: NodeId -> ProcessM () monitorNode nid = do mynid <- getSelfNode res <- roundtripQueryLocal PldAdmin (adminGetPid mynid ServiceNodeMonitor) (NodeMonitorStart nid) case res of Right () -> return () _ -> (throw $ ServiceException $ "while contacting node monitor "++show res) -- | Sends a small message to the specified node to determine if it's alive. -- If the node cannot be reached or does not respond within a time frame, the function -- will return False. pingNode :: NodeId -> ProcessM Bool pingNode nid = do res <- roundtripQueryUnsafe PldAdmin (adminGetPid nid ServiceNodeMonitor) (NodeMonitorPing) case res of Right () -> return True _ -> return False -- TODO this can be re-engineered to avoid setting up and tearing down TCP connections and threads -- at every ping. Instead open up a TCP connection to the pingee and require it to send us a hello -- every N seconds or die. Also, this has a bug in that the number of failures is not reset after a successful ping startNodeMonitorService :: ProcessM () startNodeMonitorService = serviceThread ServiceNodeMonitor (service Map.empty) where service state = let matchCommand cmd = case cmd of NodeMonitorPing -> return ((),state) NodeMonitorStart nid -> do res <- addmonitor nid return ((),res) matchSignal cmd = case cmd of NodeMonitorNodeFailure nid -> do res <- handlefailure nid return (res) listenaction nid mainpid = let onfailure = sendSimple mainpid (NodeMonitorNodeFailure nid) PldUser >> return () loop = do res <- pingNode nid case res of False -> onfailure True -> liftIO (threadDelay interpingtimeout) >> loop in do loop failurelimit = 3 -- TODO make configurable interpingtimeout = 5000000 retrytimeout = 1000000 reportfailure nid = do mynid <- getSelfNode sendSimple (adminGetPid mynid ServiceProcessMonitor) (GlNodeDown nid) PldAdmin handlefailure nid = case Map.lookup nid state of Just c -> if c >= failurelimit then do _ <- reportfailure nid return (Map.delete nid state) else do mypid <- getSelfPid _ <- spawnLocalAnd (liftIO (threadDelay retrytimeout) >> listenaction nid mypid) setDaemonic return (Map.adjust succ nid state) Nothing -> return state addmonitor nid = case Map.member nid state of True -> return state False -> do mynid <- getSelfNode mypid <- getSelfPid if mynid==nid then return state else do _ <- spawnLocalAnd (listenaction nid mypid) setDaemonic return $ Map.insert nid (0) state in receiveWait [roundtripResponse matchCommand, match matchSignal, matchUnknownThrow] >>= service ---------------------------------------------- -- * Service helpers ---------------------------------------------- data ServiceId = ServiceLog | ServiceSpawner | ServiceNodeRegistry | ServiceNodeMonitor | ServiceProcessMonitor | ServiceProcessRegistry deriving (Ord,Eq,Enum,Show) adminGetPid :: NodeId -> ServiceId -> ProcessId adminGetPid nid sid = ProcessId nid (fromEnum sid) adminDeregister :: ServiceId -> ProcessM () adminDeregister val = do p <- getProcess pid <- getSelfPid liftIO $ modifyMVar_ (prNodeRef p) (fun $ localFromPid pid) where fun pid node = return $ node {ndAdminProcessTable = Map.delete val (ndAdminProcessTable node)} adminRegister :: ServiceId -> ProcessM () adminRegister val = do p <- getProcess pid <- getSelfPid liftIO $ modifyMVar_ (prNodeRef p) (\node -> if Map.member val (ndAdminProcessTable node) then throw $ ServiceException $ "Duplicate administrative registration at index " ++ show val else (fun (localFromPid pid) node)) where fun pid node = return $ node {ndAdminProcessTable = Map.insert val pid (ndAdminProcessTable node)} {- UNUSED adminLookup :: ServiceId -> ProcessM LocalProcessId adminLookup val = do p <- getProcess node <- liftIO $ readMVar (prNodeRef p) case Map.lookup val (ndAdminProcessTable node) of Nothing -> throw $ ServiceException $ "Request for unknown administrative service " ++ show val Just x -> return x -} adminLookupN :: ServiceId -> MVar Node -> IO (Either TransmitStatus LocalProcessId) adminLookupN val mnode = do node <- readMVar mnode case Map.lookup val (ndAdminProcessTable node) of Nothing -> return $ Left QteUnknownPid Just x -> return $ Right x startFinalizerService :: ProcessM () -> ProcessM () startFinalizerService todo = spawnLocalAnd body prefix >> return () where prefix = setDaemonic body = do p <- getProcess node <- liftIO $ readMVar (prNodeRef p) liftIO $ takeMVar (ndNodeFinalizer node) todo `pfinally` (liftIO $ putMVar (ndNodeFinalized node) ()) performFinalization :: MVar Node -> IO () performFinalization mnode = do node <- readMVar mnode putMVar (ndNodeFinalizer node) () takeMVar (ndNodeFinalized node) setDaemonic :: ProcessM () setDaemonic = do p <- getProcess pid <- getSelfPid liftIO $ modifyMVar_ (prNodeRef p) (\node -> return $ node {ndProcessTable=Map.adjust (\pte -> pte {pteDaemonic=True}) (localFromPid pid) (ndProcessTable node)}) serviceThread :: ServiceId -> ProcessM () -> ProcessM () serviceThread v f = spawnLocalAnd (pbracket (return ()) (\_ -> adminDeregister v >> logError) (\_ -> f)) (adminRegister v >> setDaemonic) >> (return()) where logError = logS "SYS" LoFatal $ "System process "++show v++" has terminated" -- TODO maybe restart? ---------------------------------------------- -- * Process registry service ---------------------------------------------- data ProcessRegistryState = ProcessRegistryState (Map.Map String ProcessId) (Map.Map ProcessId String) data ProcessRegistryCommand = ProcessRegistryQuery String (Maybe (Closure (ProcessM ()))) | ProcessRegistrySet String ProcessId deriving (Typeable) instance Binary ProcessRegistryCommand where put (ProcessRegistryQuery a b) = putWord8 0 >> put a >> put b put (ProcessRegistrySet a b) = putWord8 1 >> put a >> put b get = do cmd <- getWord8 case cmd of 0 -> do a <- get b <- get return $ ProcessRegistryQuery a b 1 -> do a <- get b <- get return $ ProcessRegistrySet a b data ProcessRegistryAnswer = ProcessRegistryResponse (Maybe ProcessId) | ProcessRegistryError String deriving (Typeable) instance Binary ProcessRegistryAnswer where put (ProcessRegistryResponse a) = putWord8 0 >> put a put (ProcessRegistryError s) = putWord8 1 >> put s get = do a <- getWord8 case a of 0 -> get >>= (return . ProcessRegistryResponse) 1 -> get >>= (return . ProcessRegistryError) startProcessRegistryService :: ProcessM () startProcessRegistryService = serviceThread ServiceProcessRegistry (service initialState) where initialState = ProcessRegistryState Map.empty Map.empty service state@(ProcessRegistryState nameToPid pidToName) = let downs (ProcessMonitorException pid _why) = case Map.lookup pid pidToName of Just name -> let newPidToName = Map.delete pid pidToName newNameToPid = Map.delete name nameToPid in return (ProcessRegistryState newNameToPid newPidToName) Nothing -> return state cmds cmd = case cmd of ProcessRegistrySet name pid -> case (Map.lookup pid pidToName, Map.lookup name nameToPid) of (Nothing,Nothing) -> let newNameToPid = Map.insert name pid nameToPid newPidToName = Map.insert pid name pidToName in do islocal <- isPidLocal pid case islocal of True -> do mypid <- getSelfPid ok <- monitorProcessQuiet mypid pid MaMonitor case ok of True -> return (ProcessRegistryResponse Nothing,ProcessRegistryState newNameToPid newPidToName) False -> return (ProcessRegistryError $ "Couldn't establish monitoring of task in naming "++name,state) False -> return (ProcessRegistryError $ "Refuse to register nonlocal process" ++ show pid,state) (Nothing,_) -> return (ProcessRegistryError $ "The name "++name++" has already been registered",state) (_,_) -> return (ProcessRegistryError $ "The process "++show pid++" has already been registered",state) ProcessRegistryQuery name mClo -> case Map.lookup name nameToPid of Just pid -> return (ProcessRegistryResponse (Just pid),state) Nothing -> case mClo of Nothing -> return (ProcessRegistryResponse Nothing,state) Just clo -> do mynid <- getSelfNode mypid <- getSelfPid pid <- spawnAnd mynid clo defaultSpawnOptions {amsoMonitor=Just (mypid,MaMonitor)} let newNameToPid = Map.insert name pid nameToPid newPidToName = Map.insert pid name pidToName return (ProcessRegistryResponse (Just pid),ProcessRegistryState newNameToPid newPidToName) in receiveWait [roundtripResponse cmds,match downs] >>= service -- TODO nameQueryOrWait :: NodeId -> String -> ProcessM ProcessId -- | Similar to 'nameQuery' but if the named process doesn't exist, -- it will be started from the given closure. If the process is -- already running, the closure will be ignored. nameQueryOrStart :: NodeId -> String -> Closure (ProcessM ()) -> ProcessM ProcessId nameQueryOrStart nid name clo = let servicepid = adminGetPid nid ServiceProcessRegistry msg = ProcessRegistryQuery name (Just clo) in do res <- roundtripQueryUnsafe PldAdmin servicepid msg case res of Right (ProcessRegistryResponse (Just answer)) -> return answer Right (ProcessRegistryError s) -> throw $ ServiceException s _ -> throw $ ServiceException $ "Crazy talk from process registry" -- | Query the PID of a named process on a particular node. -- If no process of that name exists, or if that -- process has ended, this function returns Nothing. nameQuery :: NodeId -> String -> ProcessM (Maybe ProcessId) nameQuery nid name = let servicepid = adminGetPid nid ServiceProcessRegistry msg = ProcessRegistryQuery name Nothing in do res <- roundtripQueryUnsafe PldAdmin servicepid msg case res of Right (ProcessRegistryResponse answer) -> return answer Right (ProcessRegistryError s) -> throw $ ServiceException s _ -> throw $ ServiceException $ "Crazy talk from process registry" -- | Assigns a name to the current process. The name is local to the -- node. On each node, each process may have only one name, and each -- name may be given to only one node. If this function is called -- more than once by the same process, or called more than once -- with the name on a single node, it will throw a 'ServiceException'. -- The PID of a named process can be queried later with 'nameQuery'. When the -- named process ends, its name will again become available. -- One reason to use named processes is to create node-local state. -- This example lets each node have its own favorite color, which can -- be changed and queried. -- -- > nodeFavoriteColor :: ProcessM () -- > nodeFavoriteColor = -- > do nameSet "favorite_color" -- > loop Blue -- > where loop color = -- > receiveWait -- > [ match (\newcolor -> return newcolor), -- > match (\pid -> send pid color >> return color) -- > ] >>= loop -- > -- > setFavoriteColor :: NodeId -> Color -> ProcessM () -- > setFavoriteColor nid color = -- > do (Just pid) <- nameQuery nid "favorite_color" -- > send pid color -- > -- > getFavoriteColor :: NodeId -> ProcessM Color -- > getFavoriteColor nid = -- > do (Just pid) <- nameQuery nid "favorite_color" -- > mypid <- getSelfPid -- > send pid mypid -- > expect nameSet :: String -> ProcessM () nameSet name = do mynid <- getSelfNode mypid <- getSelfPid let servicepid = adminGetPid mynid ServiceProcessRegistry msg = ProcessRegistrySet name mypid res <- roundtripQueryLocal PldAdmin servicepid msg case res of Right (ProcessRegistryResponse Nothing) -> return () Right (ProcessRegistryError s) -> throw $ ServiceException s _ -> throw $ ServiceException $ "Crazy talk from process registry" ---------------------------------------------- -- * Global and spawn service ---------------------------------------------- data GlSignal = GlProcessDown ProcessId SignalReason | GlNodeDown NodeId deriving (Typeable,Show) instance Binary GlSignal where put (GlProcessDown a b) = putWord8 0 >> put a >> put b put (GlNodeDown a) = putWord8 1 >> put a get = do ch <- getWord8 case ch of 0 -> do a <- get b <- get return $ GlProcessDown a b 1 -> do a <- get return $ GlNodeDown a data GlSynchronous = GlRequestMonitoring ProcessId ProcessId MonitorAction | GlRequestMoniteeing ProcessId NodeId | GlRequestUnmonitoring ProcessId ProcessId MonitorAction deriving (Typeable) instance Binary GlSynchronous where put (GlRequestMonitoring a b c) = putWord8 0 >> put a >> put b >> put c put (GlRequestMoniteeing a b) = putWord8 1 >> put a >> put b put (GlRequestUnmonitoring a b c) = putWord8 2 >> put a >> put b >> put c get = do ch <- getWord8 case ch of 0 -> do a <- get b <- get c <- get return $ GlRequestMonitoring a b c 1 -> do a <- get b <- get return $ GlRequestMoniteeing a b 2 -> do a <- get b <- get c <- get return $ GlRequestUnmonitoring a b c data GlCommand = GlMonitor ProcessId ProcessId MonitorAction | GlUnmonitor ProcessId ProcessId MonitorAction deriving (Typeable) instance Binary GlCommand where put (GlMonitor a b c) = putWord8 0 >> put a >> put b >> put c put (GlUnmonitor a b c) = putWord8 1 >> put a >> put b >> put c get = do ch <- getWord8 case ch of 0 -> do a <- get b <- get c <- get return $ GlMonitor a b c 1 -> do a <- get b <- get c <- get return $ GlUnmonitor a b c -- | The different kinds of monitoring available between processes. data MonitorAction = MaMonitor -- ^ MaMonitor means that the monitor process will be sent a ProcessMonitorException message when the monitee terminates for any reason. | MaLink -- ^ MaLink means that the monitor process will receive an asynchronous exception of type ProcessMonitorException when the monitee terminates for any reason | MaLinkError -- ^ MaLinkError means that the monitor process will receive an asynchronous exception of type ProcessMonitorException when the monitee terminates abnormally deriving (Typeable,Show,Ord,Eq) instance Binary MonitorAction where put MaMonitor = putWord8 0 put MaLink = putWord8 1 put MaLinkError = putWord8 2 get = getWord8 >>= \x -> case x of 0 -> return MaMonitor 1 -> return MaLink 2 -> return MaLinkError -- | Part of the notification system of process monitoring, indicating why the monitor is being notified. data SignalReason = SrNormal -- ^ the monitee terminated normally | SrException String -- ^ the monitee terminated with an uncaught exception, which is given as a string | SrNoPing -- ^ the monitee is believed to have ended or be inaccessible, as the node on which its running is not responding to pings. This may indicate a network bisection or that the remote node has crashed. | SrInvalid -- ^ SrInvalid: the monitee was not running at the time of the attempt to establish monitoring deriving (Typeable,Show) instance Binary SignalReason where put SrNormal = putWord8 0 put (SrException s) = putWord8 1 >> put s put SrNoPing = putWord8 2 put SrInvalid = putWord8 3 get = do a <- getWord8 case a of 0 -> return SrNormal 1 -> get >>= return . SrException 2 -> return SrNoPing 3 -> return SrInvalid type GlLinks = Map.Map ProcessId (Map.Map (LocalProcessId,MonitorAction) (Int), Map.Map LocalProcessId (Int), Map.Map NodeId ()) data GlobalData = GlobalData { glLinks :: GlLinks, glNextId :: Integer, glSyncs :: Map.Map Integer (GlobalData -> MatchM GlobalData ()) } gdCombineEntry :: (Map.Map (LocalProcessId,MonitorAction) (Int), Map.Map LocalProcessId (Int), Map.Map NodeId ()) -> (Map.Map (LocalProcessId,MonitorAction) (Int), Map.Map LocalProcessId (Int), Map.Map NodeId ()) -> (Map.Map (LocalProcessId,MonitorAction) (Int), Map.Map LocalProcessId (Int), Map.Map NodeId ()) gdCombineEntry (newmonitors,newmonitees,newnodes) (oldmonitors,oldmonitees,oldnodes) = let finalnodes = Map.unionWith const newnodes oldnodes finalmonitors = Map.unionWith (+) newmonitors oldmonitors finalmonitees = Map.unionWith (+) newmonitees oldmonitees in (finalmonitors,finalmonitees,finalnodes) gdAddMonitor :: GlLinks -> ProcessId -> MonitorAction -> LocalProcessId -> GlLinks gdAddMonitor gl pid ma lpid = Map.insertWith' gdCombineEntry pid (Map.singleton (lpid,ma) 1, Map.empty,Map.empty) gl gdDelMonitor :: GlLinks -> ProcessId -> MonitorAction -> LocalProcessId -> GlLinks gdDelMonitor gl pid ma lpid = Map.adjust editentry pid gl where editentry (mons,mots,ns) = (fixmons mons,mots,ns) fixmons m = Map.update (\a -> if pred a == 0 then Nothing else Just (pred a)) (lpid,ma) m gdAddMonitee :: GlLinks -> ProcessId -> LocalProcessId -> GlLinks gdAddMonitee gl pid lpid = Map.insertWith' gdCombineEntry pid (Map.empty,Map.singleton lpid 1,Map.empty) gl gdDelMonitee :: GlLinks -> ProcessId -> LocalProcessId -> GlLinks gdDelMonitee gl pid lpid = Map.adjust editentry pid gl where editentry (mons,mots,ns) = (mons,fixmons mots,ns) fixmons m = Map.update (\a -> if pred a == 0 then Nothing else Just (pred a)) lpid m glExpungeProcess :: GlLinks -> ProcessId -> NodeId -> GlLinks glExpungeProcess gl pid myself = let mine n = buildPidFromNodeId myself n in case Map.lookup pid gl of Nothing -> gl Just (mons,mots,_ns) -> let s1 = Map.delete pid gl s2 = foldl' (\g (lp,_)-> Map.delete (mine lp) g) s1 (Map.keys mons) s3 = foldl' (\g lp -> Map.delete (mine lp) g) s2 (Map.keys mots) in s3 gdAddNode :: GlLinks -> ProcessId -> NodeId -> GlLinks gdAddNode gl pid nid = Map.insertWith' gdCombineEntry pid (Map.empty,Map.empty,Map.singleton nid ()) gl -- | The main form of notification to a monitoring process that a monitored process has terminated. -- This data structure can be delivered to the monitor either as a message (if the monitor is -- of type 'MaMonitor') or as an asynchronous exception (if the monitor is of type 'MaLink' or 'MaLinkError'). -- It contains the PID of the monitored process and the reason for its nofication. data ProcessMonitorException = ProcessMonitorException ProcessId SignalReason deriving (Typeable) instance Binary ProcessMonitorException where put (ProcessMonitorException pid sr) = put pid >> put sr get = do pid <- get sr <- get return $ ProcessMonitorException pid sr instance Exception ProcessMonitorException instance Show ProcessMonitorException where show (ProcessMonitorException pid why) = "ProcessMonitorException: " ++ show pid ++ " has terminated because "++show why -- | Establishes bidirectional abnormal termination monitoring between the current -- process and another. Monitoring established with linkProcess -- is bidirectional and signals only in the event of abnormal termination. -- In other words, @linkProcess a@ is equivalent to: -- -- > monitorProcess mypid a MaLinkError -- > monitorProcess a mypid MaLinkError linkProcess :: ProcessId -> ProcessM () linkProcess p = do mypid <- getSelfPid mynid <- getSelfNode let servicepid = (adminGetPid mynid ServiceProcessMonitor) msg1 = GlMonitor mypid p MaLinkError msg2 = GlMonitor p mypid MaLinkError res1 <- roundtripQueryLocal PldAdmin servicepid msg1 case res1 of Right QteOK -> do res2 <- roundtripQueryLocal PldAdmin servicepid msg2 case res2 of Right QteOK -> return () Right err -> herr err Left err -> herr err Right err -> herr err Left err -> herr err where herr err = throw $ ServiceException $ "Error when linking process " ++ show p ++ ": " ++ show err -- | A specialized version of 'match' (for use with 'receive', 'receiveWait' and friends) for catching process down -- messages. This way processes can avoid waiting forever for a response from another process that has crashed. -- Intended to be used within a 'withMonitor' block, e.g.: -- -- > withMonitor apid $ -- > do send apid QueryMsg -- > receiveWait -- > [ -- > match (\AnswerMsg -> return "ok"), -- > matchProcessDown apid (return "aborted") -- > ] matchProcessDown :: ProcessId -> ProcessM q -> MatchM q () matchProcessDown pid f = matchIf (\(ProcessMonitorException p _) -> p==pid) (const f) -- | Establishes temporary monitoring of another process. The process to be monitored is given in the -- first parameter, and the code to run in the second. If the given process goes down while the code -- in the second parameter is running, a process down message will be sent to the current process, -- which can be handled by 'matchProcessDown'. withMonitor :: ProcessId -> ProcessM a -> ProcessM a withMonitor pid f = withMonitoring pid MaMonitor f withMonitoring :: ProcessId -> MonitorAction -> ProcessM a -> ProcessM a withMonitoring pid how f = do mypid <- getSelfPid monitorProcess mypid pid how -- TODO if this throws a ServiceException, translate that into a trigger a <- f `pfinally` safety (unmonitorProcess mypid pid how) return a where safety n = ptry n :: ProcessM (Either ServiceException ()) -- | Establishes unidirectional processing of another process. The format is: -- -- > monitorProcess monitor monitee action -- -- Here, -- -- * monitor is the process that will be notified if the monitee goes down -- -- * monitee is the process that will be monitored -- -- * action determines how the monitor will be notified -- -- Monitoring will remain in place until one of the processes ends or until -- 'unmonitorProcess' is called. Calls to 'monitorProcess' are cumulative, -- such that calling 'monitorProcess' 3 three times on the same pair of processes -- will ensure that monitoring will stay in place until 'unmonitorProcess' is called -- three times on the same pair of processes. -- If the monitee is not currently running, the monitor will be signalled immediately. -- See also 'MonitorAction'. monitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM () monitorProcess monitor monitee how = monitorProcessImpl GlMonitor monitor monitee how True >> return () -- | Removes monitoring established by 'monitorProcess'. Note that the type of -- monitoring, given in the third parameter, must match in order for monitoring -- to be removed. If monitoring has not already been established between these -- two processes, this function takes not action. unmonitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM () unmonitorProcess monitor monitee how = monitorProcessImpl GlUnmonitor monitor monitee how True >> return () monitorProcessQuiet :: ProcessId -> ProcessId -> MonitorAction -> ProcessM Bool monitorProcessQuiet monitor monitee how = do res <- monitorProcessImpl GlMonitor monitor monitee how False case res of QteOK -> return True _ -> return False monitorProcessImpl :: (ProcessId -> ProcessId -> MonitorAction -> GlCommand) -> ProcessId -> ProcessId -> MonitorAction -> Bool -> ProcessM TransmitStatus monitorProcessImpl msgtype monitor monitee how throwit = let msg = msgtype monitor monitee how in do let servicepid = (adminGetPid (nodeFromPid monitee) ServiceProcessMonitor) res <- roundtripQueryUnsafe PldAdmin servicepid msg -- TODO: Prefer to send this message to the local service case res of Right QteOK -> return QteOK Right err -> herr err Left err -> herr err where herr err = case throwit of True -> throw $ ServiceException $ "Error when monitoring process " ++ show monitee ++ ": " ++ show err False -> return err sendInterrupt :: (Exception e) => ProcessId -> e -> ProcessM Bool sendInterrupt pid e = do islocal <- isPidLocal pid case islocal of False -> return False True -> do p <- getProcess node <- liftIO $ readMVar (prNodeRef p) res <- liftIO $ atomically $ getProcessTableEntry node (localFromPid pid) case res of Just pte -> do liftIO $ throwTo (pteThread pte) e return True Nothing -> return False triggerMonitor :: ProcessId -> ProcessId -> MonitorAction -> SignalReason -> ProcessM () triggerMonitor towho aboutwho how why = let msg = ProcessMonitorException aboutwho why in case how of MaMonitor -> sendSimple towho msg PldUser >> return () MaLink -> sendInterrupt towho msg >> return () MaLinkError -> case why of SrNormal -> return () _ -> sendInterrupt towho msg >> return () startProcessMonitorService :: ProcessM () startProcessMonitorService = serviceThread ServiceProcessMonitor (service emptyGlobal) where emptyGlobal = GlobalData {glLinks=Map.empty, glNextId=0, glSyncs = Map.empty} getGlobalFor pid = adminGetPid (nodeFromPid pid) ServiceProcessMonitor checkliveness pid = do islocal <- isPidLocal pid case islocal of True -> isProcessUp (localFromPid pid) False -> return True checklivenessandtrigger monitor monitee action = do a <- checkliveness monitee case a of True -> return True False -> do trigger monitor monitee action SrInvalid return False trigger = triggerMonitor {- UNUSED forward destinationnode msg = sendSimple (getGlobalFor destinationnode) msg PldAdmin -} isProcessUp lpid = do p <- getProcess node <- liftIO $ readMVar (prNodeRef p) res <- liftIO $ atomically $ getProcessTableEntry node lpid case res of Nothing -> return (lpid<0) Just _ -> return True removeLocalMonitee gl monitor monitee _action = gl {glLinks = gdDelMonitee (glLinks gl) monitor (localFromPid monitee) } removeLocalMonitor gl monitor monitee action = gl {glLinks = gdDelMonitor (glLinks gl) monitee action (localFromPid monitor) } addLocalMonitee gl monitor monitee _action = gl {glLinks = gdAddMonitee (glLinks gl) monitor (localFromPid monitee) } addLocalMonitor gl monitor monitee action = gl {glLinks = gdAddMonitor (glLinks gl) monitee action (localFromPid monitor) } addLocalNode gl monitor monitee _action = gl {glLinks = gdAddNode (glLinks gl) monitee (nodeFromPid monitor)} broadcast nids msg = mapM_ (\p -> forkProcessWeak $ ((ptimeout 5000000 $ sendSimple (adminGetPid p ServiceProcessMonitor) msg PldAdmin) >> return ())) nids handleProcessDown :: GlLinks -> ProcessId -> SignalReason -> ProcessM GlLinks handleProcessDown global pid why = do islocal <- isPidLocal pid mynid <- getSelfNode case Map.lookup pid global of Nothing -> return global Just (monitors,_monitee,nodes) -> do mapM_ (\(tellwho,how) -> trigger (buildPidFromNodeId mynid tellwho) pid how why) (Map.keys monitors) when (islocal) (broadcast (Map.keys nodes) (GlProcessDown pid why)) mynid <- getSelfNode return $ glExpungeProcess global pid mynid service global = let additional = map (\receiver -> receiver global) (Map.elems (glSyncs global)) matchPatterns = [match matchSignals, roundtripResponseAsync matchCommands False, roundtripResponse (matchSyncs global)] ++ additional matchSignals cmd = case cmd of GlProcessDown pid why -> do res <- handleProcessDown (glLinks global) pid why return $ global { glLinks = res} GlNodeDown nid -> let gl = glLinks global aslist = Map.keys gl pids = filter (\n -> nodeFromPid n == nid) aslist in do res <- foldM (\g p -> handleProcessDown g p SrNoPing) gl pids return $ global {glLinks = res} matchSyncs global cmd = case cmd of GlRequestMonitoring monitee monitor action -> let s1 = addLocalMonitor global monitor monitee action in monitorNode (nodeFromPid monitee) >> return (QteOK,s1) GlRequestMoniteeing pid towho -> do live <- checkliveness pid case live of True -> let s1 = global {glLinks = gdAddNode (glLinks global) pid towho} in return (QteOK,s1) False -> return (QteUnknownPid,global) GlRequestUnmonitoring monitee monitor action -> let s1 = removeLocalMonitor global monitor monitee action in return (QteOK,s1) matchCommands cmd ans = case cmd of GlMonitor monitor monitee action -> do ismoniteelocal <- isPidLocal monitee ismonitorlocal <- isPidLocal monitor case (ismoniteelocal,ismonitorlocal) of (True,True) -> do live <- checklivenessandtrigger monitor monitee action case live of True -> let s1 = addLocalMonitee global monitor monitee action s2 = addLocalMonitor s1 monitor monitee action in ans QteOK >> return s2 False -> ans QteOK >> return global (True,False) -> do live <- checklivenessandtrigger monitor monitee action case live of False -> ans QteOK >> return global True -> let msg = GlRequestMonitoring monitee monitor action receiver myId myGlobal myMsg = let newGlobal = myGlobal {glSyncs = Map.delete myId (glSyncs myGlobal)} in case myMsg of QteOK -> let s1 = addLocalNode newGlobal monitor monitee action in do _ <- ans QteOK return s1 err -> ans err >> return newGlobal in do mmatch <- roundtripQueryImplSub PldAdmin (getGlobalFor monitor) msg (receiver (glNextId global)) case mmatch of Left err -> ans err >> return global Right mymatch -> return global {glNextId=glNextId global+1, glSyncs=Map.insert (glNextId global) (mymatch) (glSyncs global)} (False,True) -> let msg = GlRequestMoniteeing monitee (nodeFromPid monitor) receiver myId myGlobal myMsg = let newGlobal = myGlobal {glSyncs = Map.delete myId (glSyncs myGlobal)} in case myMsg of QteOK -> let s1 = addLocalMonitor newGlobal monitor monitee action in do monitorNode (nodeFromPid monitee) _ <- ans QteOK return s1 QteUnknownPid -> do trigger monitor monitee action SrInvalid _ <- ans QteOK return newGlobal err -> do _ <- ans err return newGlobal in do mmatch <- roundtripQueryImplSub PldAdmin (getGlobalFor monitee) msg (receiver (glNextId global)) case mmatch of Left err -> ans err >> return global Right mymatch -> return global {glNextId=glNextId global+1, glSyncs=Map.insert (glNextId global) (mymatch) (glSyncs global)} (False,False) -> do _ <- ans (QteOther "Requesting monitoring by third party node") return global GlUnmonitor monitor monitee action -> do ismoniteelocal <- isPidLocal monitee ismonitorlocal <- isPidLocal monitor case (ismoniteelocal,ismonitorlocal) of (True,True) -> let s1 = removeLocalMonitee global monitor monitee action s2 = removeLocalMonitor s1 monitor monitee action in ans QteOK >> return s2 (True,False) -> let msg = GlRequestUnmonitoring monitee monitor action receiver myId myGlobal myMsg = let newGlobal = myGlobal {glSyncs=Map.delete myId (glSyncs myGlobal)} in ans myMsg >> return newGlobal in do sync <- roundtripQueryImplSub PldAdmin (getGlobalFor monitee) msg (receiver (glNextId global)) case sync of Left err -> ans err >> return global Right mymatch -> return global {glNextId=glNextId global+1, glSyncs=Map.insert (glNextId global) (mymatch) (glSyncs global)} (False,True) -> let s1 = removeLocalMonitor global monitor monitee action in ans QteOK >> return s1 (False,False) -> ans (QteOther "Requesting unmonitoring by third party node") >> return global in receiveWait matchPatterns >>= service data AmSpawn = AmSpawn (Closure (ProcessM ())) AmSpawnOptions deriving (Typeable) instance Binary AmSpawn where put (AmSpawn c o) =put c >> put o get=get >>= \c -> get >>= \o -> return $ AmSpawn c o data AmCall = AmCall ProcessId (Closure Payload) deriving (Typeable) instance Binary AmCall where put (AmCall pid clo) = put pid >> put clo get = do c <- get d <- get return $ AmCall c d data AmSpawnOptions = AmSpawnOptions { amsoPaused :: Bool, amsoLink :: Maybe ProcessId, amsoMonitor :: Maybe (ProcessId,MonitorAction), amsoName :: Maybe String } deriving (Typeable) instance Binary AmSpawnOptions where put (AmSpawnOptions a b c d) = put a >> put b >> put c >> put d get = do a <- get b <- get c <- get d <- get return $ AmSpawnOptions a b c d defaultSpawnOptions :: AmSpawnOptions defaultSpawnOptions = AmSpawnOptions {amsoPaused=False, amsoLink=Nothing, amsoMonitor=Nothing, amsoName=Nothing} data AmSpawnUnpause = AmSpawnUnpause deriving (Typeable) instance Binary AmSpawnUnpause where put AmSpawnUnpause = return () get = return AmSpawnUnpause -- TODO. Of course, this would need to be in a different module. -- spawnWithChannel :: NodeId -> Closure (ReceivePort a -> ProcessM ()) -> ProcessM SendPort -- spawnWithChannel nid clo = send ... -- | Start a process running the code, given as a closure, on the specified node. -- If successful, returns the process ID of the new process. If unsuccessful, -- throw a 'TransmitException'. spawn :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId spawn node clo = spawnAnd node clo defaultSpawnOptions -- | Ends the current process in an orderly manner. terminate :: ProcessM a terminate = throw ProcessTerminationException -- | If a remote process has been started in a paused state with 'spawnAnd' , -- it will be running but inactive until unpaused. Use this function to unpause -- such a function. It has no effect on processes that are not paused or that -- have already been unpaused. unpause :: ProcessId -> ProcessM () unpause pid = send pid AmSpawnUnpause -- | A variant of 'spawn' that starts the remote process with -- bidirectoinal monitoring, as in 'linkProcess' spawnLink :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId spawnLink node clo = do mypid <- getSelfPid spawnAnd node clo defaultSpawnOptions {amsoLink=Just mypid} -- | A variant of 'spawn' that allows greater control over how the remote process is started. spawnAnd :: NodeId -> Closure (ProcessM ()) -> AmSpawnOptions -> ProcessM ProcessId spawnAnd node clo opt = do res <- roundtripQueryUnsafe PldAdmin (adminGetPid node ServiceSpawner) (AmSpawn clo opt) case res of Left e -> throw $ TransmitException e Right pid -> return pid -- | Invokes a function on a remote node. The function must be -- given by a closure. This function will block until the called -- function completes or the connection is broken. callRemote :: (Serializable a) => NodeId -> Closure (ProcessM a) -> ProcessM a callRemote node clo = callRemoteImpl node clo callRemoteIO :: (Serializable a) => NodeId -> Closure (IO a) -> ProcessM a callRemoteIO node clo = callRemoteImpl node clo callRemotePure :: (Serializable a) => NodeId -> Closure a -> ProcessM a callRemotePure node clo = callRemoteImpl node clo callRemoteImpl :: (Serializable a) => NodeId -> Closure b -> ProcessM a callRemoteImpl node clo = let newclo = makePayloadClosure clo in case newclo of Nothing -> throw $ TransmitException QteUnknownCommand Just plclo -> do mypid <- getSelfPid res <- roundtripQuery PldAdmin (adminGetPid node ServiceSpawner) (AmCall mypid plclo) case res of Right (Just mval) -> do val <- liftIO $ serialDecode mval case val of Just a -> return a _ -> throw $ TransmitException QteUnknownCommand Left e -> throw (TransmitException e) _ -> throw $ TransmitException QteUnknownCommand startSpawnerService :: ProcessM () startSpawnerService = serviceThread ServiceSpawner spawner where spawner = receiveWait [matchSpawnRequest,matchCallRequest,matchUnknownThrow] >> spawner exceptFilt :: SomeException -> q -> q exceptFilt _ q = q callWorker c responder = do a <- ptry $ invokeClosure c case a of Left q -> exceptFilt q (responder Nothing) Right Nothing -> responder Nothing Right (Just pl) -> responder (Just pl) spawnWorker c = do a <- invokeClosure c case a of Nothing -> (logS "SYS" LoCritical $ "Failed to invoke closure "++(show c)) --TODO it would be nice if this error could be propagated to the caller of spawn, at the very least it should throw an exception so a linked process will be notified Just q -> q matchCallRequest = roundtripResponseAsync (\cmd sender -> case cmd of AmCall _pid clo -> spawnLocal (callWorker clo sender) >> return ()) False matchSpawnRequest = roundtripResponse (\cmd -> case cmd of AmSpawn c opt -> let namePostlude = case amsoName opt of Nothing -> return () Just name -> nameSet name pausePrelude = case amsoPaused opt of False -> return () True -> receiveWait [match (\AmSpawnUnpause -> return ())] linkPostlude = case amsoLink opt of Nothing -> return () Just pid -> linkProcess pid monitorPostlude = case amsoMonitor opt of Nothing -> return () Just (pid,ma) -> do mypid <- getSelfPid _ <- monitorProcessQuiet pid mypid ma return () in do newpid <- spawnLocalAnd (pausePrelude >> spawnWorker c) (namePostlude >> linkPostlude >> monitorPostlude) return (newpid,())) ---------------------------------------------- -- * Local node registry ---------------------------------------------- {- UNUSED localRegistryMagicProcess :: LocalProcessId localRegistryMagicProcess = 38813 -} localRegistryMagicMagic :: String localRegistryMagicMagic = "__LocalRegistry" localRegistryMagicRole :: String localRegistryMagicRole = "__LocalRegistry" type LocalProcessName = String -- | Created by 'Remote.Peer.getPeers', this maps -- each role to a list of nodes that have that role. -- It can be examined directly or queried with -- 'findPeerByRole'. type PeerInfo = Map.Map String [NodeId] data LocalNodeData = LocalNodeData {ldmRoles :: PeerInfo} {- UNUSED type RegistryData = Map.Map String LocalNodeData -} data LocalProcessMessage = LocalNodeRegister String String NodeId | LocalNodeUnregister String String NodeId | LocalNodeQuery String | LocalNodeAnswer PeerInfo | LocalNodeResponseOK | LocalNodeResponseError String | LocalNodeHello deriving (Typeable) instance Binary LocalProcessMessage where put (LocalNodeRegister m r n) = putWord8 0 >> put m >> put r >> put n put (LocalNodeUnregister m r n) = putWord8 1 >> put m >> put r >> put n put (LocalNodeQuery r) = putWord8 3 >> put r put (LocalNodeAnswer pi) = putWord8 4 >> put pi put (LocalNodeResponseOK) = putWord8 5 put (LocalNodeResponseError s) = putWord8 6 >> put s put (LocalNodeHello) = putWord8 7 get = do g <- getWord8 case g of 0 -> get >>= \m -> get >>= \r -> get >>= \n -> return (LocalNodeRegister m r n) 1 -> get >>= \m -> get >>= \r -> get >>= \n -> return (LocalNodeUnregister m r n) 3 -> get >>= return . LocalNodeQuery 4 -> get >>= return . LocalNodeAnswer 5 -> return LocalNodeResponseOK 6 -> get >>= return . LocalNodeResponseError 7 -> return LocalNodeHello remoteRegistryPid :: NodeId -> ProcessM ProcessId remoteRegistryPid nid = do let (NodeId hostname _) = nid cfg <- getConfig return $ adminGetPid (NodeId hostname (cfgLocalRegistryListenPort cfg)) ServiceNodeRegistry localRegistryPid :: ProcessM ProcessId localRegistryPid = do (NodeId hostname _) <- getSelfNode cfg <- getConfig return $ adminGetPid (NodeId hostname (cfgLocalRegistryListenPort cfg)) ServiceNodeRegistry -- | Every host on which a node is running also needs a node registry, -- which arbitrates those nodes can responds to peer queries. If -- no registry is running, one will be automatically started -- when the framework is started, but the registry can be started -- independently, also. This function does that. standaloneLocalRegistry :: String -> IO () standaloneLocalRegistry cfgn = do cfg <- readConfig True (Just cfgn) res <- startLocalRegistry cfg True liftIO $ putStrLn $ "Terminating standalone local registry: " ++ show res -- | Contacts the local node registry and attempts to register current node. -- You probably don't want to call this function yourself, as it's done for you in 'Remote.Init.remoteInit' localRegistryRegisterNode :: ProcessM () localRegistryRegisterNode = localRegistryRegisterNodeImpl LocalNodeRegister -- | Contacts the local node registry and attempts to unregister current node. -- You probably don't want to call this function yourself, as it's done for you in 'Remote.Init.remoteInit' localRegistryUnregisterNode :: ProcessM () localRegistryUnregisterNode = localRegistryRegisterNodeImpl LocalNodeUnregister localRegistryRegisterNodeImpl :: (String -> String -> NodeId -> LocalProcessMessage) -> ProcessM () localRegistryRegisterNodeImpl cons = do cfg <- getConfig lrpid <- localRegistryPid nid <- getSelfNode let regMsg = cons (cfgNetworkMagic cfg) (cfgRole cfg) nid res <- roundtripQueryUnsafe PldAdmin lrpid regMsg case res of Left ts -> throw $ TransmitException ts Right LocalNodeResponseOK -> return () Right (LocalNodeResponseError s) -> throw $ TransmitException $ QteOther s -- | Contacts the local node registry and attempts to verify that it is alive. -- If the local node registry cannot be contacted, an exception will be thrown. localRegistryHello :: ProcessM () localRegistryHello = do lrpid <- localRegistryPid res <- roundtripQueryUnsafe PldAdmin lrpid LocalNodeHello case res of (Right LocalNodeHello) -> return () (Left n) -> throw $ ConfigException $ "Can't talk to local node registry: " ++ show n _ -> throw $ ConfigException $ "No response from local node registry" localRegistryQueryNodes :: NodeId -> ProcessM (Maybe PeerInfo) localRegistryQueryNodes nid = do cfg <- getConfig lrpid <- remoteRegistryPid nid let regMsg = LocalNodeQuery (cfgNetworkMagic cfg) res <- roundtripQueryUnsafe PldAdmin lrpid regMsg case res of Left _ts -> return Nothing Right (LocalNodeAnswer pi) -> return $ Just pi -- TODO since local registries are potentially sticky, there is good reason -- to ensure that they don't get corrupted; there should be some -- kind of security precaution here, e.g. making sure that registrations -- only come from local nodes startLocalRegistry :: Config -> Bool -> IO TransmitStatus startLocalRegistry cfg waitforever = startit where regConfig = cfg {cfgListenPort = cfgLocalRegistryListenPort cfg, cfgNetworkMagic=localRegistryMagicMagic, cfgRole = localRegistryMagicRole} handler tbl = receiveWait [roundtripResponse (registryCommand tbl)] >>= handler emptyNodeData = LocalNodeData {ldmRoles = Map.empty} lookup tbl magic role = case Map.lookup magic tbl of Nothing -> Nothing Just ldm -> Map.lookup role (ldmRoles ldm) remove tbl magic role nid = let roler Nothing = Nothing roler (Just lst) = Just $ filter ((/=)nid) lst removeFrom ldm = ldm {ldmRoles = Map.alter (roler) role (ldmRoles ldm)} remover Nothing = Nothing remover (Just ldm) = Just (removeFrom ldm) in Map.alter remover magic tbl insert tbl magic role nid = let roler Nothing = Just [nid] roler (Just lst) = if elem nid lst then (Just lst) else (Just (nid:lst)) insertTo ldm = ldm {ldmRoles = Map.alter (roler) role (ldmRoles ldm)} inserter Nothing = Just (insertTo emptyNodeData) inserter (Just ldm) = Just (insertTo ldm) in Map.alter inserter magic tbl registryCommand tbl (LocalNodeQuery magic) = case Map.lookup magic tbl of Nothing -> return (LocalNodeAnswer Map.empty,tbl) Just pi -> return (LocalNodeAnswer (ldmRoles pi),tbl) registryCommand tbl (LocalNodeUnregister magic role nid) = -- check that given entry exists? return (LocalNodeResponseOK,remove tbl magic role nid) registryCommand tbl (LocalNodeRegister magic role nid) = case lookup tbl magic role of Nothing -> return (LocalNodeResponseOK,insert tbl magic role nid) Just nids -> if elem nid nids then return (LocalNodeResponseError ("Multiple node registration for " ++ show nid ++ " as " ++ role),tbl) else return (LocalNodeResponseOK,insert tbl magic role nid) registryCommand tbl (LocalNodeHello) = return (LocalNodeHello,tbl) registryCommand tbl _ = return (LocalNodeResponseError "Unknown command",tbl) startit = do node <- initNode regConfig Remote.Reg.empty res <- try $ forkAndListenAndDeliver node regConfig :: IO (Either SomeException ()) case res of Left e -> return $ QteNetworkError $ show e Right () -> runLocalProcess node (startLoggingService >> adminRegister ServiceNodeRegistry >> handler Map.empty) >> if waitforever then waitForThreads node >> (return $ QteOther "Local registry process terminated") else return QteOK ---------------------------------------------- -- * Closures ---------------------------------------------- -- TODO makeClosure should verify that the given fun argument actually exists makeClosure :: (Typeable a,Serializable v) => String -> v -> ProcessM (Closure a) makeClosure fun env = do enc <- liftIO $ serialEncode env return $ Closure fun enc makePayloadClosure :: Closure a -> Maybe (Closure Payload) makePayloadClosure (Closure name arg) = case isSuffixOf "__impl" name of False -> Nothing True -> Just $ Closure (name++"Pl") arg evaluateClosure :: (Typeable b) => Closure a -> ProcessM (Maybe (Payload -> b)) evaluateClosure (Closure name _) = do node <- getLookup return $ getEntryByIdent node name invokeClosure :: (Typeable a) => Closure a -> ProcessM (Maybe a) invokeClosure (Closure name arg) = (\_id -> do node <- getLookup res <- sequence [pureFun node,ioFun node,procFun node] case catMaybes res of (a:_) -> return $ Just a _ -> return Nothing ) Prelude.id where pureFun node = case getEntryByIdent node name of Nothing -> return Nothing Just x -> return $ Just $ (x arg) ioFun node = case getEntryByIdent node name of Nothing -> return Nothing Just x -> liftIO (x arg) >>= (return.Just) procFun node = case getEntryByIdent node name of Nothing -> return Nothing Just x -> (x arg) >>= (return.Just) ---------------------------------------------- -- * Simple functional queue ---------------------------------------------- data Queue a = Queue [a] [a] queueMake :: Queue a queueMake = Queue [] [] {- UNUSED queueEmpty :: Queue a -> Bool queueEmpty (Queue [] []) = True queueEmpty _ = False -} queueInsert :: Queue a -> a -> Queue a queueInsert (Queue incoming outgoing) a = Queue (a:incoming) outgoing {- UNUSED queueInsertAndLimit :: Queue a -> Int -> a -> Queue a queueInsertAndLimit q limit a= let s1 = queueInsert q a s2 = if queueLength s1 > limit then let (_,f) = queueRemove s1 in f else s1 in s2 -} queueInsertMulti :: Queue a -> [a] -> Queue a queueInsertMulti (Queue incoming outgoing) a = Queue (a++incoming) outgoing {- UNUSED queueRemove :: Queue a -> (Maybe a,Queue a) queueRemove (Queue incoming (a:outgoing)) = (Just a,Queue incoming outgoing) queueRemove (Queue l@(_:_) []) = queueRemove $ Queue [] (reverse l) queueRemove q@(Queue [] []) = (Nothing,q) -} queueToList :: Queue a -> [a] queueToList (Queue incoming outgoing) = outgoing ++ reverse incoming queueFromList :: [a] -> Queue a queueFromList l = Queue [] l {- UNUSED queueLength :: Queue a -> Int queueLength (Queue incoming outgoing) = length incoming + length outgoing -- should probably just store in the length in the structure queueEach :: Queue a -> [(a,Queue a)] queueEach q = case queueToList q of [] -> [] a:rest -> each [] a rest where each before it [] = [(it,queueFromList before)] each before it following@(next:after) = (it,queueFromList (before++following)):(each (before++[it]) next after) -} withSem :: QSem -> IO a -> IO a withSem sem f = action `finally` signalQSem sem where action = waitQSem sem >> f