module Remote.Process (
ProcessM,
NodeId,ProcessId,
PeerInfo,
nullPid,getSelfPid,getSelfNode,isPidLocal,
expect,
MatchM,receive,receiveWait,receiveTimeout,
match,matchIf,matchCond,matchUnknown,matchUnknownThrow,matchProcessDown,
send,sendQuiet,
logS,say,
LogSphere,LogLevel(..),LogTarget(..),LogFilter(..),LogConfig(..),
setLogConfig,getLogConfig,setNodeLogConfig,setRemoteNodeLogConfig,defaultLogConfig,
ptry,ptimeout,pbracket,pfinally,
UnknownMessageException(..),ServiceException(..),
TransmitException(..),TransmitStatus(..),
nameSet, nameQuery, nameQueryOrStart,
spawnLocal,spawnLocalAnd,forkProcess,spawn,spawnAnd,spawnLink,unpause,
AmSpawnOptions(..),defaultSpawnOptions,MonitorAction(..),SignalReason(..),
ProcessMonitorException(..),linkProcess,monitorProcess,unmonitorProcess,withMonitor,pingNode,
callRemote,callRemotePure,callRemoteIO,
terminate,
readConfig,emptyConfig,Config(..),getConfig, getCfgArgs,
initNode,roleDispatch,setDaemonic,
waitForThreads,performFinalization,forkAndListenAndDeliver,runLocalProcess,
makeClosure,invokeClosure,evaluateClosure,
getQueueLength,nodeFromPid,localFromPid,hostFromNid,
PortId,LocalProcessId,
localRegistryHello,localRegistryRegisterNode, localRegistryQueryNodes,localRegistryUnregisterNode,
sendSimple,makeNodeFromHost,getNewMessageLocal, getProcess,Message,Process,prNodeRef,
roundtripResponse,roundtripResponseAsync,roundtripQuery,roundtripQueryMulti,
makePayloadClosure,getLookup,diffTime,roundtripQueryImpl,roundtripQueryUnsafe,
PayloadDisposition(..),suppressTransmitException,Node,getMessagePayload,getMessageType,
startSpawnerService, startLoggingService, startProcessMonitorService, startLocalRegistry, startFinalizerService,
startNodeMonitorService, startProcessRegistryService, standaloneLocalRegistry
)
where
import qualified Prelude as Prelude
import Prelude hiding (catch, id, init, last, lookup, pi)
import Control.Concurrent (forkIO,ThreadId,threadDelay)
import Control.Concurrent.MVar (MVar,newMVar, newEmptyMVar,takeMVar,putMVar,modifyMVar,modifyMVar_,readMVar)
import Control.Exception (ErrorCall(..),throwTo,bracket,try,Exception,throw,evaluate,finally,SomeException,catch)
import Control.Monad (foldM,when,liftM,forever)
import Control.Monad.Trans (MonadIO,liftIO)
import Data.Binary (Binary,put,get,putWord8,getWord8)
import Data.Char (isSpace,isDigit)
import Data.List (isSuffixOf,foldl', isPrefixOf)
import Data.Maybe (catMaybes,isNothing)
import Data.Typeable (Typeable)
import Data.Data (Data)
import Data.Unique (newUnique,hashUnique)
import System.IO (Handle,hClose,hSetBuffering,hGetChar,hPutChar,BufferMode(..),hFlush)
import System.IO.Error (isEOFError,isDoesNotExistError,isUserError)
import Network.BSD (getHostName)
import Network (HostName,PortID(..),listenOn,accept,sClose,connectTo)
import Network.Socket (setSocketOption,SocketOption(..),socketPort,aNY_PORT )
import qualified Data.Map as Map (Map,keys,fromList,unionWith,elems,singleton,member,update,empty,adjust,alter,insert,delete,lookup,toList,size,insertWith')
import Remote.Reg (getEntryByIdent,Lookup,empty)
import Remote.Encoding (serialEncode,serialDecode,serialEncodePure,serialDecodePure,dynamicEncodePure,dynamicDecodePure,DynamicPayload,Payload,Serializable,hPutPayload,hGetPayload,getPayloadType,getDynamicPayloadType)
import System.Environment (getArgs)
import qualified System.Timeout (timeout)
import Data.Time (toModifiedJulianDay,Day(..),picosecondsToDiffTime,getCurrentTime,diffUTCTime,UTCTime(..),utcToLocalZonedTime)
import Remote.Closure (Closure (..))
import Control.Concurrent.STM (STM,atomically,retry,orElse)
import Control.Concurrent.STM.TChan (TChan,isEmptyTChan,readTChan,newTChanIO,writeTChan)
import Control.Concurrent.Chan (newChan,readChan,writeChan)
import Control.Concurrent.STM.TVar (TVar,newTVarIO,readTVar,writeTVar)
import Control.Concurrent.QSem (QSem,newQSem,waitQSem,signalQSem)
import Data.IORef (IORef,newIORef,readIORef,writeIORef)
import Data.Fixed (Pico)
-- | TCP port number, kept as a plain 'Int'.
type PortId = Int
-- | Node-local identifier of a process; the second component of a 'ProcessId'.
type LocalProcessId = Int
-- | Static settings for a node. All fields except 'cfgArgs' are strict.
--
-- Uses visible in this part of the module:
--
-- * 'cfgRole' is handed to the dispatch function by 'roleDispatch'.
-- * 'cfgHostName', when non-empty, overrides the system host name in 'initNode'.
-- * 'cfgListenPort' selects the listening port; 0 makes 'listenAndDeliver' use @aNY_PORT@.
-- * 'cfgNetworkMagic' is sent with every remote message and checked by 'messageHandler'.
-- * 'cfgRoundtripTimeout' is the timeout 'roundtripQueryUnsafe' passes to 'roundtripQueryImpl'.
-- * 'cfgMaxOutgoing' is the initial value of the outgoing-send semaphore created in 'initNode'.
-- * 'cfgArgs' is what 'getCfgArgs' returns.
--
-- The remaining fields are not consumed in the portion of the file shown here.
data Config = Config
{
cfgRole :: !String,
cfgHostName :: !HostName,
cfgListenPort :: !PortId,
cfgLocalRegistryListenPort :: !PortId,
cfgPeerDiscoveryPort :: !PortId,
cfgNetworkMagic :: !String,
cfgKnownHosts :: ![String],
cfgRoundtripTimeout :: !Int,
cfgMaxOutgoing :: !Int,
cfgPromiseFlushDelay :: !Int,
cfgPromisePrefix :: !String,
cfgArgs :: [String]
} deriving (Show)
-- | Live processes of a node, keyed by their local process id.
type ProcessTable = Map.Map LocalProcessId ProcessTableEntry
-- | Maps a service identifier to the local process id registered for it.
type AdminProcessTable = Map.Map ServiceId LocalProcessId
-- | Mutable per-node state; the rest of the module shares it through an @MVar Node@.
--
-- * 'ndProcessTable' gains an entry for each process started by 'runLocalProcess'.
-- * 'ndHostName' and 'ndListenPort' together form the node's 'NodeId' (see 'getNodeId');
--   'ndListenPort' starts at 0 in 'initNode' and is filled in by 'listenAndDeliver'.
-- * 'ndOutgoing' is the semaphore taken around outgoing remote sends.
-- * 'ndNextProcessId' is the next local process id to hand out.
data Node = Node
{
ndProcessTable :: ProcessTable,
ndAdminProcessTable :: AdminProcessTable,
ndHostName :: HostName,
ndListenPort :: PortId,
ndConfig :: Config,
ndLookup :: Lookup,
ndLogConfig :: LogConfig,
ndNodeFinalizer :: MVar (),
ndNodeFinalized :: MVar (),
ndOutgoing :: QSem,
ndNextProcessId :: LocalProcessId
}
-- | Identifies a node by host name and listening port.
data NodeId = NodeId !HostName !PortId deriving (Typeable,Eq,Ord,Data)
-- | Identifies a process by its node and its node-local process id.
data ProcessId = ProcessId !NodeId !LocalProcessId deriving (Typeable,Eq,Ord,Data)
-- | Wire format: the host name followed by the port.
instance Binary NodeId where
  put (NodeId host port) = do
    put host
    put port
  get = do
    host <- get
    port <- get
    return (NodeId host port)
-- | Renders a node id as @nid:\/\/host:port\/@.
instance Show NodeId where
  show (NodeId host port) = "nid://" ++ host ++ ":" ++ show port ++ "/"
-- | Parses the textual form produced by 'show': @nid:\/\/host:port\/@.
--
-- A malformed input yields an empty parse list, as the 'Read' contract
-- requires, instead of calling 'error'. That lets 'reads' and 'readIO'
-- callers see an ordinary parse failure. The port is parsed with 'reads',
-- so a non-numeric port is also a parse failure rather than a crash.
instance Read NodeId where
  readsPrec _ s
    | "nid://" `isPrefixOf` s =
        let (hostname, afterHost) = break (== ':') (drop 6 s)
            (port, afterPort) = break (== '/') (drop 1 afterHost)
        in case reads port of
             [(portid, trailing)]
               | not (null hostname) && all isSpace trailing ->
                   -- Skip the terminating '/' and hand back the remainder.
                   [(NodeId hostname portid, drop 1 afterPort)]
             _ -> []
    | otherwise = []
-- | Wire format: the node id followed by the local process id.
instance Binary ProcessId where
  put (ProcessId nid lpid) = do
    put nid
    put lpid
  get = do
    nid <- get
    lpid <- get
    return (ProcessId nid lpid)
-- | Renders a process id as @pid:\/\/host:port\/localid\/@.
instance Show ProcessId where
  show (ProcessId (NodeId host port) lpid) =
    "pid://" ++ host ++ ":" ++ show port ++ "/" ++ show lpid ++ "/"
-- | Parses the textual form produced by 'show': @pid:\/\/host:port\/localid\/@.
--
-- A malformed input yields an empty parse list, as the 'Read' contract
-- requires, instead of calling 'error'. The port and local id are parsed
-- with 'reads', so non-numeric components are parse failures rather than
-- crashes.
instance Read ProcessId where
  readsPrec _ s
    | "pid://" `isPrefixOf` s =
        let (hostname, afterHost) = break (== ':') (drop 6 s)
            (port, afterPort) = break (== '/') (drop 1 afterHost)
            (pid, afterPid) = break (== '/') (drop 1 afterPort)
        in case (reads port, reads pid) of
             ([(portid, t1)], [(localid, t2)])
               | not (null hostname) && all isSpace t1 && all isSpace t2 ->
                   -- Skip the terminating '/' and hand back the remainder.
                   [(ProcessId (NodeId hostname portid) localid, drop 1 afterPid)]
             _ -> []
    | otherwise = []
-- | Routing class of a message. 'messageHandler' delivers 'PldUser' messages
-- directly to the addressed local process id, while 'PldAdmin' messages are
-- first resolved through an administrative lookup.
data PayloadDisposition = PldUser |
PldAdmin
deriving (Typeable,Read,Show,Eq)
-- | A message in flight: a disposition, an optional header and a body.
-- 'EncodedMessage' carries serialized payloads; 'DynamicMessage' carries
-- dynamically-typed payloads (see 'makeMessage').
data Message =
EncodedMessage { msgEDisposition :: PayloadDisposition, msgEHeader :: Maybe Payload, msgEPayload :: !Payload }
| DynamicMessage { msgDDisposition :: PayloadDisposition, msgDHeader :: Maybe DynamicPayload, msgDPayload :: DynamicPayload }
-- | Build a 'Message' from a body and an optional header. The first argument
-- selects the representation: 'True' gives a 'DynamicMessage', 'False' an
-- 'EncodedMessage'.
makeMessage :: (Serializable msg, Serializable hdr) => Bool -> PayloadDisposition -> msg -> Maybe hdr -> Message
makeMessage useDynamic disposition body mheader
  | useDynamic =
      DynamicMessage
        { msgDDisposition = disposition
        , msgDHeader = fmap dynamicEncodePure mheader
        , msgDPayload = dynamicEncodePure body
        }
  | otherwise =
      EncodedMessage
        { msgEDisposition = disposition
        , msgEHeader = fmap serialEncodePure mheader
        , msgEPayload = serialEncodePure body
        }
-- | Type name recorded in the message body's payload.
getMessageType :: Message -> String
getMessageType msg =
  case msg of
    EncodedMessage _ _ body -> getPayloadType body
    DynamicMessage _ _ body -> getDynamicPayloadType body
-- | Decode the message body at the requested type; 'Nothing' when decoding fails.
getMessagePayload :: (Serializable a) => Message -> Maybe a
getMessagePayload msg =
  case msg of
    EncodedMessage _ _ body -> serialDecodePure body
    DynamicMessage _ _ body -> dynamicDecodePure body
-- | Decode the message header at the requested type; 'Nothing' when the
-- message has no header or decoding fails.
getMessageHeader :: (Serializable a) => Message -> Maybe a
getMessageHeader msg =
  case msg of
    EncodedMessage _ mhdr _ -> mhdr >>= serialDecodePure
    DynamicMessage _ mhdr _ -> mhdr >>= dynamicDecodePure
-- | The disposition the message was created with.
getMessageDisposition :: Message -> PayloadDisposition
getMessageDisposition msg =
  case msg of
    EncodedMessage disposition _ _ -> disposition
    DynamicMessage disposition _ _ -> disposition
-- | What a node records about each local process.
--
-- * 'pteChannel' is the channel 'deliver' writes incoming messages to.
-- * 'pteDeathT' is set to 'True' and 'pteDeath' is filled when the process's
--   thread finishes (see the bracket in 'runLocalProcess').
-- * 'pteDaemonic' processes are skipped by 'waitForThreads'.
data ProcessTableEntry = ProcessTableEntry
{
pteThread :: ThreadId,
pteChannel :: TChan Message,
pteDeathT :: TVar Bool,
pteDeath :: MVar (),
pteDaemonic :: Bool
}
-- | Per-process mutable state: the queue of messages that have been taken
-- off the process's channel but not yet accepted by a matcher.
data ProcessState = ProcessState
{
prQueue :: Queue Message
}
-- | The state threaded through 'ProcessM'.
--
-- * 'prPid' / 'prSelf' are the full and node-local identifiers of the process.
-- * 'prNodeRef' is the shared node state.
-- * 'prChannel' receives incoming messages; 'prState' holds the pending queue.
-- * 'prPool' caches open handles to remote nodes for reuse by 'sendRawRemote'.
-- * 'prLogConfig' is initialised to 'Nothing' by 'runLocalProcess'.
data Process = Process
{
prPid :: ProcessId,
prSelf :: LocalProcessId,
prNodeRef :: MVar Node,
prThread :: ThreadId,
prChannel :: TChan Message,
prState :: TVar ProcessState,
prPool :: IORef (Map.Map NodeId Handle),
prLogConfig :: Maybe LogConfig
}
-- | The process monad: an 'IO' action that takes the current 'Process'
-- record and returns a (possibly updated) record together with a result.
data ProcessM a = ProcessM {runProcessM :: Process -> IO (Process,a)} deriving Typeable
-- | Sequencing threads the 'Process' record from one action to the next.
instance Monad ProcessM where
  return x = ProcessM (\st -> return (st, x))
  m >>= k = ProcessM step
    where
      step st = do
        (st', a) <- runProcessM m st
        runProcessM (k a) st'
-- | Maps over the result while leaving the threaded 'Process' record alone.
instance Functor ProcessM where
  fmap f v = ProcessM (\st -> liftM (fmap f) (runProcessM v st))
-- | Runs an 'IO' action without touching the 'Process' record.
instance MonadIO ProcessM where
  liftIO io = ProcessM (\st -> liftM ((,) st) io)
-- | Read the current 'Process' record.
getProcess :: ProcessM (Process)
getProcess = ProcessM currentState
  where
    currentState st = return (st, st)
-- | The 'cfgArgs' field of the node's configuration.
getCfgArgs :: ProcessM [String]
getCfgArgs = liftM cfgArgs getConfig
-- | The configuration of the node the current process runs on.
getConfig :: ProcessM (Config)
getConfig =
  getProcess >>= \self ->
    liftIO (readMVar (prNodeRef self)) >>= \node ->
      return (ndConfig node)
-- | 'IO' variant of 'getConfig' that works directly on the node reference.
getConfigI :: MVar Node -> IO Config
getConfigI mnode = liftM ndConfig (readMVar mnode)
-- | The 'Lookup' table stored in the current node.
getLookup :: ProcessM (Lookup)
getLookup =
  getProcess >>= \self ->
    liftIO (readMVar (prNodeRef self)) >>= \node ->
      return (ndLookup node)
-- | Replace the current 'Process' record.
putProcess :: Process -> ProcessM ()
putProcess newState = ProcessM (const (return (newState, ())))
-- | The matcher monad. A matcher runs in 'STM' against a 'MatchBlock' and
-- yields, next to its result, an optional continuation of type @ProcessM q@.
-- A 'Just' continuation signals that the message was accepted (see 'returnHalt').
data MatchM q a = MatchM { runMatchM :: MatchBlock -> STM ((MatchBlock,Maybe (ProcessM q)),a) }
-- | Sequencing for matchers. The second action is run with the 'MatchBlock'
-- left by the first, and the overall outcome is the second action's.
-- Note that any continuation produced by the first action is dropped
-- (only @fst mb@ is carried forward).
instance Monad (MatchM q) where
m >>= k = MatchM $ \mbi -> do
(mb,a) <- runMatchM m mbi
(mb',a2) <- runMatchM (k a) (fst mb)
return (mb',a2)
return x = MatchM $ \mb -> return $ ((mb,Nothing),x)
-- | Like 'return', but also records the given continuation as the
-- matcher's outcome.
returnHalt :: a -> ProcessM q -> MatchM q a
returnHalt x invoker = MatchM accept
  where
    accept mb = return ((mb, Just invoker), x)
-- | Run an 'STM' action inside a matcher without recording a continuation.
liftSTM :: STM a -> MatchM q a
liftSTM action = MatchM (\mb -> liftM ((,) (mb, Nothing)) action)
-- | The input a matcher is run against: the candidate message.
data MatchBlock = MatchBlock
{
mbMessage :: Message
}
-- | Read the 'MatchBlock' the matcher is currently running against.
getMatch :: MatchM q MatchBlock
getMatch = MatchM (\mb -> return ((mb, Nothing), mb))
-- | Take the next message from the process's channel (retries in 'STM'
-- while the channel is empty).
getNewMessage :: Process -> STM Message
getNewMessage = readTChan . prChannel
-- | Take the next message from the channel of the given local process.
-- Yields 'Nothing' when 'getProcessTableEntry' finds no live entry for it.
getNewMessageLocal :: Node -> LocalProcessId -> STM (Maybe Message)
getNewMessageLocal node lpid =
  getProcessTableEntry node lpid >>= maybe (return Nothing) nextFrom
  where
    nextFrom entry = liftM Just (readTChan (pteChannel entry))
-- | Number of messages currently waiting for this process. As a side
-- effect, messages on the channel are moved into the process's queue
-- (see 'getCurrentMessages').
getQueueLength :: ProcessM Int
getQueueLength = do
  self <- getProcess
  liftIO (atomically (liftM length (getCurrentMessages self)))
-- | Drain everything currently on the process's channel into its queue,
-- store the enlarged queue back in 'prState', and return the queue
-- contents as a list (via 'queueToList').
getCurrentMessages :: Process -> STM [Message]
getCurrentMessages p = do
msgs <- cleanChannel (prChannel p) []
ps <- readTVar (prState p)
let q = (prQueue ps)
let newq = queueInsertMulti q msgs
writeTVar (prState p) ps {prQueue = newq}
return $ queueToList newq
-- cleanChannel reads until the channel is empty, prepending each item, so
-- the accumulated list is newest-first.
-- NOTE(review): presumably queueInsertMulti expects that order -- confirm
-- against its definition.
where cleanChannel c m = do isEmpty <- isEmptyTChan c
if isEmpty
then return m
else do item <- readTChan c
cleanChannel c (item:m)
-- | Run the matchers, in order, against one message. The result is the
-- continuation of the first matcher that produces one, or 'Nothing' when
-- none does.
matchMessage :: [MatchM q ()] -> Message -> STM (Maybe (ProcessM q))
-- The matchers are chained with 'orElse'; the final alternative supplies the
-- "no match" result so the whole transaction does not block.
matchMessage matchers msg = do (_mb,r) <- (foldl orElse (retry) (map executor matchers)) `orElse` (return (theMatchBlock,Nothing))
return r
-- A matcher that finishes without a continuation is turned into 'retry' so
-- that 'orElse' moves on to the next one.
where executor x = do
(ok@(_mb,matchfound),_) <- runMatchM x theMatchBlock
case matchfound of
Nothing -> retry
_ -> return ok
theMatchBlock = MatchBlock {mbMessage = msg}
-- | Try the matchers against each candidate message in list order. Every
-- candidate comes with an 'STM' action that is run when that candidate is
-- accepted. Yields the accepted message's continuation, or 'Nothing' when
-- no candidate matches.
matchMessages :: [MatchM q ()] -> [(Message,STM ())] -> STM (Maybe (ProcessM q))
matchMessages matchers msgs = (foldl orElse (retry) (map executor msgs)) `orElse` (return Nothing)
where executor (msg,acceptor) = do
res <- matchMessage matchers msg
case res of
Nothing -> retry
Just pmq -> acceptor >> return (Just pmq)
-- | Non-blocking receive: examine the messages already available, run the
-- handler of the first one a matcher accepts, and return its result.
-- Returns 'Nothing' when nothing matches.
receive :: [MatchM q ()] -> ProcessM (Maybe q)
receive m = do
  self <- getProcess
  outcome <- convertErrorCall $ liftIO $ atomically $ do
    pending <- getCurrentMessages self
    matchMessages m (messageHandlerGenerator (prState self) pending)
  case outcome of
    Just handler -> handler >>= \q -> return (Just q)
    Nothing -> return Nothing
-- | Block until a message of the requested type arrives and return it.
-- Defined as 'receiveWait' with a single 'match' that returns the payload.
expect :: (Serializable a) => ProcessM a
expect = receiveWait [match return]
-- | Block until one of the matchers accepts a message, then run the
-- accepted matcher's handler and return its result.
receiveWait :: [MatchM q ()] -> ProcessM q
receiveWait m = receiveWaitImpl m >>= \handler -> handler
-- | Receive with a time limit, which is handed to 'ptimeout'. A limit of 0
-- behaves like 'receive'. Only the wait for a matching message is subject
-- to the limit; the handler itself runs afterwards without one. Returns
-- 'Nothing' when the limit expires first.
--
-- NOTE(review): there is no equation for a negative limit, so such a call
-- fails with a pattern-match error -- confirm whether that is intended.
receiveTimeout :: Int -> [MatchM q ()] -> ProcessM (Maybe q)
receiveTimeout 0 m = receive m
receiveTimeout to m | to > 0 =
do res <- ptimeout to $ receiveWaitImpl m
case res of
Nothing -> return Nothing
Just f -> do q <- f
return $ Just q
-- | Pair every pending message with the 'STM' action to run when it is
-- accepted: the action replaces the process's queue with all the other
-- messages (as computed by 'exclusionList').
messageHandlerGenerator :: TVar ProcessState -> [Message] -> [(Message, STM ())]
messageHandlerGenerator prSt msgs =
  [ (candidate, commit others) | (candidate, others) <- exclusionList msgs ]
  where
    commit others = do
      ps <- readTVar prSt
      writeTVar prSt ps {prQueue = queueFromList others}
-- | Blocking core of 'receiveWait' / 'receiveTimeout'. Returns the handler
-- of the accepted message without running it.
receiveWaitImpl :: [MatchM q ()] -> ProcessM (ProcessM q)
receiveWaitImpl m =
do p <- getProcess
v <- attempt1 p
attempt2 p v
where
-- attempt2 loops: each round blocks for one new message from the channel,
-- appends it to the queue, and tries the matchers on that message only.
attempt2 p v =
case v of
Just n -> return n
Nothing -> do ret <- convertErrorCall $ liftIO $ atomically $
do
msg <- getNewMessage p
ps <- readTVar (prState p)
let oldq = prQueue ps
writeTVar (prState p) ps {prQueue = queueInsert oldq msg}
-- On acceptance the state read before the insert is written back, which
-- removes the just-added message from the queue again.
matchMessages m [(msg,writeTVar (prState p) ps)]
attempt2 p ret
-- attempt1 is the first pass over everything already queued or on the channel.
attempt1 p = convertErrorCall $ liftIO $ atomically $
do
msgs <- getCurrentMessages p
matchMessages m (messageHandlerGenerator (prState p) msgs)
-- | Run the action, force its result to weak head normal form, and re-throw
-- any 'ErrorCall' raised along the way as a 'TransmitException' carrying
-- 'QteOther' with the error text.
convertErrorCall :: ProcessM a -> ProcessM a
convertErrorCall f = ptry forced >>= either rethrow return
  where
    forced = f >>= \q -> q `seq` return q
    rethrow err = throw $ TransmitException $ QteOther $ show (err :: ErrorCall)
-- | A matcher that accepts any message and runs the given action for it.
matchUnknown :: ProcessM q -> MatchM q ()
matchUnknown = returnHalt ()
-- | A matcher that accepts any message and whose handler throws
-- 'UnknownMessageException' naming the message's type.
matchUnknownThrow :: MatchM q ()
matchUnknownThrow =
  getMatch >>= \mb ->
    returnHalt () (throw $ UnknownMessageException (getMessageType (mbMessage mb)))
-- | A matcher that accepts any header-less message whose body decodes at
-- the handler's argument type, and passes the decoded value to the handler.
match :: (Serializable a) => (a -> ProcessM q) -> MatchM q ()
match = matchCoreHeaderless (const True)
-- | Like 'match', but the decoded value must also satisfy the predicate.
matchIf :: (Serializable a) => (a -> Bool) -> (a -> ProcessM q) -> MatchM q ()
matchIf = matchCoreHeaderless
-- | Like 'matchIf', with predicate and handler fused: the message is
-- accepted when the function returns 'Just', and the wrapped action is the
-- handler. The function is evaluated twice (once to test, once to run); if
-- the second evaluation yields 'Nothing' a 'TransmitException' is thrown.
matchCond :: (Serializable a) => (a -> Maybe (ProcessM q)) -> MatchM q ()
matchCond f = matchIf accepts run
  where
    accepts a = maybe False (const True) (f a)
    run a =
      maybe (throw $ TransmitException $ QteOther $ "Indecesive predicate in matchCond")
            (\q -> q)
            (f a)
-- | 'matchCore' restricted to messages without a header: the header, decoded
-- as @Maybe ()@, must be 'Nothing', and the body must satisfy the predicate.
matchCoreHeaderless :: (Serializable a) => (a -> Bool) -> (a -> ProcessM q) -> MatchM q ()
matchCoreHeaderless f g = matchCore (\(a,b) -> b==(Nothing::Maybe ()) && f a)
(\(a,_) -> g a)
-- | General matcher. Decodes the candidate message's body and header at the
-- types the condition expects. When the body decodes and the condition
-- holds, the message is accepted with @body@ as its handler; otherwise the
-- matcher retries in 'STM', which lets the next alternative run.
matchCore :: (Serializable a,Serializable b) => ((a,Maybe b) -> Bool) -> ((a,Maybe b) -> ProcessM q) -> MatchM q ()
matchCore cond body = getMatch >>= dispatch
  where
    dispatch mb =
      case getMessagePayload candidate of
        Just x
          | cond (x, hdr) -> returnHalt () (body (x, hdr))
        _ -> liftSTM retry
      where
        candidate = mbMessage mb
        hdr = getMessageHeader candidate
-- | Exception carrying a textual description; not thrown in the portion of
-- the file shown here.
data ConfigException = ConfigException String deriving (Show,Typeable)
instance Exception ConfigException
-- | Thrown when a send does not end in 'QteOK' (see 'send'); carries the status.
data TransmitException = TransmitException TransmitStatus deriving (Show,Typeable)
instance Exception TransmitException
-- | Thrown by the handler of 'matchUnknownThrow'; carries the message's type name.
data UnknownMessageException = UnknownMessageException String deriving (Show,Typeable)
instance Exception UnknownMessageException
-- | Exception carrying a textual description of a service-level failure.
data ServiceException = ServiceException String deriving (Show,Typeable)
instance Exception ServiceException
-- | Caught inside 'runLocalProcess' and treated as a normal process exit.
data ProcessTerminationException = ProcessTerminationException deriving (Show,Typeable)
instance Exception ProcessTerminationException
-- | Outcome of an attempt to transmit a message. 'QteOK' is success. The
-- 'Show'/'Read' forms are what 'writeResult' and 'writeMessage' exchange on
-- the wire. 'QtePleaseSendBody' is the receiver's reply to a message header
-- line, and 'QteThrottle' makes 'sendTry' pause for the given delay
-- (passed to 'threadDelay') before retrying.
data TransmitStatus = QteOK
| QteUnknownPid
| QteBadFormat
| QteOther String
| QtePleaseSendBody
| QteBadNetworkMagic
| QteNetworkError String
| QteEncodingError String
| QteDispositionFailed
| QteLoggingError
| QteConnectionTimeout
| QteUnknownCommand
| QteThrottle Int deriving (Show,Read,Typeable)
-- | Binary encoding: a one-byte constructor tag, followed by the
-- constructor's argument for the constructors that have one.
--
-- 'QteThrottle' now writes its delay after the tag, matching what 'get'
-- reads for tag 12 (previously only the tag was written, so decoding read
-- past the value). An unrecognised tag makes 'get' fail with a decoding
-- error instead of a pattern-match failure.
instance Binary TransmitStatus where
  put QteOK = putWord8 0
  put QteUnknownPid = putWord8 1
  put QteBadFormat = putWord8 2
  put (QteOther s) = putWord8 3 >> put s
  put QtePleaseSendBody = putWord8 4
  put QteBadNetworkMagic = putWord8 5
  put (QteNetworkError s) = putWord8 6 >> put s
  put (QteEncodingError s) = putWord8 7 >> put s
  put QteDispositionFailed = putWord8 8
  put QteLoggingError = putWord8 9
  put QteConnectionTimeout = putWord8 10
  put QteUnknownCommand = putWord8 11
  put (QteThrottle s) = putWord8 12 >> put s
  get = do ch <- getWord8
           case ch of
             0 -> return QteOK
             1 -> return QteUnknownPid
             2 -> return QteBadFormat
             3 -> get >>= return . QteOther
             4 -> return QtePleaseSendBody
             5 -> return QteBadNetworkMagic
             6 -> get >>= return . QteNetworkError
             7 -> get >>= return . QteEncodingError
             8 -> return QteDispositionFailed
             9 -> return QteLoggingError
             10 -> return QteConnectionTimeout
             11 -> return QteUnknownCommand
             12 -> get >>= return . QteThrottle
             _ -> fail ("Binary TransmitStatus: unknown tag " ++ show ch)
-- | Create the shared state for a new node from a configuration and a
-- lookup table. The host name comes from the configuration when it is
-- non-empty and from the system otherwise; the listen port and the process
-- id counter start at 0.
initNode :: Config -> Lookup -> IO (MVar Node)
initNode cfg lookup = do
  systemHost <- getHostName
  host <- evaluate (if null (cfgHostName cfg) then systemHost else cfgHostName cfg)
  finalizer <- newEmptyMVar
  finalized <- newEmptyMVar
  outgoing <- newQSem (cfgMaxOutgoing cfg)
  newMVar
    Node
      { ndHostName = host
      , ndProcessTable = Map.empty
      , ndAdminProcessTable = Map.empty
      , ndConfig = cfg
      , ndLookup = lookup
      , ndListenPort = 0
      , ndNextProcessId = 0
      , ndNodeFinalizer = finalizer
      , ndNodeFinalized = finalized
      , ndLogConfig = defaultLogConfig
      , ndOutgoing = outgoing
      }
-- | Start a local process that runs the given function applied to the
-- node's configured role ('cfgRole'). The new process's id is discarded.
roleDispatch :: MVar Node -> (String -> ProcessM ()) -> IO ()
roleDispatch mnode func =
  getConfigI mnode >>= \cfg ->
    runLocalProcess mnode (func (cfgRole cfg)) >> return ()
-- | Start a local process that first runs @prefix@ and then @fun@. The
-- caller blocks until the new process signals that @prefix@ is done, and
-- then receives the new process's id.
spawnLocalAnd :: ProcessM () -> ProcessM () -> ProcessM ProcessId
spawnLocalAnd fun prefix =
do p <- getProcess
v <- liftIO $ newEmptyMVar
pid <- liftIO $ runLocalProcess (prNodeRef p) (myFun v)
liftIO $ takeMVar v
return pid
-- NOTE(review): presumably 'pfinally' runs its second argument even when
-- @prefix@ throws, so the caller is always released -- confirm against
-- 'pfinally'.
where myFun mv = (prefix `pfinally` liftIO (putMVar mv ())) >> fun
-- | Synonym for 'spawnLocal'.
forkProcess :: ProcessM () -> ProcessM ProcessId
forkProcess = spawnLocal
-- | Run an action in a new thread that reuses the caller's 'Process' record
-- (same pid, channel and connection pool) instead of creating a new process.
-- The thread's result and final state are discarded.
forkProcessWeak :: ProcessM () -> ProcessM ()
forkProcessWeak f =
  getProcess >>= \self ->
    liftIO (forkIO (runProcessM f self >> return ())) >> return ()
-- | Start a new process on the current node and return its id.
spawnLocal :: ProcessM () -> ProcessM ProcessId
spawnLocal fun =
  getProcess >>= \self -> liftIO (runLocalProcess (prNodeRef self) fun)
-- | Create and start a new process on the given node, returning its id.
--
-- The caller allocates a local process id, forks the process's thread, and
-- then hands the fully built 'Process' record to that thread through
-- @passProcess@. It returns only after the thread has registered itself in
-- the node's process table (signalled through @okay@).
--
-- When the process body ends, normally or with an exception, a
-- 'GlProcessDown' message is sent to the node's process-monitor service and
-- the process's connection pool is closed. The pool used to be closed only
-- on normal termination, leaking the pooled handles of processes that died
-- from an exception. 'ProcessTerminationException' counts as normal
-- termination.
runLocalProcess :: MVar Node -> ProcessM () -> IO ProcessId
runLocalProcess node fun =
  do
    -- Hand out the current counter value and bump it atomically.
    localprocessid <- modifyMVar node (\thenode -> return $ (thenode {ndNextProcessId=1+ndNextProcessId thenode},
                                                            ndNextProcessId thenode))
    channel <- newTChanIO
    passProcess <- newEmptyMVar
    okay <- newEmptyMVar
    thread <- forkIO (runner okay node passProcess)
    thenodeid <- getNodeId node
    let pid = thenodeid `seq` ProcessId thenodeid localprocessid
    state <- newTVarIO $ ProcessState {prQueue = queueMake}
    pool <- newIORef Map.empty
    putMVar passProcess (mkProcess channel node localprocessid thread pid state pool)
    takeMVar okay
    return pid
  where
    -- Best-effort notification; any exception from the send is captured in
    -- the returned Either rather than propagated.
    notifyProcessDown p r = do nid <- getNodeId node
                               let pp = adminGetPid nid ServiceProcessMonitor
                               let msg = GlProcessDown (prPid p) r
                               try $ sendBasic node pp (msg) (Nothing::Maybe ()) PldAdmin Nothing :: IO (Either SomeException TransmitStatus)
    notifyProcessUp _p = return ()
    closePool p = do c <- readIORef (prPool p)
                     mapM hClose (Map.elems c)
    exceptionHandler e p = let shown = show e in
                             notifyProcessDown (p) (SrException shown) >>
                             (try (logI node (prPid p) "SYS" LoCritical (concat ["Process got unhandled exception ",shown]))::IO(Either SomeException ())) >> return ()
    exceptionCatcher p body =
      do notifyProcessUp (p)
         res <- try (body `catch` (\ProcessTerminationException -> return ()))
         case res of
           -- Close the pool on abnormal exit as well, so handles are not leaked.
           Left e -> exceptionHandler (e::SomeException) p >> closePool p >> return ()
           Right a -> notifyProcessDown (p) SrNormal >> closePool p >> return a
    runner okay node passProcess = do
      p <- takeMVar passProcess
      let init = do death <- newEmptyMVar
                    death2 <- newTVarIO False
                    let pte = (mkProcessTableEntry (prChannel p) (prThread p) death death2 )
                    insertProcessTableEntry node (prSelf p) pte
                    return pte
      let action = runProcessM fun p
      -- The release step marks the process dead whatever happens in the body.
      bracket (init)
              (\pte -> atomically (writeTVar (pteDeathT pte) True) >> putMVar (pteDeath pte) ())
              (\_ -> putMVar okay () >>
                     (exceptionCatcher p (action>>=return . snd)) >> return ())
    insertProcessTableEntry node processid entry =
      modifyMVar_ node (\n ->
        return $ n {ndProcessTable = Map.insert processid entry (ndProcessTable n)} )
    mkProcessTableEntry channel thread death death2 =
      ProcessTableEntry
        {
          pteChannel = channel,
          pteThread = thread,
          pteDeath = death,
          pteDeathT = death2,
          pteDaemonic = False
        }
    mkProcess channel noderef localprocessid thread pid state pool =
      Process
        {
          prPid = pid,
          prSelf = localprocessid,
          prChannel = channel,
          prNodeRef = noderef,
          prThread = thread,
          prState = state,
          prPool = pool,
          prLogConfig = Nothing
        }
-- | Send the same query to several processes and collect one result per
-- destination: a decoded reply, or the status of a failed send.
--
-- Each destination gets its own conversation id. The call blocks until
-- every conversation whose send succeeded has been answered. Results come
-- back in ascending conversation-id order (the order of 'Map.elems'), which
-- need not be the order of @pids@.
roundtripQueryMulti :: (Serializable a,Serializable b) => PayloadDisposition -> [ProcessId] -> a -> ProcessM [Either TransmitStatus b]
roundtripQueryMulti pld pids dat =
let
convidsM = mapM (\_ -> liftIO $ newConversationId) pids
in do convids <- convidsM
sender <- getSelfPid
let
convopids = zip convids pids
-- A successful send leaves a Nothing placeholder to be filled by the reply.
sending (convid,pid) =
let hdr = RoundtripHeader {msgheaderConversationId = convid,
msgheaderSender = sender,
msgheaderDestination = pid}
in do res <- sendTry pid dat (Just hdr) pld
case res of
QteOK -> return (convid,Nothing)
n -> return (convid,Just (Left n))
res <- mapM sending convopids
-- Keep receiving while any placeholder is still unfilled.
let receiving c = if (any isNothing (Map.elems c))
then do newmap <- receiveWait [matcher c]
receiving newmap
else return c
-- Only replies whose (conversation id, sender) pair was issued above are accepted.
matcher c = matchCore (\(_,h) -> case h of
Just a -> (msgheaderConversationId a,msgheaderSender a) `elem` convopids
Nothing -> False)
(\(b,Just h) ->
let newmap = Map.adjust (\_ -> (Just (Right b))) (msgheaderConversationId h) c
in return (newmap) )
m <- receiving (Map.fromList res)
return $ catMaybes (Map.elems m)
-- | The process with local id 1 on the same node as the given process.
generalPid :: ProcessId -> ProcessId
generalPid pid = ProcessId (nodeFromPid pid) 1
-- | Send a query to a process and wait, without a time limit, for its reply.
-- The exchange runs inside 'withMonitor'; for 'PldAdmin' queries the
-- monitored process is 'generalPid' of the target rather than the target
-- itself. A 'ServiceException' raised during the exchange is returned as
-- @Left (QteOther msg)@.
roundtripQuery :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> ProcessM (Either TransmitStatus b)
roundtripQuery pld pid dat =
do res <- ptry $ withMonitor apid $ roundtripQueryImpl 0 pld pid dat Prelude.id []
case res of
Left (ServiceException s) -> return $ Left $ QteOther s
Right (Left a) -> return (Left a)
Right (Right a) -> return (Right a)
where apid = case pld of
PldAdmin -> generalPid pid
_ -> pid
-- | Like 'roundtripQuery' but without installing a monitor and without
-- converting 'ServiceException'; waits without a time limit.
roundtripQueryLocal :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> ProcessM (Either TransmitStatus b)
roundtripQueryLocal pld pid dat = roundtripQueryImpl 0 pld pid dat Prelude.id []
-- | Round-trip query without a monitor, limited by the configured
-- 'cfgRoundtripTimeout'.
roundtripQueryUnsafe :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> ProcessM (Either TransmitStatus b)
roundtripQueryUnsafe pld pid dat =
  getConfig >>= \cfg ->
    roundtripQueryImpl (cfgRoundtripTimeout cfg) pld pid dat Prelude.id []
-- | Shared implementation of the round-trip queries.
--
-- Sends the query and then waits for the matching reply, for a
-- process-down notification about the target or its 'generalPid' (both give
-- @Left (QteNetworkError ...)@), or for one of the @additional@ matchers.
-- A @time@ of 0 waits indefinitely; any other value is passed to
-- 'receiveTimeout', and expiry gives @Left QteConnectionTimeout@.
-- The reply is passed through @converter@ before being returned.
roundtripQueryImpl :: (Serializable a, Serializable b) => Int -> PayloadDisposition -> ProcessId -> a -> (b -> c) -> [MatchM (Either TransmitStatus c) ()] -> ProcessM (Either TransmitStatus c)
roundtripQueryImpl time pld pid dat converter additional =
do mmatcher <- roundtripQueryImplSub pld pid dat (\_ y -> (return . Right . converter) y)
case mmatcher of
-- The send itself failed; nothing to wait for.
Left err -> return $ Left err
Right matcher ->
receiver $ [ matcher (),
matchProcessDown pid ((return . Left . QteNetworkError) "Remote partner unavailable"),
matchProcessDown (generalPid pid) ((return . Left . QteNetworkError) "Remote partner unavailable")]
++ additional
where
receiver matchers =
case time of
0 -> do receiveWait matchers
n -> do res <- receiveTimeout n matchers
case res of
Nothing -> return (Left QteConnectionTimeout)
Just a -> return a
-- | Send the query half of a round trip and build the matcher for its reply.
--
-- The message goes out with a fresh conversation id in a 'RoundtripHeader'.
-- On a successful send the result is a function that, given a caller-chosen
-- value, yields a matcher accepting only the reply with that conversation
-- id coming from @pid@; the matcher's handler is @act@ applied to the
-- caller's value and the decoded reply. On a failed send the status is
-- returned.
roundtripQueryImplSub :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> (c -> b -> ProcessM q) -> ProcessM (Either TransmitStatus (c -> MatchM q ()))
roundtripQueryImplSub pld pid dat act =
do convId <- liftIO $ newConversationId
sender <- getSelfPid
res <- mysend pid dat (Just RoundtripHeader {msgheaderConversationId = convId,msgheaderSender = sender,msgheaderDestination = pid})
case res of
QteOK -> return $ Right $ \c -> (matchCore (\(_,h) ->
case h of
Just a -> msgheaderConversationId a == convId && msgheaderSender a == pid
Nothing -> False) (\(x,_) -> do vv <- act c x
return $ vv))
err -> return (Left err)
where
mysend p d mh = sendTry p d mh pld
-- | Matcher for the answering side of a round trip. The handler receives the
-- decoded query and returns the reply to send back together with the value
-- the matcher itself yields. A failure to send the reply is logged, not
-- thrown (the 'False' passed to 'roundtripResponseAsync').
roundtripResponse :: (Serializable a, Serializable b) => (a -> ProcessM (b,q)) -> MatchM q ()
roundtripResponse f = roundtripResponseAsync myf False
where myf inp verf = do (resp,ret) <- f inp
_ <- verf resp
return ret
-- | Matcher for the answering side of a round trip, in which the handler is
-- given the query and a function for sending the reply, and decides itself
-- when to call that function.
--
-- Any message carrying a 'RoundtripHeader' whose body decodes at the
-- handler's argument type is accepted. The reply is sent as 'PldUser' to
-- the header's sender, with sender and destination swapped and the same
-- conversation id. If the send fails, a 'TransmitException' is thrown when
-- @throwing@ is 'True'; otherwise the failure is logged.
roundtripResponseAsync :: (Serializable a, Serializable b) => (a -> (b -> ProcessM ()) -> ProcessM q) -> Bool -> MatchM q ()
roundtripResponseAsync f throwing =
matchCore (\(_,h)->conditional h) transformer
where conditional :: Maybe RoundtripHeader -> Bool
conditional a = case a of
Just _ -> True
Nothing -> False
-- Only reached with a Just header, because of 'conditional'.
transformer (m,Just h) =
let sender b =
do res <- sendTry (msgheaderSender h) b (Just RoundtripHeader {msgheaderSender=msgheaderDestination h,msgheaderDestination = msgheaderSender h,msgheaderConversationId=msgheaderConversationId h}) PldUser
case res of
QteOK -> return ()
_ -> if throwing
then throw $ TransmitException res
else logS "SYS" LoImportant ("roundtripResponse couldn't send response to "++show (msgheaderSender h)++" because "++show res)
in f m sender
-- | Header attached to both halves of a round-trip exchange: the
-- conversation id ties a reply to its query, and the sender and destination
-- are swapped in the reply.
data RoundtripHeader = RoundtripHeader
{
msgheaderConversationId :: Int,
msgheaderSender :: ProcessId,
msgheaderDestination :: ProcessId
} deriving (Typeable)
-- | Wire format: conversation id, then sender, then destination.
instance Binary RoundtripHeader where
  put (RoundtripHeader convId sender dest) = do
    put convId
    put sender
    put dest
  get = do
    convId <- get
    sender <- get
    dest <- get
    return $ RoundtripHeader convId sender dest
-- | Send a user message to a process. Throws 'TransmitException' with the
-- resulting status when the send does not end in 'QteOK'.
send :: (Serializable a) => ProcessId -> a -> ProcessM ()
send pid msg = do
  status <- sendSimple pid msg PldUser
  case status of
    QteOK -> return ()
    _ -> throw $ TransmitException status
-- | Like 'send', but returns the resulting status instead of throwing.
sendQuiet :: (Serializable a) => ProcessId -> a -> ProcessM TransmitStatus
sendQuiet p m = sendSimple p m PldUser
-- | Send a header-less message with the given disposition and return the
-- resulting status.
sendSimple :: (Serializable a) => ProcessId -> a -> PayloadDisposition -> ProcessM TransmitStatus
sendSimple pid dat pld = sendTry pid dat (Nothing :: Maybe ()) pld
-- | Send a message with an optional header and return the resulting status.
--
-- The actual send is 'sendBasic' with the process's connection pool. A
-- 'QteThrottle' status makes the send pause for the given delay and then
-- repeat. An 'ErrorCall' raised during the send is returned as
-- 'QteEncodingError' with the error text.
sendTry :: (Serializable a,Serializable b) => ProcessId -> a -> Maybe b -> PayloadDisposition -> ProcessM TransmitStatus
sendTry pid msg msghdr pld = getProcess >>= (\_p ->
let
timeoutFilter a =
do cfg <- getConfig
-- NOTE(review): the scrutinee is the literal 0, so the timeout branch below
-- can never run and cfg is unused. This looks like a deliberately disabled
-- timeout -- confirm.
q <- case 0 of
0 -> a
n -> do ff <- ptimeout n $ a
case ff of
Nothing -> return QteConnectionTimeout
Just r -> return r
case q of
QteThrottle n -> liftIO (threadDelay n) >> timeoutFilter a
n -> return n
tryAction = ptry action >>=
(\x -> case x of
Left l -> return $ QteEncodingError $ show (l::ErrorCall)
Right r -> return r)
where action =
do p <- getProcess
timeoutFilter $ liftIO $ sendBasic (prNodeRef p) pid msg msghdr pld (Just $ prPool p)
in tryAction )
-- | Encode a message and deliver it: through 'sendRawLocal' when the
-- destination is on this node, through 'sendRawRemote' otherwise. The
-- message is always built in its encoded (non-dynamic) form.
sendBasic :: (Serializable a,Serializable b) => MVar Node -> ProcessId -> a -> Maybe b -> PayloadDisposition -> Maybe (IORef (Map.Map NodeId Handle)) -> IO TransmitStatus
sendBasic mnode pid msg msghdr pld pool = do
  here <- getNodeId mnode
  let outgoing = makeMessage False pld msg msghdr
  if nodeFromPid pid == here
    then sendRawLocal mnode pid here outgoing pool
    else sendRawRemote mnode pid here outgoing pool
-- | Deliver a message to a process on this node by calling
-- 'messageHandler' directly with the node's own network magic. Sending to
-- 'nullPid' yields 'QteUnknownPid'. The node id and pool arguments are unused.
sendRawLocal :: MVar Node -> ProcessId -> NodeId -> Message -> Maybe (IORef (Map.Map NodeId Handle)) -> IO TransmitStatus
sendRawLocal noderef thepid _nodeid msg _ =
  if thepid == nullPid
    then return QteUnknownPid
    else do
      cfg <- getConfigI noderef
      messageHandler cfg noderef (getMessageDisposition msg) msg (cfgNetworkMagic cfg) (localFromPid thepid)
-- | Deliver a message to a process on another node.
--
-- With a connection pool, a handle already open to the destination node is
-- reused. If the send over a reused handle does not end in 'QteOK', it is
-- retried once over a fresh connection, and the status of that retry is
-- what the caller gets. Previously the first attempt's failure was returned
-- even when the retry had delivered the message. The pool is trimmed
-- (every handle closed) once it holds more than 10 entries.
--
-- Without a pool, a fresh connection is used for this one send.
sendRawRemote :: MVar Node -> ProcessId -> NodeId -> Message -> Maybe (IORef (Map.Map NodeId Handle)) -> IO TransmitStatus
sendRawRemote noderef thepid nodeid msg (Just pool) =
  do apool <- readIORef pool
     ppool <- if Map.size apool > 10
                then mapM hClose (Map.elems apool) >> return Map.empty
                else return apool
     let finded = Map.lookup thenode ppool
     (ret,h) <- sendRawRemoteImpl noderef thepid nodeid msg finded
     case (ret,finded) of
       (QteOK,_) -> cleanup h ppool >> return ret
       -- The failure happened on a fresh connection: nothing to retry.
       (_,Nothing) -> cleanup h ppool >> return ret
       -- The failure happened on a pooled handle, which may simply be stale:
       -- retry on a new connection and report that attempt's outcome.
       (_,Just _) -> do (ret2,newh) <- sendRawRemoteImpl noderef thepid nodeid msg Nothing
                        cleanup newh ppool
                        return ret2
  where
    thenode = nodeFromPid thepid
    -- Record the handle that is now current for this node, or forget the
    -- node when no usable handle remains.
    cleanup h ppool =
      case h of
        Just hh -> writeIORef pool (Map.insert thenode hh ppool)
        Nothing -> writeIORef pool (Map.delete thenode ppool)
sendRawRemote noderef thepid nodeid msg Nothing =
  liftM fst $ sendRawRemoteImpl noderef thepid nodeid msg Nothing
-- | One attempt at sending a message to a remote process, over the given
-- handle or, when none is given, over a newly opened connection to the
-- destination's host and port.
--
-- Returns the status together with the handle to keep for reuse. An
-- 'IOError' during the attempt is turned into a status ('QteNetworkError',
-- 'QteBadFormat' or 'QteOther') and no handle is returned. Sending to
-- 'nullPid' yields 'QteUnknownPid'. The attempt runs under the node's
-- outgoing semaphore via 'withSem'.
sendRawRemoteImpl :: MVar Node -> ProcessId -> NodeId -> Message -> Maybe Handle -> IO (TransmitStatus,Maybe Handle)
sendRawRemoteImpl noderef thepid@(ProcessId (NodeId hostname portid) localpid) nodeid msg bigh
| thepid == nullPid = return (QteUnknownPid,Nothing)
| otherwise =
do node <- readMVar noderef
res <- withSem (ndOutgoing node) (try setup)
case res of
Right n -> return n
Left l | isEOFError l -> return (QteNetworkError (show l),Nothing)
| isUserError l -> return (QteBadFormat,Nothing)
| isDoesNotExistError l -> return (QteNetworkError (show l),Nothing)
| otherwise -> return (QteOther $ show l,Nothing)
-- The bracket's release step is a no-op: the handle is deliberately left
-- open so the caller can pool it.
where setup = bracket
(acquireConnection)
(\_ -> return ())
(sender)
acquireConnection =
case bigh of
Nothing ->
do h <- connectTo hostname (PortNumber $ toEnum portid)
hSetBuffering h (BlockBuffering Nothing)
return h
Just h -> return h
sender h = do cfg <- getConfigI noderef
ret <- writeMessage h (cfgNetworkMagic cfg,localpid,nodeid,msg)
return (ret,Just h)
-- | Client side of the wire protocol for one encoded message.
--
-- Writes a NUL-terminated line @Rmt!! magic dest disposition format nodeid@
-- (format 2 = header present, 0 = no header) and reads a status. If the
-- status is 'QtePleaseSendBody', writes the optional header and the body
-- and returns the second status read. A first status of 'QteOK' is
-- reported as 'QteBadFormat'; any other first status is returned as is.
-- A 'DynamicMessage' cannot be written and raises 'ServiceException'.
writeMessage :: Handle -> (String,LocalProcessId,NodeId,Message)-> IO TransmitStatus
writeMessage h (magic,dest,nodeid,(EncodedMessage msgDisp msgHdr msgMsg)) =
do hPutStrZ h $ unwords ["Rmt!!",magic,show dest,show msgDisp,show fmt,show nodeid]
hFlush h
response <- hGetLineZ h
resp <- readIO response :: IO TransmitStatus
case resp of
QtePleaseSendBody -> do maybe (return ()) (hPutPayload h) (msgHdr)
hPutPayload h msgMsg
hFlush h
response2 <- hGetLineZ h
resp2 <- readIO response2 :: IO TransmitStatus
return resp2
QteOK -> return QteBadFormat
n -> return n
where fmt = case msgHdr of
Nothing -> (0::Int)
Just _ -> 2
writeMessage _ _ = throw $ ServiceException "writeMessage went down wrong pipe"
-- | Start the listener thread and wait until it reports the outcome of its
-- startup. If startup failed, the reported error is re-thrown in the caller.
forkAndListenAndDeliver :: MVar Node -> Config -> IO ()
forkAndListenAndDeliver node cfg = do
  coord <- newEmptyMVar
  _ <- forkIO (listenAndDeliver node cfg coord)
  startup <- takeMVar coord
  case startup of
    Nothing -> return ()
    Just err -> throw err
-- | Write a status to the handle as a NUL-terminated string and flush.
writeResult :: Handle -> TransmitStatus -> IO ()
writeResult h er = do
  hPutStrZ h (show er)
  hFlush h
-- | Server side of the wire protocol for one message.
--
-- Reads the NUL-terminated line @Rmt!! magic dest disposition format
-- nodeid@, replies with 'QtePleaseSendBody', then reads the header payload
-- (only when format is 2) and the body payload. Returns the magic, the
-- destination local process id, the sender's node id and the message.
-- A line of any other shape raises a user 'IOError'.
readMessage :: Handle -> IO (String,LocalProcessId,NodeId,Message)
readMessage h =
do line <- hGetLineZ h
case words line of
["Rmt!!",magic,destp,disp,format,nodeid] ->
do adestp <- readIO destp :: IO LocalProcessId
adisp <- readIO disp :: IO PayloadDisposition
aformat <- readIO format :: IO Int
anodeid <- readIO nodeid :: IO NodeId
writeResult h QtePleaseSendBody
hFlush h
header <- case aformat of
2 -> do hdr <- hGetPayload h
return $ Just hdr
_ -> return Nothing
body <- hGetPayload h
return (magic,adestp,anodeid,EncodedMessage { msgEHeader = header,
msgEDisposition = adisp,
msgEPayload = body
})
_ -> throw $ userError "Bad message format"
-- | Look up a local process in the node's process table. An entry whose
-- death flag is set is reported as absent.
getProcessTableEntry :: Node -> LocalProcessId -> STM (Maybe ProcessTableEntry)
getProcessTableEntry tbl adestp =
  case Map.lookup adestp (ndProcessTable tbl) of
    Nothing -> return Nothing
    Just entry -> do
      dead <- readTVar (pteDeathT entry)
      return (if dead then Nothing else Just entry)
-- | Put a message on the channel of the given local process. Returns the
-- process's table entry, or 'Nothing' when there is no live process with
-- that id.
deliver :: LocalProcessId -> MVar Node -> Message -> IO (Maybe ProcessTableEntry)
deliver adestp tbl msg = do
  node <- readMVar tbl
  atomically $ do
    target <- getProcessTableEntry node adestp
    case target of
      Nothing -> return Nothing
      Just entry -> writeTChan (pteChannel entry) msg >> return (Just entry)
-- | Body of the listener thread: open the listening socket, record the
-- actual port in the node, report startup through @coord@ ('Nothing' on
-- success, @Just err@ if an 'IOError' ended the bracket), then accept
-- connections forever and serve each one in its own thread.
listenAndDeliver :: MVar Node -> Config -> MVar (Maybe IOError) -> IO ()
listenAndDeliver node cfg coord =
do res <- try $ bracket (setupSocket) (sClose) (\s -> finishStartup >> sockBody s)
case res of
Left e -> putMVar coord (Just e)
Right _ -> return ()
where
finishStartup = putMVar coord Nothing
setupSocket =
do sock <- listenOn whichPort
setSocketOption sock KeepAlive 1
realPort <- socketPort sock
modifyMVar_ node (\a -> return $ a {ndListenPort=fromEnum realPort})
return sock
-- A configured port of 0 means "any port".
whichPort = if cfgListenPort cfg /= 0
then PortNumber $ toEnum $ cfgListenPort cfg
else PortNumber aNY_PORT
-- IOErrors while serving a connection end that connection silently.
handleCommSafe h =
(try $ handleComm h :: IO (Either IOError ())) >> return ()
-- Serve messages on one connection until one of them is not delivered.
handleComm h =
do (magic,adestp,_nodeid,msg) <- readMessage h
res <- messageHandler cfg node (getMessageDisposition msg) msg magic adestp
writeResult h res
case res of
QteOK -> handleComm h
_ -> return ()
-- Accepted handles are passed through a channel to a dispatcher thread,
-- which forks one serving thread per connection and closes the handle when
-- that thread ends.
sockBody s =
do hchan <- newChan
_tid <- forkIO $ forever $ do h <- readChan hchan
hSetBuffering h (BlockBuffering Nothing)
forkIO $ (handleCommSafe h `finally` hClose h)
forever $ do (newh,_,_) <- accept s
writeChan hchan newh
-- | Deliver an incoming message to a local process, chosen by disposition.
--
-- The network magic is checked first; a mismatch gives
-- 'QteBadNetworkMagic'. 'PldUser' messages go straight to the addressed
-- local process id. For 'PldAdmin' messages the address is first resolved
-- through 'adminLookupN', whose error status is returned on failure.
-- Successful delivery gives 'QteOK'; a missing or dead target gives
-- 'QteUnknownPid'.
messageHandler :: Config -> MVar Node -> PayloadDisposition -> Message -> String -> LocalProcessId -> IO TransmitStatus
messageHandler cfg node pld = case pld of
PldAdmin -> dispositionAdminHandler
PldUser -> dispositionUserHandler
where
-- Accepted: the node's own magic, the local-registry magic, or any magic at
-- all when this node runs in the local-registry role.
validMagic magic = magic == cfgNetworkMagic cfg
|| magic == localRegistryMagicMagic
|| cfgRole cfg == localRegistryMagicRole
dispositionAdminHandler msg magic adestp
| validMagic magic =
do
realPid <- adminLookupN (toEnum adestp) node
case realPid of
Left er -> return er
Right p -> do res <- deliver p node msg
case res of
Just _ -> return QteOK
Nothing -> return QteUnknownPid
| otherwise = return QteBadNetworkMagic
dispositionUserHandler msg magic adestp
| validMagic magic =
do
res <- deliver adestp node msg
case res of
Nothing -> return QteUnknownPid
Just _ -> return QteOK
| otherwise = return QteBadNetworkMagic
-- | Block until no non-daemonic process remains in the node's process table.
--
-- Takes the node, finds the first non-daemonic entry, releases the node,
-- waits for that process's death signal, removes it from the table and
-- starts over. Returns when only daemonic entries (or none) are left.
waitForThreads :: MVar Node -> IO ()
waitForThreads mnode = do node <- takeMVar mnode
waitFor (Map.toList (ndProcessTable node)) node
where waitFor lst node= case lst of
[] -> putMVar mnode node
(pn,pte):rest -> if (pteDaemonic pte)
then waitFor rest node
-- The node is put back before blocking so other threads can use it meanwhile.
else do putMVar mnode node
takeMVar (pteDeath pte)
modifyMVar_ mnode (\x -> return $ x {ndProcessTable=Map.delete pn (ndProcessTable x)})
waitForThreads mnode
-- | Pad a string on the right with spaces up to the given width. Strings
-- already at least that wide are returned unchanged.
--
-- The pad count is @i - length s@; the original text read @ilength s@,
-- which refers to no defined name and left the width argument unused.
paddedString :: Int -> String -> String
paddedString i s = s ++ replicate (i - length s) ' '
-- | A fresh conversation id, taken from the hash of a new 'Data.Unique' value.
newConversationId :: IO Int
newConversationId = liftM hashUnique newUnique
-- | Pair each element with the list of all the other elements, keeping the
-- original order in both the outer list and each inner list.
exclusionList :: [a] -> [(a,[a])]
exclusionList = go []
  where
    -- seen holds the elements already visited, in their original order.
    go _ [] = []
    go seen (x:xs) = (x, seen ++ xs) : go (seen ++ [x]) xs
-- | Difference between two times (first minus second). The value obtained
-- from 'fromEnum' on the 'diffUTCTime' result is divided by one million.
diffTime :: UTCTime -> UTCTime -> Int
diffTime time2 time1 = elapsed `div` 1000000
  where
    elapsed = fromEnum (diffUTCTime time2 time1)
-- | Read characters from the handle up to, and not including, the next NUL.
hGetLineZ :: Handle -> IO String
hGetLineZ h = go []
  where
    -- Characters are accumulated in reverse and flipped once at the end.
    go acc = do
      c <- hGetChar h
      if c == '\0'
        then return (reverse acc)
        else go (c : acc)
-- | Write the string to the handle one character at a time, followed by a NUL.
hPutStrZ :: Handle -> String -> IO ()
hPutStrZ h str = mapM_ (hPutChar h) str >> hPutChar h '\0'
-- | Recompute the current process's 'prPid' from the node's present host
-- name and listen port and store it in the process record.
buildPid :: ProcessM ()
buildPid = do
  self <- getProcess
  node <- liftIO (readMVar (prNodeRef self))
  let here = NodeId (ndHostName node) (ndListenPort node)
  putProcess self {prPid = ProcessId here (prSelf self)}
-- | A placeholder 'ProcessId': local id 0 on the node @0.0.0.0@, port 0.
nullPid :: ProcessId
nullPid = ProcessId nowhere 0
  where
    nowhere = NodeId "0.0.0.0" 0
-- | The 'NodeId' of the node the current process runs on, taken from the
-- process's own 'ProcessId'.
getSelfNode :: ProcessM NodeId
getSelfNode = getSelfPid >>= \(ProcessId n _p) -> return n
-- | Read the node state and build its 'NodeId' from the host name and
-- listen port stored there.
getNodeId :: MVar Node -> IO NodeId
getNodeId mnode = fmap toNodeId (readMVar mnode)
  where
    toNodeId node = NodeId (ndHostName node) (ndListenPort node)
-- | The 'ProcessId' of the current process.
--
-- If the cached pid still carries port 0, it is rebuilt from the node
-- state with 'buildPid' before being returned.
getSelfPid :: ProcessM ProcessId
getSelfPid = getProcess >>= \p -> resolve (prPid p)
  where
    resolve (ProcessId (NodeId _ 0) _) = buildPid >> getProcess >>= return . prPid
    resolve known = return known
-- | Extract the node component of a 'ProcessId'.
nodeFromPid :: ProcessId -> NodeId
nodeFromPid pid = case pid of
  ProcessId nid _ -> nid
-- | Build a 'NodeId' from a host name and a port.
makeNodeFromHost :: String -> PortId -> NodeId
makeNodeFromHost host port = NodeId host port
-- | Extract the node-local process id from a 'ProcessId'.
localFromPid :: ProcessId -> LocalProcessId
localFromPid pid = case pid of
  ProcessId _ lid -> lid
-- | Extract the host name from a 'NodeId'.
hostFromNid :: NodeId -> HostName
hostFromNid nid = case nid of
  NodeId hn _p -> hn
-- | Combine a 'NodeId' and a node-local process id into a 'ProcessId'.
buildPidFromNodeId :: NodeId -> LocalProcessId -> ProcessId
buildPidFromNodeId = ProcessId
-- | Does the given process live on the same node as the caller?
-- Compares the node component of the caller's pid with that of @pid@.
isPidLocal :: ProcessId -> ProcessM Bool
isPidLocal pid =
  getSelfPid >>= \mine -> return (nodeFromPid mine == nodeFromPid pid)
-- | Run an action, converting a thrown 'TransmitException' into 'Nothing'.
-- A normal result is wrapped in 'Just'; other exceptions propagate.
suppressTransmitException :: ProcessM a -> ProcessM (Maybe a)
suppressTransmitException a = ptry a >>= settle
  where
    settle (Left (TransmitException _)) = return Nothing
    settle (Right r) = return $ Just r
-- | 'try' lifted to 'ProcessM'.
--
-- The action is run against the current process state. On success the
-- state produced by the action is adopted; when an exception of type @e@
-- is caught, the state from before the action is kept.
ptry :: (Exception e) => ProcessM a -> ProcessM (Either e a)
ptry f = do
  p <- getProcess
  outcome <- liftIO $ try (runProcessM f p)
  either
    (return . Left)
    (\(newp, newanswer) -> ProcessM (\_ -> return (newp, Right newanswer)))
    outcome
-- | 'System.Timeout.timeout' lifted to 'ProcessM'. The limit @t@ is passed
-- straight to 'System.Timeout.timeout'.
--
-- Returns 'Nothing' if the action did not finish in time, in which case
-- the process state from before the action is kept; otherwise the state
-- produced by the action is adopted and its result is wrapped in 'Just'.
ptimeout :: Int -> ProcessM a -> ProcessM (Maybe a)
ptimeout t f = do
  p <- getProcess
  outcome <- liftIO $ System.Timeout.timeout t (runProcessM f p)
  maybe
    (return Nothing)
    (\(newp, newanswer) -> ProcessM (\_ -> return (newp, Just newanswer)))
    outcome
-- | 'bracket' lifted to 'ProcessM': run @before@ to acquire a resource,
-- run @fun@ on it, and always run @after@ on it afterwards.
--
-- Both @after@ and @fun@ start from the process state left by @before@.
-- The state adopted afterwards is the one produced by @fun@; the state
-- produced by @after@ is discarded.
pbracket :: (ProcessM a) -> (a -> ProcessM b) -> (a -> ProcessM c) -> ProcessM c
pbracket before after fun = do
  p <- getProcess
  let acquire = runProcessM before p
      release (st, resource) = runProcessM (after resource) st
      use (st, resource) = runProcessM (fun resource) st
  (finalState, result) <- liftIO $ bracket acquire release use
  ProcessM (\_ -> return (finalState, result))
-- | Run @fun@, then run @after@ whether or not @fun@ threw an exception.
-- Implemented with 'pbracket' using a trivial acquire step.
pfinally :: ProcessM a -> ProcessM b -> ProcessM a
pfinally fun after = pbracket (return ()) (const after) (\_ -> fun)
-- | The default configuration, used as the starting point by 'readConfig'
-- before a config file and command-line options are applied.
--
-- NOTE(review): the timeout and delay values are presumably microseconds
-- -- confirm against their consumers.
emptyConfig :: Config
emptyConfig = Config
  { cfgRole = "NODE"
  , cfgHostName = ""
  , cfgListenPort = 0
  , cfgPeerDiscoveryPort = 38813
  , cfgLocalRegistryListenPort = 38813
  , cfgNetworkMagic = "MAGIC"
  , cfgKnownHosts = []
  , cfgRoundtripTimeout = 10000000
  , cfgMaxOutgoing = 50
  , cfgPromiseFlushDelay = 5000000
  , cfgPromisePrefix = "rpromise-"
  , cfgArgs = []
  }
-- | Build a 'Config' starting from 'emptyConfig'.
--
-- If a file path is given, its lines are applied first. If @useargs@ is
-- set, command-line arguments of the form @-cfgName=value@ are applied
-- next, and all remaining arguments are stored in 'cfgArgs' in their
-- original order.
--
-- Any exception raised in either stage is rethrown as a 'ConfigException'
-- whose message is prefixed with the stage description.
readConfig :: Bool -> Maybe FilePath -> IO Config
readConfig useargs fp = do
  fromFile <- safety (processConfigFile emptyConfig) ("while reading config file ")
  safety (processArgs fromFile) "while parsing command line "
  where
    processConfigFile from = maybe (return from) (readConfigFile from) fp

    processArgs from
      | useargs = readConfigArgs from
      | otherwise = return from

    -- Run a stage, converting any exception into a ConfigException.
    safety o s = do
      res <- try o :: IO (Either SomeException Config)
      case res of
        Left e -> throw $ ConfigException $ s ++ show e
        Right cfg -> return cfg

    readConfigFile from afile =
      readFile afile >>= \contents ->
        evaluate $ processConfig (lines contents) from

    readConfigArgs from = do
      args <- getArgs
      let (options, positional) = collectArgs args
      c <- evaluate $ processConfig options from
      return $ c {cfgArgs = reverse positional}

    -- Both result lists are built by prepending, so each ends up in
    -- reverse command-line order.
    collectArgs the = foldl findArg ([], []) the

    -- "-cfgFoo=bar" becomes the config line "cfgFoo bar".
    findArg (opts, others) ('-' : this)
      | isPrefixOf "cfg" this = (map eqToSpace this : opts, others)
    findArg (opts, others) a = (opts, a : others)

    eqToSpace x = if x == '=' then ' ' else x
-- | Apply a list of configuration lines to a 'Config', left to right.
--
-- Each line is split into words: the first is the option name and the
-- rest is its value. Blank lines and lines whose first word starts with
-- @#@ are ignored. A line with an option name but no value, an unknown
-- option name, or a non-numeric value for a numeric option raises an
-- 'error'.
processConfig :: [String] -> Config -> Config
processConfig rawLines from = foldl processLine from rawLines
  where
    processLine cfg line = interpret (words line)
      where
        interpret (('#' : _) : _) = cfg
        interpret [] = cfg
        interpret [option, val] = updateCfg cfg option val
        interpret (option : rest)
          | (not . null) rest = updateCfg cfg option (unwords rest)
        interpret _ = error $ "Bad configuration syntax: " ++ line

    updateCfg cfg opt val = case opt of
      "cfgRole" -> cfg {cfgRole = clean val}
      "cfgHostName" -> cfg {cfgHostName = clean val}
      "cfgListenPort" -> cfg {cfgListenPort = number val}
      "cfgPeerDiscoveryPort" -> cfg {cfgPeerDiscoveryPort = number val}
      "cfgRoundtripTimeout" -> cfg {cfgRoundtripTimeout = number val}
      "cfgPromiseFlushDelay" -> cfg {cfgPromiseFlushDelay = number val}
      "cfgMaxOutgoing" -> cfg {cfgMaxOutgoing = number val}
      "cfgPromisePrefix" -> cfg {cfgPromisePrefix = val}
      "cfgLocalRegistryListenPort" -> cfg {cfgLocalRegistryListenPort = number val}
      "cfgKnownHosts" -> cfg {cfgKnownHosts = words val}
      "cfgNetworkMagic" -> cfg {cfgNetworkMagic = clean val}
      _ -> error ("Unknown configuration option: " ++ opt)

    -- Parse a value that must consist solely of decimal digits.
    number s = read (isInt s)

    isInt s
      | all isDigit s = s
      | otherwise = error ("Not a good number: " ++ s)

    clean str = filter (not . isSpace) str
-- | Severity of a log message. The constructors are declared in order of
-- decreasing importance: 'logApplyFilter' lets a message through when its
-- level is less than or equal to the configured level.
data LogLevel
  = LoSay
  | LoFatal
  | LoCritical
  | LoImportant
  | LoStandard
  | LoInformation
  | LoTrivial
  deriving (Eq, Ord, Enum, Show)

-- | Serialized as the constructor's 'fromEnum' index.
--
-- Decoding validates the index and fails the parser when it is out of
-- range, instead of returning a value that crashes later when 'toEnum'
-- is finally forced.
instance Binary LogLevel where
  put = put . fromEnum
  get = do
    n <- get
    if n >= fromEnum LoSay && n <= fromEnum LoTrivial
      then return (toEnum n)
      else fail ("Binary LogLevel: value out of range: " ++ show n)
-- | Name of the subsystem ("sphere") a log message belongs to. A
-- 'LogFilter' selects or excludes messages by this name.
type LogSphere = String
-- | Where the logging service should deliver a message.
data LogTarget
  = LtStdout          -- ^ Print to standard output.
  | LtForward NodeId  -- ^ Send on to the logging service of another node.
  | LtFile FilePath   -- ^ Append to the given file.
  | LtForwarded       -- ^ Marks a message that was forwarded from another
                      --   node; the receiver uses its own configured target.
  deriving (Typeable)

-- | Encoded as a one-byte constructor tag followed by the constructor's
-- argument, if any. An unknown tag fails the parser rather than raising a
-- pattern-match exception.
instance Binary LogTarget where
  put LtStdout = putWord8 0
  put (LtForward nid) = putWord8 1 >> put nid
  put (LtFile fp) = putWord8 2 >> put fp
  put LtForwarded = putWord8 3
  get = do
    tag <- getWord8
    case tag of
      0 -> return LtStdout
      1 -> get >>= return . LtForward
      2 -> get >>= return . LtFile
      3 -> return LtForwarded
      _ -> fail ("Binary LogTarget: unknown tag " ++ show tag)
-- | Which log spheres are let through by 'logApplyFilter'.
data LogFilter
  = LfAll                    -- ^ Accept every sphere.
  | LfOnly [LogSphere]       -- ^ Accept only the listed spheres.
  | LfExclude [LogSphere]    -- ^ Accept every sphere except the listed ones.
  deriving (Typeable)

-- | Encoded as a one-byte constructor tag followed by the sphere list, if
-- any. An unknown tag fails the parser rather than raising a
-- pattern-match exception.
instance Binary LogFilter where
  put LfAll = putWord8 0
  put (LfOnly ls) = putWord8 1 >> put ls
  put (LfExclude le) = putWord8 2 >> put le
  get = do
    tag <- getWord8
    case tag of
      0 -> return LfAll
      1 -> get >>= return . LfOnly
      2 -> get >>= return . LfExclude
      _ -> fail ("Binary LogFilter: unknown tag " ++ show tag)
-- | Logging settings for a process or a node.
data LogConfig = LogConfig
  { logLevel :: LogLevel    -- ^ Least important level that is still logged.
  , logTarget :: LogTarget  -- ^ Where accepted messages are delivered.
  , logFilter :: LogFilter  -- ^ Which spheres are accepted.
  } deriving (Typeable)

-- | Fields are written and read in declaration order.
instance Binary LogConfig where
  put cfg = put (logLevel cfg) >> put (logTarget cfg) >> put (logFilter cfg)
  get =
    get >>= \ll ->
    get >>= \lt ->
    get >>= \lf ->
    return $ LogConfig ll lt lf
-- | Messages understood by the logging service.
data LogMessage
  = -- | A log entry: timestamp, level, sphere, text, originating process,
    -- and the target requested by the sender.
    LogMessage UTCTime LogLevel LogSphere String ProcessId LogTarget
  | -- | Request to replace the node-wide logging configuration.
    LogUpdateConfig LogConfig
  deriving (Typeable)
-- | A 'UTCTime' is written as its Modified Julian Day number followed by
-- the time of day in picoseconds, both as 'Integer's.
instance Binary UTCTime where
  put (UTCTime day dayTime) = put (toModifiedJulianDay day) >> put picos
    where
      secs :: Pico
      secs = realToFrac dayTime
      picos :: Integer
      picos = truncate (secs * 1e12)
  get =
    get >>= \mjd ->
    get >>= \picos ->
    return $ UTCTime (ModifiedJulianDay mjd) (picosecondsToDiffTime picos)
-- | Encoded as a one-byte constructor tag followed by the constructor's
-- fields in declaration order. An unknown tag fails the parser rather
-- than raising a pattern-match exception.
instance Binary LogMessage where
  put (LogMessage utc ll ls s pid target) =
    putWord8 0 >> put utc >> put ll >> put ls >> put s >> put pid >> put target
  put (LogUpdateConfig lc) = putWord8 1 >> put lc
  get = do
    tag <- getWord8
    case tag of
      0 -> do
        utc <- get
        ll <- get
        ls <- get
        s <- get
        pid <- get
        target <- get
        return $ LogMessage utc ll ls s pid target
      1 -> do
        lc <- get
        return $ LogUpdateConfig lc
      _ -> fail ("Binary LogMessage: unknown tag " ++ show tag)
-- | Renders a log entry as: padded UTC timestamp, numeric level, padded
-- process id, sphere and text, separated by single spaces. The target is
-- not shown.
instance Show LogMessage where
  show (LogUpdateConfig _) = "LogUpdateConfig"
  show (LogMessage utc ll ls s pid _) =
    paddedString 30 (show utc)
      ++ " " ++ show (fromEnum ll)
      ++ " " ++ paddedString 28 (show pid)
      ++ " " ++ ls
      ++ " " ++ s
-- | Render a log message for output. The layout matches the 'Show'
-- instance, except that the timestamp is converted to the local time zone.
--
-- A 'LogUpdateConfig' message has no timestamp and is rendered with its
-- 'Show' instance, so the function is total.
showLogMessage :: LogMessage -> IO String
showLogMessage (LogMessage utc ll ls s pid _) = do
  localTime <- utcToLocalZonedTime utc
  return $ concat
    [ paddedString 30 (show localTime), " "
    , (show $ fromEnum ll), " "
    , paddedString 28 (show pid), " ", ls, " ", s
    ]
showLogMessage msg@(LogUpdateConfig _) = return (show msg)
-- | The default logging configuration: messages of level 'LoStandard' or
-- more important, from every sphere, printed to standard output.
defaultLogConfig :: LogConfig
defaultLogConfig = LogConfig
  { logLevel = LoStandard
  , logTarget = LtStdout
  , logFilter = LfAll
  }
-- | Decide whether a message with the given sphere and level should be
-- logged under the given configuration: the sphere must pass the
-- configured 'LogFilter' and the level must be less than or equal to the
-- configured level.
logApplyFilter :: LogConfig -> LogSphere -> LogLevel -> Bool
logApplyFilter cfg sph lev = sphereAllowed && levelAllowed
  where
    sphereAllowed = case logFilter cfg of
      LfAll -> True
      LfOnly lst -> sph `elem` lst
      LfExclude lst -> sph `notElem` lst
    levelAllowed = lev <= logLevel cfg
-- | Log a message in the @"SAY"@ sphere at level 'LoSay'. The string is
-- forced to weak head normal form before logging.
say :: String -> ProcessM ()
say v = seq v (logS "SAY" LoSay v)
-- | The logging configuration in effect for the current process: its own
-- override if one has been set, otherwise the node-wide configuration.
getLogConfig :: ProcessM LogConfig
getLogConfig = do
  p <- getProcess
  maybe (nodeConfig p) return (prLogConfig p)
  where
    nodeConfig p = liftIO (readMVar (prNodeRef p)) >>= return . ndLogConfig
-- | Set a logging configuration for the current process only, overriding
-- the node-wide one.
setLogConfig :: LogConfig -> ProcessM ()
setLogConfig lc = getProcess >>= \p -> putProcess (p {prLogConfig = Just lc})
-- | Replace the node-wide logging configuration of the local node.
setNodeLogConfig :: LogConfig -> ProcessM ()
setNodeLogConfig lc = do
  p <- getProcess
  liftIO $ modifyMVar_ (prNodeRef p) (return . install)
  where
    install x = x {ndLogConfig = lc}
-- | Ask the logging service on the given node to replace its node-wide
-- logging configuration. Throws a 'TransmitException' carrying
-- 'QteLoggingError' if the message could not be delivered.
setRemoteNodeLogConfig :: NodeId -> LogConfig -> ProcessM ()
setRemoteNodeLogConfig nid lc =
  sendSimple logger (LogUpdateConfig lc) PldAdmin >>= check
  where
    logger = adminGetPid nid ServiceLog
    check QteOK = return ()
    check _ = throw $ TransmitException $ QteLoggingError
-- | Log a message from plain 'IO' code, on behalf of the given process.
--
-- The node-wide configuration decides whether the message is accepted. An
-- accepted message is sent to the logging service on the node of @pid@;
-- the outcome of the send is ignored.
logI :: MVar Node -> ProcessId -> LogSphere -> LogLevel -> String -> IO ()
logI mnode pid sph ll txt = do
  node <- readMVar mnode
  let cfg = ndLogConfig node
  when (logApplyFilter cfg sph ll) (deliverTo cfg)
  where
    deliverTo cfg = do
      time <- getCurrentTime
      let msg = LogMessage time ll sph txt pid (logTarget cfg)
          svc = adminGetPid (nodeFromPid pid) ServiceLog
      _ <- sendBasic mnode svc msg (Nothing :: Maybe ()) PldAdmin Nothing
      return ()
-- | Log a message from within a process.
--
-- The text is forced to weak head normal form first. The configuration in
-- effect for the process decides whether the message is accepted. An
-- accepted message is sent to the local node's logging service; a
-- 'TransmitException' carrying 'QteLoggingError' is thrown if that send
-- does not succeed.
logS :: LogSphere -> LogLevel -> String -> ProcessM ()
logS sph ll txt = do
  lc <- txt `seq` getLogConfig
  when (logApplyFilter lc sph ll) (emit lc)
  where
    emit cfg = do
      time <- liftIO $ getCurrentTime
      pid <- getSelfPid
      nid <- getSelfNode
      let msg = LogMessage time ll sph txt pid (logTarget cfg)
      res <- sendSimple (adminGetPid nid ServiceLog) msg PldAdmin
      case res of
        QteOK -> return ()
        _n -> throw $ TransmitException $ QteLoggingError
-- | Start the node's logging service.
--
-- The service loops forever, handling 'LogMessage' values:
--
-- * a log entry marked 'LtForwarded' is delivered to this node's own
--   configured target;
-- * any other log entry is delivered to the target it carries;
-- * 'LogUpdateConfig' replaces the node-wide logging configuration.
--
-- Any other message type is rejected through 'matchUnknownThrow'.
startLoggingService :: ProcessM ()
startLoggingService = serviceThread ServiceLog logger
  where
    logger = receiveWait [match handleMessage, matchUnknownThrow] >> logger

    handleMessage msg = do
      mylc <- getLogConfig
      case msg of
        (LogMessage _ _ _ _ _ LtForwarded) -> processMessage (logTarget mylc) msg
        (LogMessage _ _ _ _ _ requested) -> processMessage requested msg
        (LogUpdateConfig lc) -> setNodeLogConfig lc

    -- Re-tag an entry so the receiving node applies its own target.
    forwardify (LogMessage a b c d e _) = LogMessage a b c d e LtForwarded

    processMessage whereto txt = do
      smsg <- liftIO $ showLogMessage txt
      case whereto of
        LtStdout -> liftIO $ putStrLn smsg
        LtFile fp -> writeTo fp smsg
        LtForward nid -> forwardTo nid txt
        _n -> throw $ ConfigException $ "Invalid message forwarded setting"

    -- File errors (IOError) are deliberately swallowed.
    writeTo fp smsg =
      (ptry (liftIO (appendFile fp (smsg ++ "\n"))) :: ProcessM (Either IOError ()))
        >> return ()

    -- Never forward to ourselves.
    forwardTo nid txt = do
      self <- getSelfNode
      when (self /= nid)
        (sendSimple (adminGetPid nid ServiceLog) (forwardify txt) PldAdmin >> return ())
-- | Requests understood by the node monitor service.
data NodeMonitorCommand
  = NodeMonitorStart NodeId  -- ^ Begin monitoring the given node.
  | NodeMonitorPing          -- ^ Liveness probe; answered with unit.
  deriving (Typeable)

-- | Encoded as a one-byte constructor tag followed by the node id, if
-- any. An unknown tag fails the parser rather than raising a
-- pattern-match exception.
instance Binary NodeMonitorCommand where
  put (NodeMonitorStart nid) = putWord8 0 >> put nid
  put (NodeMonitorPing) = putWord8 1
  get = do
    tag <- getWord8
    case tag of
      0 -> do
        b <- get
        return $ NodeMonitorStart b
      1 -> return NodeMonitorPing
      _ -> fail ("Binary NodeMonitorCommand: unknown tag " ++ show tag)
-- | Sent by a ping loop to the node monitor service when a ping of the
-- given node has failed.
data NodeMonitorSignal
  = NodeMonitorNodeFailure NodeId
  deriving (Typeable)

-- | Encoded as the bare node id.
instance Binary NodeMonitorSignal where
  put (NodeMonitorNodeFailure nid) = put nid
  get = get >>= \nid -> return $ NodeMonitorNodeFailure nid
-- | Notification that the given node is down.
data NodeMonitorInformation
  = NodeMonitorNodeDown NodeId
  deriving (Typeable)

-- | Encoded as the bare node id.
instance Binary NodeMonitorInformation where
  put (NodeMonitorNodeDown nid) = put nid
  get = get >>= \nid -> return $ NodeMonitorNodeDown nid
-- | Ask the local node monitor service to start monitoring the given
-- node. Throws a 'ServiceException' if the service does not answer with
-- unit.
monitorNode :: NodeId -> ProcessM ()
monitorNode nid = do
  mynid <- getSelfNode
  let monitorPid = adminGetPid mynid ServiceNodeMonitor
  roundtripQueryLocal PldAdmin monitorPid (NodeMonitorStart nid) >>= check
  where
    check (Right ()) = return ()
    check res = throw $ ServiceException $ "while contacting node monitor " ++ show res
-- | Probe the node monitor service on the given node. Returns 'True' if
-- it answered the ping, 'False' for any other outcome.
pingNode :: NodeId -> ProcessM Bool
pingNode nid =
  roundtripQueryUnsafe PldAdmin (adminGetPid nid ServiceNodeMonitor) (NodeMonitorPing)
    >>= return . answered
  where
    answered (Right ()) = True
    answered _ = False
-- | Start the node monitor service.
--
-- The service state is a map from each monitored 'NodeId' to the number of
-- ping failures seen for it so far. Every monitored node gets a daemonic
-- helper process that pings it in a loop and reports a failure back to
-- this service.
startNodeMonitorService :: ProcessM ()
startNodeMonitorService = serviceThread ServiceNodeMonitor (service Map.empty)
where
service state =
-- Synchronous commands: a ping is answered without touching the state;
-- a start request may add the node to the state.
let matchCommand cmd = case cmd of
NodeMonitorPing -> return ((),state)
NodeMonitorStart nid -> do res <- addmonitor nid
return ((),res)
-- Asynchronous failure reports sent by the ping loops.
matchSignal cmd = case cmd of
NodeMonitorNodeFailure nid -> do res <- handlefailure nid
return (res)
-- Ping loop: ping the node, sleep interpingtimeout between successful
-- pings, and report to mainpid (then stop) on the first failed ping.
listenaction nid mainpid =
let onfailure = sendSimple mainpid (NodeMonitorNodeFailure nid) PldUser >> return ()
loop = do res <- pingNode nid
case res of
False -> onfailure
True -> liftIO (threadDelay interpingtimeout) >> loop
in
do loop
-- Failure count at which a node is declared down.
failurelimit = 3
-- Both delays are passed to threadDelay, i.e. they are in microseconds.
interpingtimeout = 5000000
retrytimeout = 1000000
-- Tell the local process monitor service that the node is down.
reportfailure nid = do mynid <- getSelfNode
sendSimple (adminGetPid mynid ServiceProcessMonitor) (GlNodeDown nid) PldAdmin
-- At the limit: report the node down and stop tracking it. Below the
-- limit: start a new ping loop after retrytimeout and bump the count.
-- The count is never reset by a later successful ping.
handlefailure nid = case Map.lookup nid state of
Just c -> if c >= failurelimit
then do _ <- reportfailure nid
return (Map.delete nid state)
else do mypid <- getSelfPid
_ <- spawnLocalAnd (liftIO (threadDelay retrytimeout) >> listenaction nid mypid) setDaemonic
return (Map.adjust succ nid state)
Nothing -> return state
-- Start monitoring a node unless it is already monitored or is the
-- local node itself.
addmonitor nid = case Map.member nid state of
True -> return state
False -> do mynid <- getSelfNode
mypid <- getSelfPid
if mynid==nid
then return state
else do _ <- spawnLocalAnd (listenaction nid mypid) setDaemonic
return $ Map.insert nid (0) state
-- Handle one message, then continue with the resulting state.
in receiveWait [roundtripResponse matchCommand,
match matchSignal,
matchUnknownThrow] >>= service
-- | The administrative services of a node.
--
-- The position of a constructor matters: 'adminGetPid' uses its
-- 'fromEnum' index as the local process id part of the service's
-- well-known address, so reordering constructors changes those addresses.
data ServiceId
  = ServiceLog
  | ServiceSpawner
  | ServiceNodeRegistry
  | ServiceNodeMonitor
  | ServiceProcessMonitor
  | ServiceProcessRegistry
  deriving (Ord, Eq, Enum, Show)
-- | The well-known address of an administrative service on a node: the
-- node id paired with the service's 'fromEnum' index.
adminGetPid :: NodeId -> ServiceId -> ProcessId
adminGetPid nid sid = ProcessId nid serviceIndex
  where
    serviceIndex = fromEnum sid
-- | Remove the given service from the node's administrative process
-- table. The entry is removed whichever process it pointed to.
adminDeregister :: ServiceId -> ProcessM ()
adminDeregister val = do
  p <- getProcess
  _ <- getSelfPid
  liftIO $ modifyMVar_ (prNodeRef p) unregister
  where
    unregister node =
      return $ node {ndAdminProcessTable = Map.delete val (ndAdminProcessTable node)}
-- | Register the current process as the given administrative service in
-- the node's administrative process table. Throws a 'ServiceException' if
-- that service is already registered.
adminRegister :: ServiceId -> ProcessM ()
adminRegister val = do
  p <- getProcess
  pid <- getSelfPid
  liftIO $ modifyMVar_ (prNodeRef p) (register (localFromPid pid))
  where
    register lpid node
      | Map.member val table =
          throw $ ServiceException $ "Duplicate administrative registration at index " ++ show val
      | otherwise =
          return $ node {ndAdminProcessTable = Map.insert val lpid table}
      where
        table = ndAdminProcessTable node
-- | Look up the local process id registered for an administrative
-- service, or 'QteUnknownPid' if the service is not registered.
adminLookupN :: ServiceId -> MVar Node -> IO (Either TransmitStatus LocalProcessId)
adminLookupN val mnode = do
  node <- readMVar mnode
  return $ maybe (Left QteUnknownPid) Right (Map.lookup val (ndAdminProcessTable node))
-- | Start a daemonic process that waits for the node's finalizer signal,
-- runs @todo@, and then signals that finalization is complete, even if
-- @todo@ throws. See 'performFinalization' for the signalling side.
startFinalizerService :: ProcessM () -> ProcessM ()
startFinalizerService todo = spawnLocalAnd finalizer setDaemonic >> return ()
  where
    finalizer = do
      p <- getProcess
      node <- liftIO $ readMVar (prNodeRef p)
      liftIO $ takeMVar (ndNodeFinalizer node)
      todo `pfinally` liftIO (putMVar (ndNodeFinalized node) ())
-- | Trigger the node's finalizer and block until it reports completion.
performFinalization :: MVar Node -> IO ()
performFinalization mnode =
  readMVar mnode >>= \node ->
    putMVar (ndNodeFinalizer node) () >> takeMVar (ndNodeFinalized node)
-- | Mark the current process as daemonic in the node's process table.
-- 'waitForThreads' does not wait for daemonic processes.
setDaemonic :: ProcessM ()
setDaemonic = do
  p <- getProcess
  pid <- getSelfPid
  liftIO $ modifyMVar_ (prNodeRef p) (return . markDaemonic (localFromPid pid))
  where
    markDaemonic lpid node =
      node {ndProcessTable = Map.adjust flag lpid (ndProcessTable node)}
    flag pte = pte {pteDaemonic = True}
-- | Spawn a daemonic local process that runs @f@ as the administrative
-- service @v@.
--
-- The process registers itself as @v@ and marks itself daemonic before
-- @f@ runs. When @f@ ends, normally or by exception, the service is
-- deregistered and a fatal message is logged.
serviceThread :: ServiceId -> ProcessM () -> ProcessM ()
serviceThread v f = spawnLocalAnd guarded setup >> (return ())
  where
    guarded = pbracket (return ()) (\_ -> adminDeregister v >> logError) (\_ -> f)
    setup = adminRegister v >> setDaemonic
    logError = logS "SYS" LoFatal $ "System process " ++ show v ++ " has terminated"
-- | State of the process registry service: a name-to-pid map and the
-- inverse pid-to-name map. The service updates the two maps together.
data ProcessRegistryState = ProcessRegistryState (Map.Map String ProcessId) (Map.Map ProcessId String)
-- | Requests understood by the process registry service.
data ProcessRegistryCommand
  = -- | Look up a name. If the name is unknown and a closure is supplied,
    -- the registry spawns the closure and registers it under that name.
    ProcessRegistryQuery String (Maybe (Closure (ProcessM ())))
  | -- | Register the given process under the given name.
    ProcessRegistrySet String ProcessId
  deriving (Typeable)

-- | Encoded as a one-byte constructor tag followed by the two fields. An
-- unknown tag fails the parser rather than raising a pattern-match
-- exception.
instance Binary ProcessRegistryCommand where
  put (ProcessRegistryQuery a b) = putWord8 0 >> put a >> put b
  put (ProcessRegistrySet a b) = putWord8 1 >> put a >> put b
  get = do
    cmd <- getWord8
    case cmd of
      0 -> do
        a <- get
        b <- get
        return $ ProcessRegistryQuery a b
      1 -> do
        a <- get
        b <- get
        return $ ProcessRegistrySet a b
      _ -> fail ("Binary ProcessRegistryCommand: unknown tag " ++ show cmd)
-- | Replies sent by the process registry service.
data ProcessRegistryAnswer
  = -- | Result of a query (the pid, if the name is known), or 'Nothing'
    -- as the acknowledgement of a successful registration.
    ProcessRegistryResponse (Maybe ProcessId)
  | -- | The request was refused; the string explains why.
    ProcessRegistryError String
  deriving (Typeable)

-- | Encoded as a one-byte constructor tag followed by the payload. An
-- unknown tag fails the parser rather than raising a pattern-match
-- exception.
instance Binary ProcessRegistryAnswer where
  put (ProcessRegistryResponse a) = putWord8 0 >> put a
  put (ProcessRegistryError s) = putWord8 1 >> put s
  get = do
    tag <- getWord8
    case tag of
      0 -> get >>= (return . ProcessRegistryResponse)
      1 -> get >>= (return . ProcessRegistryError)
      _ -> fail ("Binary ProcessRegistryAnswer: unknown tag " ++ show tag)
-- | Start the process registry service, which maps names to processes on
-- this node.
--
-- The service answers 'ProcessRegistryCommand' round trips and also
-- receives 'ProcessMonitorException' notifications for the processes it
-- monitors, so that names of terminated processes are dropped.
startProcessRegistryService :: ProcessM ()
startProcessRegistryService = serviceThread ServiceProcessRegistry (service initialState)
where
initialState = ProcessRegistryState Map.empty Map.empty
service state@(ProcessRegistryState nameToPid pidToName) =
let
-- A monitored process has terminated: forget it in both maps.
downs (ProcessMonitorException pid _why) =
case Map.lookup pid pidToName of
Just name ->
let newPidToName = Map.delete pid pidToName
newNameToPid = Map.delete name nameToPid
in return (ProcessRegistryState newNameToPid newPidToName)
Nothing -> return state
-- Each command returns the reply paired with the new state.
cmds cmd =
case cmd of
ProcessRegistrySet name pid ->
-- Registration requires that neither the pid nor the name is taken.
case (Map.lookup pid pidToName, Map.lookup name nameToPid) of
(Nothing,Nothing) ->
let newNameToPid = Map.insert name pid nameToPid
newPidToName = Map.insert pid name pidToName
in do islocal <- isPidLocal pid
case islocal of
True ->
-- Monitor the process first; the registration is stored only
-- if monitoring could be established.
do mypid <- getSelfPid
ok <- monitorProcessQuiet mypid pid MaMonitor
case ok of
True -> return (ProcessRegistryResponse Nothing,ProcessRegistryState newNameToPid newPidToName)
False -> return (ProcessRegistryError $ "Couldn't establish monitoring of task in naming "++name,state)
False -> return (ProcessRegistryError $ "Refuse to register nonlocal process" ++ show pid,state)
(Nothing,_) -> return (ProcessRegistryError $ "The name "++name++" has already been registered",state)
(_,_) -> return (ProcessRegistryError $ "The process "++show pid++" has already been registered",state)
ProcessRegistryQuery name mClo ->
case Map.lookup name nameToPid of
Just pid -> return (ProcessRegistryResponse (Just pid),state)
Nothing ->
case mClo of
Nothing -> return (ProcessRegistryResponse Nothing,state)
-- Unknown name with a closure: spawn it on this node, monitored by
-- this service, and register the new pid under the name.
Just clo -> do mynid <- getSelfNode
mypid <- getSelfPid
pid <- spawnAnd mynid clo defaultSpawnOptions {amsoMonitor=Just (mypid,MaMonitor)}
let newNameToPid = Map.insert name pid nameToPid
newPidToName = Map.insert pid name pidToName
return (ProcessRegistryResponse (Just pid),ProcessRegistryState newNameToPid newPidToName)
-- Handle one message, then continue with the resulting state.
in receiveWait [roundtripResponse cmds,match downs] >>= service
-- | Ask the process registry on the given node for the process registered
-- under @name@, supplying a closure for the registry to start if the name
-- is not yet registered.
--
-- Throws a 'ServiceException' if the registry reports an error or gives
-- any reply other than a pid.
nameQueryOrStart :: NodeId -> String -> Closure (ProcessM ()) -> ProcessM ProcessId
nameQueryOrStart nid name clo =
  roundtripQueryUnsafe PldAdmin servicepid msg >>= interpret
  where
    servicepid = adminGetPid nid ServiceProcessRegistry
    msg = ProcessRegistryQuery name (Just clo)
    interpret (Right (ProcessRegistryResponse (Just answer))) = return answer
    interpret (Right (ProcessRegistryError s)) = throw $ ServiceException s
    interpret _ = throw $ ServiceException $ "Crazy talk from process registry"
-- | Ask the process registry on the given node for the process registered
-- under @name@. Returns 'Nothing' if the name is not registered.
--
-- Throws a 'ServiceException' if the registry reports an error or the
-- round trip fails.
nameQuery :: NodeId -> String -> ProcessM (Maybe ProcessId)
nameQuery nid name =
  roundtripQueryUnsafe PldAdmin servicepid msg >>= interpret
  where
    servicepid = adminGetPid nid ServiceProcessRegistry
    msg = ProcessRegistryQuery name Nothing
    interpret (Right (ProcessRegistryResponse answer)) = return answer
    interpret (Right (ProcessRegistryError s)) = throw $ ServiceException s
    interpret _ = throw $ ServiceException $ "Crazy talk from process registry"
-- | Register the current process under @name@ with the local process
-- registry.
--
-- Throws a 'ServiceException' if the registry refuses the registration or
-- gives an unexpected reply.
nameSet :: String -> ProcessM ()
nameSet name = do
  mynid <- getSelfNode
  mypid <- getSelfPid
  let servicepid = adminGetPid mynid ServiceProcessRegistry
  roundtripQueryLocal PldAdmin servicepid (ProcessRegistrySet name mypid) >>= interpret
  where
    interpret (Right (ProcessRegistryResponse Nothing)) = return ()
    interpret (Right (ProcessRegistryError s)) = throw $ ServiceException s
    interpret _ = throw $ ServiceException $ "Crazy talk from process registry"
-- | Asynchronous notifications handled by the process monitor service.
data GlSignal
  = GlProcessDown ProcessId SignalReason  -- ^ The process ended, and why.
  | GlNodeDown NodeId                     -- ^ The node stopped answering pings.
  deriving (Typeable, Show)

-- | Encoded as a one-byte constructor tag followed by the fields. An
-- unknown tag fails the parser rather than raising a pattern-match
-- exception.
instance Binary GlSignal where
  put (GlProcessDown a b) = putWord8 0 >> put a >> put b
  put (GlNodeDown a) = putWord8 1 >> put a
  get = do
    ch <- getWord8
    case ch of
      0 -> do
        a <- get
        b <- get
        return $ GlProcessDown a b
      1 -> do
        a <- get
        return $ GlNodeDown a
      _ -> fail ("Binary GlSignal: unknown tag " ++ show ch)
-- | Round-trip requests exchanged between the process monitor services of
-- different nodes.
data GlSynchronous
  = -- | Monitee, monitor, action: record the monitor on the receiving node.
    GlRequestMonitoring ProcessId ProcessId MonitorAction
  | -- | Monitee and the node that wants to be told when it goes down.
    GlRequestMoniteeing ProcessId NodeId
  | -- | Monitee, monitor, action: remove the monitor on the receiving node.
    GlRequestUnmonitoring ProcessId ProcessId MonitorAction
  deriving (Typeable)

-- | Encoded as a one-byte constructor tag followed by the fields. An
-- unknown tag fails the parser rather than raising a pattern-match
-- exception.
instance Binary GlSynchronous where
  put (GlRequestMonitoring a b c) = putWord8 0 >> put a >> put b >> put c
  put (GlRequestMoniteeing a b) = putWord8 1 >> put a >> put b
  put (GlRequestUnmonitoring a b c) = putWord8 2 >> put a >> put b >> put c
  get = do
    ch <- getWord8
    case ch of
      0 -> do
        a <- get
        b <- get
        c <- get
        return $ GlRequestMonitoring a b c
      1 -> do
        a <- get
        b <- get
        return $ GlRequestMoniteeing a b
      2 -> do
        a <- get
        b <- get
        c <- get
        return $ GlRequestUnmonitoring a b c
      _ -> fail ("Binary GlSynchronous: unknown tag " ++ show ch)
-- | Commands sent to the process monitor service. The fields are, in
-- order, the monitoring process, the monitored process, and the action
-- to take when the monitored process ends.
data GlCommand
  = GlMonitor ProcessId ProcessId MonitorAction    -- ^ Establish monitoring.
  | GlUnmonitor ProcessId ProcessId MonitorAction  -- ^ Cancel monitoring.
  deriving (Typeable)

-- | Encoded as a one-byte constructor tag followed by the fields. An
-- unknown tag fails the parser rather than raising a pattern-match
-- exception.
instance Binary GlCommand where
  put (GlMonitor a b c) = putWord8 0 >> put a >> put b >> put c
  put (GlUnmonitor a b c) = putWord8 1 >> put a >> put b >> put c
  get = do
    ch <- getWord8
    case ch of
      0 -> do
        a <- get
        b <- get
        c <- get
        return $ GlMonitor a b c
      1 -> do
        a <- get
        b <- get
        c <- get
        return $ GlUnmonitor a b c
      _ -> fail ("Binary GlCommand: unknown tag " ++ show ch)
-- | What happens to the monitoring process when the monitored process
-- ends (see 'triggerMonitor').
data MonitorAction
  = MaMonitor    -- ^ A 'ProcessMonitorException' is sent as a message.
  | MaLink       -- ^ A 'ProcessMonitorException' is thrown in the monitor.
  | MaLinkError  -- ^ As 'MaLink', except that normal termination is ignored.
  deriving (Typeable, Show, Ord, Eq)

-- | Encoded as a single tag byte. An unknown tag fails the parser rather
-- than raising a pattern-match exception.
instance Binary MonitorAction where
  put MaMonitor = putWord8 0
  put MaLink = putWord8 1
  put MaLinkError = putWord8 2
  get = do
    tag <- getWord8
    case tag of
      0 -> return MaMonitor
      1 -> return MaLink
      2 -> return MaLinkError
      _ -> fail ("Binary MonitorAction: unknown tag " ++ show tag)
-- | Why a monitored process is reported as gone.
data SignalReason
  = SrNormal            -- ^ Normal termination.
  | SrException String  -- ^ Ended with an exception, described by the string.
  | SrNoPing            -- ^ Its node stopped answering pings.
  | SrInvalid           -- ^ It was not running when monitoring was requested.
  deriving (Typeable, Show)

-- | Encoded as a one-byte constructor tag followed by the string, if any.
-- An unknown tag fails the parser rather than raising a pattern-match
-- exception.
instance Binary SignalReason where
  put SrNormal = putWord8 0
  put (SrException s) = putWord8 1 >> put s
  put SrNoPing = putWord8 2
  put SrInvalid = putWord8 3
  get = do
    tag <- getWord8
    case tag of
      0 -> return SrNormal
      1 -> get >>= return . SrException
      2 -> return SrNoPing
      3 -> return SrInvalid
      _ -> fail ("Binary SignalReason: unknown tag " ++ show tag)
-- | Monitoring bookkeeping, keyed by process. Each entry holds three maps:
--
-- 1. local monitors of the process, keyed by local pid and action, with a
--    reference count;
-- 2. local monitees, keyed by local pid, with a reference count;
-- 3. the set of nodes to notify when the process goes down.
type GlLinks =
  Map.Map
    ProcessId
    ( Map.Map (LocalProcessId, MonitorAction) (Int)
    , Map.Map LocalProcessId (Int)
    , Map.Map NodeId ()
    )
-- | State of the process monitor service.
data GlobalData = GlobalData
  { glLinks :: GlLinks
    -- ^ The monitoring relationships known to this node.
  , glNextId :: Integer
    -- ^ Key to use for the next entry added to 'glSyncs'.
  , glSyncs :: Map.Map Integer (GlobalData -> MatchM GlobalData ())
    -- ^ Matchers for replies to outstanding requests sent to other nodes.
  }
-- | Merge a new 'GlLinks' entry into an existing one. Reference counts of
-- monitors and monitees present on both sides are added together; the
-- node sets are unioned.
gdCombineEntry :: (Map.Map (LocalProcessId,MonitorAction) (Int),
                   Map.Map LocalProcessId (Int),
                   Map.Map NodeId ()) -> (Map.Map (LocalProcessId,MonitorAction) (Int),
                   Map.Map LocalProcessId (Int),
                   Map.Map NodeId ()) -> (Map.Map (LocalProcessId,MonitorAction) (Int),
                   Map.Map LocalProcessId (Int),
                   Map.Map NodeId ())
gdCombineEntry (newmonitors, newmonitees, newnodes) (oldmonitors, oldmonitees, oldnodes) =
  ( Map.unionWith (+) newmonitors oldmonitors
  , Map.unionWith (+) newmonitees oldmonitees
  , Map.unionWith const newnodes oldnodes
  )
-- | Record one more reference to the local process @lpid@ monitoring
-- @pid@ with action @ma@.
gdAddMonitor :: GlLinks -> ProcessId -> MonitorAction -> LocalProcessId -> GlLinks
gdAddMonitor gl pid ma lpid = Map.insertWith' gdCombineEntry pid entry gl
  where
    entry = (Map.singleton (lpid, ma) 1, Map.empty, Map.empty)
-- | Drop one reference to the local process @lpid@ monitoring @pid@ with
-- action @ma@. The monitor entry is removed once its count reaches zero.
gdDelMonitor :: GlLinks -> ProcessId -> MonitorAction -> LocalProcessId -> GlLinks
gdDelMonitor gl pid ma lpid = Map.adjust dropOne pid gl
  where
    dropOne (mons, mots, ns) = (Map.update decrement (lpid, ma) mons, mots, ns)
    decrement count
      | pred count == 0 = Nothing
      | otherwise = Just (pred count)
-- | Record one more reference to the local process @lpid@ as a monitee in
-- the entry for @pid@.
gdAddMonitee :: GlLinks -> ProcessId -> LocalProcessId -> GlLinks
gdAddMonitee gl pid lpid = Map.insertWith' gdCombineEntry pid entry gl
  where
    entry = (Map.empty, Map.singleton lpid 1, Map.empty)
-- | Drop one reference to the local process @lpid@ as a monitee in the
-- entry for @pid@. The monitee entry is removed once its count reaches
-- zero.
gdDelMonitee :: GlLinks -> ProcessId -> LocalProcessId -> GlLinks
gdDelMonitee gl pid lpid = Map.adjust dropOne pid gl
  where
    dropOne (mons, mots, ns) = (mons, Map.update decrement lpid mots, ns)
    decrement count
      | pred count == 0 = Nothing
      | otherwise = Just (pred count)
-- | Remove a process from the link table.
--
-- Besides the entry for @pid@ itself, this also deletes the entries keyed
-- by every local monitor and every local monitee listed in that entry,
-- addressed as processes of the node @myself@.
--
-- NOTE(review): deleting the whole entries of the related local processes
-- looks broader than removing just their link to @pid@ -- confirm this is
-- intended.
glExpungeProcess :: GlLinks -> ProcessId -> NodeId -> GlLinks
glExpungeProcess gl pid myself = maybe gl expunge (Map.lookup pid gl)
  where
    mine = buildPidFromNodeId myself
    forget links lp = Map.delete (mine lp) links
    expunge (mons, mots, _ns) =
      let withoutPid = Map.delete pid gl
          withoutMonitors = foldl' forget withoutPid (map fst (Map.keys mons))
      in foldl' forget withoutMonitors (Map.keys mots)
-- | Add @nid@ to the set of nodes to notify when @pid@ goes down.
gdAddNode :: GlLinks -> ProcessId -> NodeId -> GlLinks
gdAddNode gl pid nid = Map.insertWith' gdCombineEntry pid entry gl
  where
    entry = (Map.empty, Map.empty, Map.singleton nid ())
-- | Notification that a monitored process has ended, with the reason.
-- Depending on the 'MonitorAction' it is delivered either as an ordinary
-- message or as an asynchronous exception.
data ProcessMonitorException
  = ProcessMonitorException ProcessId SignalReason
  deriving (Typeable)

-- | Encoded as the pid followed by the reason.
instance Binary ProcessMonitorException where
  put (ProcessMonitorException pid sr) = put pid >> put sr
  get =
    get >>= \pid ->
    get >>= \sr ->
    return $ ProcessMonitorException pid sr

instance Exception ProcessMonitorException

instance Show ProcessMonitorException where
  show (ProcessMonitorException pid why) =
    concat ["ProcessMonitorException: ", show pid, " has terminated because ", show why]
-- | Link the current process and @p@ in both directions with
-- 'MaLinkError', by sending two 'GlMonitor' requests to the local process
-- monitor service.
--
-- Throws a 'ServiceException' as soon as either request fails; the second
-- request is not sent if the first one failed.
linkProcess :: ProcessId -> ProcessM ()
linkProcess p = do
  mypid <- getSelfPid
  mynid <- getSelfNode
  let servicepid = (adminGetPid mynid ServiceProcessMonitor)
      request msg = roundtripQueryLocal PldAdmin servicepid msg >>= verify
  request (GlMonitor mypid p MaLinkError)
  request (GlMonitor p mypid MaLinkError)
  where
    verify (Right QteOK) = return ()
    verify (Right err) = herr err
    verify (Left err) = herr err
    herr err = throw $ ServiceException $ "Error when linking process " ++ show p ++ ": " ++ show err
-- | A message matcher that fires on a 'ProcessMonitorException' about the
-- given process and then runs @f@.
matchProcessDown :: ProcessId -> ProcessM q -> MatchM q ()
matchProcessDown pid f = matchIf concernsPid (\_ -> f)
  where
    concernsPid (ProcessMonitorException p _) = p == pid
-- | Run @f@ while the current process monitors @pid@ with 'MaMonitor'.
withMonitor :: ProcessId -> ProcessM a -> ProcessM a
withMonitor pid = withMonitoring pid MaMonitor
-- | Run @f@ while the current process monitors @pid@ with the given
-- action. Monitoring is cancelled afterwards even if @f@ throws; a
-- 'ServiceException' raised while cancelling is suppressed.
withMonitoring :: ProcessId -> MonitorAction -> ProcessM a -> ProcessM a
withMonitoring pid how f = do
  mypid <- getSelfPid
  monitorProcess mypid pid how
  f `pfinally` cleanup mypid
  where
    cleanup mypid =
      ptry (unmonitorProcess mypid pid how) :: ProcessM (Either ServiceException ())
-- | Make @monitor@ monitor @monitee@ with the given action. Throws a
-- 'ServiceException' on failure.
monitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()
monitorProcess monitor monitee how = do
  _ <- monitorProcessImpl GlMonitor monitor monitee how True
  return ()
-- | Cancel monitoring of @monitee@ by @monitor@ for the given action.
-- Throws a 'ServiceException' on failure.
unmonitorProcess :: ProcessId -> ProcessId -> MonitorAction -> ProcessM ()
unmonitorProcess monitor monitee how = do
  _ <- monitorProcessImpl GlUnmonitor monitor monitee how True
  return ()
-- | Like 'monitorProcess', but reports failure by returning 'False'
-- instead of throwing.
monitorProcessQuiet :: ProcessId -> ProcessId -> MonitorAction -> ProcessM Bool
monitorProcessQuiet monitor monitee how =
  monitorProcessImpl GlMonitor monitor monitee how False >>= return . succeeded
  where
    succeeded QteOK = True
    succeeded _ = False
-- | Shared implementation of monitoring and unmonitoring.
--
-- Builds the command with @msgtype@ and sends it to the process monitor
-- service on the monitee's node. Returns 'QteOK' on success. On failure
-- it throws a 'ServiceException' if @throwit@ is set, and otherwise
-- returns the failing status.
monitorProcessImpl :: (ProcessId -> ProcessId -> MonitorAction -> GlCommand) -> ProcessId -> ProcessId -> MonitorAction -> Bool -> ProcessM TransmitStatus
monitorProcessImpl msgtype monitor monitee how throwit =
  roundtripQueryUnsafe PldAdmin servicepid (msgtype monitor monitee how) >>= settle
  where
    servicepid = (adminGetPid (nodeFromPid monitee) ServiceProcessMonitor)
    settle (Right QteOK) = return QteOK
    settle (Right err) = herr err
    settle (Left err) = herr err
    herr err
      | throwit = throw $ ServiceException $ "Error when monitoring process " ++ show monitee ++ ": " ++ show err
      | otherwise = return err
-- | Throw an asynchronous exception in the thread of a process on this
-- node. Returns 'False' if the process is not local or has no entry in
-- the process table, and 'True' once the exception has been thrown.
sendInterrupt :: (Exception e) => ProcessId -> e -> ProcessM Bool
sendInterrupt pid e = do
  islocal <- isPidLocal pid
  if islocal then interruptLocal else return False
  where
    interruptLocal = do
      p <- getProcess
      node <- liftIO $ readMVar (prNodeRef p)
      entry <- liftIO $ atomically $ getProcessTableEntry node (localFromPid pid)
      case entry of
        Nothing -> return False
        Just pte -> liftIO (throwTo (pteThread pte) e) >> return True
-- | Notify @towho@ that @aboutwho@ has ended for reason @why@, in the way
-- the monitor action prescribes:
--
-- * 'MaMonitor' sends a 'ProcessMonitorException' as a message;
-- * 'MaLink' throws it in the monitoring process;
-- * 'MaLinkError' throws it unless the reason is 'SrNormal'.
triggerMonitor :: ProcessId -> ProcessId -> MonitorAction -> SignalReason -> ProcessM ()
triggerMonitor towho aboutwho how why = case (how, why) of
  (MaMonitor, _) -> sendSimple towho msg PldUser >> return ()
  (MaLink, _) -> interrupt
  (MaLinkError, SrNormal) -> return ()
  (MaLinkError, _) -> interrupt
  where
    msg = ProcessMonitorException aboutwho why
    interrupt = sendInterrupt towho msg >> return ()
-- | Start the process monitor service, which keeps track of monitoring
-- and linking relationships involving processes on this node.
--
-- It handles three kinds of input:
--
-- * 'GlSignal' notifications that a process or a node has gone down;
-- * 'GlCommand' requests to establish or cancel monitoring, answered
--   asynchronously through the supplied @ans@ callback;
-- * 'GlSynchronous' requests from the monitor services of other nodes.
--
-- Replies to requests this service has itself sent to other nodes are
-- picked up by the matchers stored in 'glSyncs'.
startProcessMonitorService :: ProcessM ()
startProcessMonitorService = serviceThread ServiceProcessMonitor (service emptyGlobal)
where
emptyGlobal = GlobalData {glLinks=Map.empty, glNextId=0, glSyncs = Map.empty}
-- Address of the process monitor service on the node of the given pid.
getGlobalFor pid = adminGetPid (nodeFromPid pid) ServiceProcessMonitor
-- Liveness can only be checked for local processes; a remote process is
-- assumed to be alive.
checkliveness pid = do islocal <- isPidLocal pid
case islocal of
True -> isProcessUp (localFromPid pid)
False -> return True
-- If the monitee is already gone, notify the monitor at once with
-- SrInvalid.
checklivenessandtrigger monitor monitee action =
do a <- checkliveness monitee
case a of
True -> return True
False -> do trigger monitor monitee action SrInvalid
return False
trigger = triggerMonitor
-- A local pid with no process table entry counts as up only if it is
-- negative.
-- NOTE(review): presumably negative local pids are reserved for special
-- processes -- confirm.
isProcessUp lpid = do p <- getProcess
node <- liftIO $ readMVar (prNodeRef p)
res <- liftIO $ atomically $ getProcessTableEntry node lpid
case res of
Nothing -> return (lpid<0)
Just _ -> return True
-- Pure helpers that update glLinks inside the service state.
removeLocalMonitee gl monitor monitee _action =
gl {glLinks = gdDelMonitee (glLinks gl) monitor (localFromPid monitee) }
removeLocalMonitor gl monitor monitee action =
gl {glLinks = gdDelMonitor (glLinks gl) monitee action (localFromPid monitor) }
addLocalMonitee gl monitor monitee _action =
gl {glLinks = gdAddMonitee (glLinks gl) monitor (localFromPid monitee) }
addLocalMonitor gl monitor monitee action =
gl {glLinks = gdAddMonitor (glLinks gl) monitee action (localFromPid monitor) }
addLocalNode gl monitor monitee _action =
gl {glLinks = gdAddNode (glLinks gl) monitee (nodeFromPid monitor)}
-- Send msg to the process monitor service of every listed node, each from
-- its own helper process and bounded by ptimeout.
broadcast nids msg = mapM_ (\p -> forkProcessWeak $ ((ptimeout 5000000 $ sendSimple (adminGetPid p ServiceProcessMonitor) msg PldAdmin) >> return ())) nids
-- Notify every local monitor of the process; if the process was local,
-- also tell the interested remote nodes. Then drop it from the table.
handleProcessDown :: GlLinks -> ProcessId -> SignalReason -> ProcessM GlLinks
handleProcessDown global pid why =
do islocal <- isPidLocal pid
mynid <- getSelfNode
case Map.lookup pid global of
Nothing -> return global
Just (monitors,_monitee,nodes) ->
do mapM_ (\(tellwho,how) -> trigger (buildPidFromNodeId mynid tellwho) pid how why) (Map.keys monitors)
when (islocal)
(broadcast (Map.keys nodes) (GlProcessDown pid why))
mynid <- getSelfNode
return $ glExpungeProcess global pid mynid
service global =
let
-- Matchers for replies to requests still in flight, each given the
-- current state.
additional = map (\receiver -> receiver global) (Map.elems (glSyncs global))
matchPatterns = [match matchSignals,
roundtripResponseAsync matchCommands False,
roundtripResponse (matchSyncs global)] ++ additional
matchSignals cmd =
case cmd of
GlProcessDown pid why ->
do res <- handleProcessDown (glLinks global) pid why
return $ global { glLinks = res}
-- A node went down: treat every known process of that node as ended
-- with SrNoPing.
GlNodeDown nid -> let gl = glLinks global
aslist = Map.keys gl
pids = filter (\n -> nodeFromPid n == nid) aslist
in do res <- foldM (\g p -> handleProcessDown g p SrNoPing) gl pids
return $ global {glLinks = res}
-- Requests from other nodes' monitor services; each returns the reply
-- paired with the new state.
matchSyncs global cmd =
case cmd of
GlRequestMonitoring monitee monitor action ->
let s1 = addLocalMonitor global monitor monitee action
in monitorNode (nodeFromPid monitee)
>> return (QteOK,s1)
GlRequestMoniteeing pid towho ->
do live <- checkliveness pid
case live of
True -> let s1 = global {glLinks = gdAddNode (glLinks global) pid towho}
in return (QteOK,s1)
False -> return (QteUnknownPid,global)
GlRequestUnmonitoring monitee monitor action ->
let s1 = removeLocalMonitor global monitor monitee action
in return (QteOK,s1)
-- Commands from clients. ans sends the reply; the new state is returned.
-- The four cases distinguish which of monitee and monitor are local.
matchCommands cmd ans =
case cmd of
GlMonitor monitor monitee action ->
do ismoniteelocal <- isPidLocal monitee
ismonitorlocal <- isPidLocal monitor
case (ismoniteelocal,ismonitorlocal) of
-- Both local: record both directions here.
(True,True) -> do live <- checklivenessandtrigger monitor monitee action
case live of
True -> let s1 = addLocalMonitee global monitor monitee action
s2 = addLocalMonitor s1 monitor monitee action
in ans QteOK >> return s2
False -> ans QteOK >> return global
-- Local monitee, remote monitor: ask the monitor's node to record the
-- monitor; once it confirms, remember that node for notifications.
(True,False) -> do live <- checklivenessandtrigger monitor monitee action
case live of
False -> ans QteOK >> return global
True -> let msg = GlRequestMonitoring monitee monitor action
receiver myId myGlobal myMsg =
let newGlobal = myGlobal {glSyncs = Map.delete myId (glSyncs myGlobal)}
in case myMsg of
QteOK -> let s1 = addLocalNode newGlobal monitor monitee action
in do _ <- ans QteOK
return s1
err -> ans err >> return newGlobal
in do mmatch <- roundtripQueryImplSub PldAdmin (getGlobalFor monitor) msg (receiver (glNextId global))
case mmatch of
Left err -> ans err >> return global
Right mymatch -> return global {glNextId=glNextId global+1,
glSyncs=Map.insert (glNextId global) (mymatch) (glSyncs global)}
-- Remote monitee, local monitor: ask the monitee's node to notify this
-- node; once it confirms, record the local monitor and start monitoring
-- the monitee's node.
(False,True) -> let msg = GlRequestMoniteeing monitee (nodeFromPid monitor)
receiver myId myGlobal myMsg =
let newGlobal = myGlobal {glSyncs = Map.delete myId (glSyncs myGlobal)}
in case myMsg of
QteOK -> let s1 = addLocalMonitor newGlobal monitor monitee action
in do monitorNode (nodeFromPid monitee)
_ <- ans QteOK
return s1
QteUnknownPid -> do trigger monitor monitee action SrInvalid
_ <- ans QteOK
return newGlobal
err -> do _ <- ans err
return newGlobal
in do mmatch <- roundtripQueryImplSub PldAdmin (getGlobalFor monitee) msg (receiver (glNextId global))
case mmatch of
Left err -> ans err >> return global
Right mymatch -> return global {glNextId=glNextId global+1,
glSyncs=Map.insert (glNextId global) (mymatch) (glSyncs global)}
(False,False) -> do _ <- ans (QteOther "Requesting monitoring by third party node")
return global
GlUnmonitor monitor monitee action ->
do ismoniteelocal <- isPidLocal monitee
ismonitorlocal <- isPidLocal monitor
case (ismoniteelocal,ismonitorlocal) of
(True,True) -> let s1 = removeLocalMonitee global monitor monitee action
s2 = removeLocalMonitor s1 monitor monitee action
in ans QteOK >> return s2
-- NOTE(review): here the monitee is local and the monitor is remote, yet
-- the request below is addressed to getGlobalFor monitee, i.e. this
-- node. The matching GlMonitor branch addresses getGlobalFor monitor.
-- This looks like it should target the monitor's node -- confirm.
(True,False) -> let msg = GlRequestUnmonitoring monitee monitor action
receiver myId myGlobal myMsg =
let newGlobal = myGlobal {glSyncs=Map.delete myId (glSyncs myGlobal)}
in ans myMsg >> return newGlobal
in do sync <- roundtripQueryImplSub PldAdmin (getGlobalFor monitee) msg (receiver (glNextId global))
case sync of
Left err -> ans err >> return global
Right mymatch -> return global {glNextId=glNextId global+1,
glSyncs=Map.insert (glNextId global) (mymatch) (glSyncs global)}
(False,True) -> let s1 = removeLocalMonitor global monitor monitee action
in ans QteOK >> return s1
(False,False) -> ans (QteOther "Requesting unmonitoring by third party node") >> return global
-- Handle one message, then continue with the resulting state.
in receiveWait matchPatterns >>= service
-- | Request sent to a node's spawner service: the closure to run as a new
-- process, together with the options controlling how it is started.
data AmSpawn = AmSpawn (Closure (ProcessM ())) AmSpawnOptions
  deriving (Typeable)

-- | The closure is written first, followed by the options.
instance Binary AmSpawn where
  put (AmSpawn clo opts) = do
    put clo
    put opts
  get = do
    clo <- get
    opts <- get
    return (AmSpawn clo opts)
-- | Request sent to a node's spawner service by 'callRemoteImpl': the pid
-- supplied by the caller plus a payload-returning closure to evaluate.
data AmCall = AmCall ProcessId (Closure Payload)
  deriving (Typeable)

-- | The pid is written first, followed by the closure.
instance Binary AmCall where
  put (AmCall pid clo) = do
    put pid
    put clo
  get =
    get >>= \pid ->
      get >>= \clo ->
        return (AmCall pid clo)
-- | Options carried by an 'AmSpawn' request; the spawner service consults
-- each field when it starts the new process.
data AmSpawnOptions = AmSpawnOptions
  { amsoPaused :: Bool
    -- ^ When 'True', the new process waits for an 'AmSpawnUnpause'
    -- message before running its closure.
  , amsoLink :: Maybe ProcessId
    -- ^ If present, passed to 'linkProcess' for the new process.
  , amsoMonitor :: Maybe (ProcessId, MonitorAction)
    -- ^ If present, a monitor is set up between this pid and the new
    -- process with the given action.
  , amsoName :: Maybe String
    -- ^ If present, passed to 'nameSet' for the new process.
  } deriving (Typeable)

-- | Fields are written and read in declaration order.
instance Binary AmSpawnOptions where
  put (AmSpawnOptions paused link monitor name) = do
    put paused
    put link
    put monitor
    put name
  get =
    get >>= \paused ->
    get >>= \link ->
    get >>= \monitor ->
    get >>= \name ->
    return (AmSpawnOptions paused link monitor name)
-- | Spawn options with everything switched off: not paused, no link, no
-- monitor and no registered name.
defaultSpawnOptions :: AmSpawnOptions
defaultSpawnOptions =
  AmSpawnOptions
    { amsoPaused = False
    , amsoLink = Nothing
    , amsoMonitor = Nothing
    , amsoName = Nothing
    }
-- | Message that releases a process spawned with 'amsoPaused' set; see
-- 'unpause'.
data AmSpawnUnpause = AmSpawnUnpause
  deriving (Typeable)

-- | The message carries no payload, so nothing is read or written.
instance Binary AmSpawnUnpause where
  get = return AmSpawnUnpause
  put AmSpawnUnpause = return ()
-- | Start the closure as a new process on the given node using
-- 'defaultSpawnOptions', returning the new process's pid.
spawn :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId
spawn node clo =
  spawnAnd node clo opts
  where
    opts = defaultSpawnOptions
-- | End the current computation by throwing 'ProcessTerminationException'.
terminate :: ProcessM a
terminate = throw stop
  where
    stop = ProcessTerminationException
-- | Send 'AmSpawnUnpause' to the given process, which is the message a
-- process spawned with 'amsoPaused' waits for before running.
unpause :: ProcessId -> ProcessM ()
unpause pid = pid `send` AmSpawnUnpause
-- | Like 'spawn', but with 'amsoLink' set to the calling process's pid.
spawnLink :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId
spawnLink node clo =
  getSelfPid >>= \mypid ->
    spawnAnd node clo defaultSpawnOptions {amsoLink = Just mypid}
-- | Ask the spawner service on the given node to start the closure with the
-- supplied options.
--
-- Returns the pid reported by the service; a transmission failure is
-- rethrown as a 'TransmitException'.
spawnAnd :: NodeId -> Closure (ProcessM ()) -> AmSpawnOptions -> ProcessM ProcessId
spawnAnd node clo opt =
  roundtripQueryUnsafe PldAdmin spawner (AmSpawn clo opt)
    >>= either (throw . TransmitException) return
  where
    spawner = adminGetPid node ServiceSpawner
-- | Have the given node evaluate a closure of type @ProcessM a@ and return
-- the decoded result. All the work is done by 'callRemoteImpl'; this
-- wrapper only fixes the closure's type.
callRemote :: (Serializable a) => NodeId -> Closure (ProcessM a) -> ProcessM a
callRemote = callRemoteImpl
-- | Have the given node evaluate a closure of type @IO a@ and return the
-- decoded result. All the work is done by 'callRemoteImpl'; this wrapper
-- only fixes the closure's type.
callRemoteIO :: (Serializable a) => NodeId -> Closure (IO a) -> ProcessM a
callRemoteIO = callRemoteImpl
-- | Have the given node evaluate a closure of a plain value type @a@ and
-- return the decoded result. All the work is done by 'callRemoteImpl'; this
-- wrapper only fixes the closure's type.
callRemotePure :: (Serializable a) => NodeId -> Closure a -> ProcessM a
callRemotePure = callRemoteImpl
-- | Shared implementation of the @callRemote*@ family.
--
-- The closure is converted with 'makePayloadClosure', sent to the spawner
-- service of the target node inside an 'AmCall', and the returned payload is
-- decoded to the requested type.
--
-- Throws 'TransmitException': with the transport's status when the round
-- trip fails, and with 'QteUnknownCommand' when the closure cannot be
-- converted, the service answers with no payload, or the payload does not
-- decode.
callRemoteImpl :: (Serializable a) => NodeId -> Closure b -> ProcessM a
callRemoteImpl node clo =
  maybe unknownCommand remoteCall (makePayloadClosure clo)
  where
    unknownCommand = throw $ TransmitException QteUnknownCommand

    remoteCall plclo = do
      mypid <- getSelfPid
      res <- roundtripQuery PldAdmin (adminGetPid node ServiceSpawner) (AmCall mypid plclo)
      case res of
        Left e -> throw (TransmitException e)
        Right Nothing -> unknownCommand
        Right (Just mval) -> liftIO (serialDecode mval) >>= maybe unknownCommand return
-- | Run the spawner service: a loop that answers 'AmSpawn' and 'AmCall'
-- requests, with 'matchUnknownThrow' as the final matcher for anything else.
startSpawnerService :: ProcessM ()
startSpawnerService = serviceThread ServiceSpawner spawner
  where
    -- Handle one request, then go round again.
    spawner = receiveWait [matchSpawnRequest, matchCallRequest, matchUnknownThrow] >> spawner

    -- Returns its second argument unchanged; the signature is what fixes the
    -- exception type caught by 'ptry' in 'callWorker' to 'SomeException'.
    exceptFilt :: SomeException -> q -> q
    exceptFilt _ q = q

    -- Evaluate a call closure and hand the outcome to the responder. An
    -- exception is reported the same way as a closure that could not be
    -- invoked: with Nothing.
    callWorker c responder =
      ptry (invokeClosure c) >>= \outcome ->
        case outcome of
          Left q -> exceptFilt q (responder Nothing)
          Right reply -> responder reply

    -- Run a spawn closure, or log a critical message if it cannot be invoked.
    spawnWorker c =
      invokeClosure c >>= maybe failed Prelude.id
      where
        failed = logS "SYS" LoCritical $ "Failed to invoke closure " ++ show c

    -- Each call is evaluated in its own local process.
    matchCallRequest =
      roundtripResponseAsync
        (\(AmCall _pid clo) sender -> spawnLocal (callWorker clo sender) >> return ())
        False

    -- Start the requested process; the two arguments handed to
    -- 'spawnLocalAnd' are built from the spawn options.
    matchSpawnRequest =
      roundtripResponse
        ( \(AmSpawn c opt) -> do
            let pausePrelude =
                  if amsoPaused opt
                    then receiveWait [match (\AmSpawnUnpause -> return ())]
                    else return ()
                namePostlude = maybe (return ()) nameSet (amsoName opt)
                linkPostlude = maybe (return ()) linkProcess (amsoLink opt)
                monitorPostlude = case amsoMonitor opt of
                  Nothing -> return ()
                  Just (pid, ma) ->
                    getSelfPid >>= \mypid ->
                      monitorProcessQuiet pid mypid ma >> return ()
            newpid <-
              spawnLocalAnd
                (pausePrelude >> spawnWorker c)
                (namePostlude >> linkPostlude >> monitorPostlude)
            return (newpid, ())
        )
-- | Network magic used by the local registry's own node configuration.
localRegistryMagicMagic :: String
localRegistryMagicMagic =
  "__LocalRegistry"

-- | Role name used by the local registry's own node configuration.
localRegistryMagicRole :: String
localRegistryMagicRole =
  "__LocalRegistry"
-- | Plain alias for 'String'.
-- NOTE(review): not referenced in the nearby code; confirm it is still used.
type LocalProcessName = String

-- | Nodes grouped by a 'String' key; the local registry keys this map by
-- role name.
type PeerInfo = Map.Map String [NodeId]

-- | Per-network-magic state kept by the local registry.
data LocalNodeData = LocalNodeData
  { ldmRoles :: PeerInfo
    -- ^ Registered nodes, grouped by role.
  }
-- | Messages exchanged with the local node registry service.
data LocalProcessMessage
  = LocalNodeRegister String String NodeId
    -- ^ Register a node: network magic, role, node.
  | LocalNodeUnregister String String NodeId
    -- ^ Remove a node: network magic, role, node.
  | LocalNodeQuery String
    -- ^ Ask for the nodes registered under a network magic.
  | LocalNodeAnswer PeerInfo
    -- ^ Reply to 'LocalNodeQuery'.
  | LocalNodeResponseOK
    -- ^ A register/unregister request succeeded.
  | LocalNodeResponseError String
    -- ^ A request was rejected, with the reason.
  | LocalNodeHello
    -- ^ Liveness probe; the registry echoes it back.
  deriving (Typeable)

-- | Each constructor is encoded as a one-byte tag followed by its fields.
-- Tag 2 is unused; the existing numbering is kept so the wire format does
-- not change.
instance Binary LocalProcessMessage where
  put (LocalNodeRegister m r n) = putWord8 0 >> put m >> put r >> put n
  put (LocalNodeUnregister m r n) = putWord8 1 >> put m >> put r >> put n
  put (LocalNodeQuery r) = putWord8 3 >> put r
  put (LocalNodeAnswer pi) = putWord8 4 >> put pi
  put LocalNodeResponseOK = putWord8 5
  put (LocalNodeResponseError s) = putWord8 6 >> put s
  put LocalNodeHello = putWord8 7
  get = do
    g <- getWord8
    case g of
      0 -> get >>= \m -> get >>= \r -> get >>= \n -> return (LocalNodeRegister m r n)
      1 -> get >>= \m -> get >>= \r -> get >>= \n -> return (LocalNodeUnregister m r n)
      3 -> get >>= return . LocalNodeQuery
      4 -> get >>= return . LocalNodeAnswer
      5 -> return LocalNodeResponseOK
      6 -> get >>= return . LocalNodeResponseError
      7 -> return LocalNodeHello
      -- Report an unrecognized tag through the decoder rather than dying
      -- with a pattern-match failure.
      _ -> fail ("LocalProcessMessage: unrecognized constructor tag " ++ show g)
-- | Pid of the node-registry service on the host of the given node. Only the
-- host part of the node id is used; the port is taken from
-- 'cfgLocalRegistryListenPort' of the local configuration.
remoteRegistryPid :: NodeId -> ProcessM ProcessId
remoteRegistryPid nid =
  getConfig >>= \cfg ->
    return $ adminGetPid (NodeId hostname (cfgLocalRegistryListenPort cfg)) ServiceNodeRegistry
  where
    NodeId hostname _ = nid
-- | Pid of the node-registry service on this node's own host, at the port
-- given by 'cfgLocalRegistryListenPort'.
localRegistryPid :: ProcessM ProcessId
localRegistryPid = do
  NodeId hostname _ <- getSelfNode
  cfg <- getConfig
  let registryNode = NodeId hostname (cfgLocalRegistryListenPort cfg)
  return (adminGetPid registryNode ServiceNodeRegistry)
-- | Read the named configuration file, run a local registry with it in
-- wait-forever mode, and print the status it finally returns.
standaloneLocalRegistry :: String -> IO ()
standaloneLocalRegistry cfgn =
  readConfig True (Just cfgn) >>= \cfg ->
    startLocalRegistry cfg True >>= \res ->
      putStrLn ("Terminating standalone local registry: " ++ show res)
-- | Register this node with the local registry by sending it a
-- 'LocalNodeRegister' message.
localRegistryRegisterNode :: ProcessM ()
localRegistryRegisterNode =
  localRegistryRegisterNodeImpl LocalNodeRegister
-- | Remove this node from the local registry by sending it a
-- 'LocalNodeUnregister' message.
localRegistryUnregisterNode :: ProcessM ()
localRegistryUnregisterNode =
  localRegistryRegisterNodeImpl LocalNodeUnregister
-- | Shared implementation of node registration and unregistration.
--
-- Builds a message with the supplied constructor from this node's network
-- magic, role and node id, sends it to the local registry and checks the
-- reply.
--
-- Throws 'TransmitException' when the round trip fails, when the registry
-- answers with 'LocalNodeResponseError', or when it answers with any other
-- unexpected message.
localRegistryRegisterNodeImpl :: (String -> String -> NodeId -> LocalProcessMessage) -> ProcessM ()
localRegistryRegisterNodeImpl cons = do
  cfg <- getConfig
  lrpid <- localRegistryPid
  nid <- getSelfNode
  let regMsg = cons (cfgNetworkMagic cfg) (cfgRole cfg) nid
  res <- roundtripQueryUnsafe PldAdmin lrpid regMsg
  case res of
    Left ts -> throw $ TransmitException ts
    Right LocalNodeResponseOK -> return ()
    Right (LocalNodeResponseError s) -> throw $ TransmitException $ QteOther s
    -- Any other reply used to fall through the case and surface as a
    -- pattern-match failure; report it like the other failures instead.
    Right _ -> throw $ TransmitException $ QteOther "Unexpected response from local node registry"
-- | Probe the local registry with 'LocalNodeHello' and expect the same
-- message back.
--
-- Throws 'ConfigException' when the round trip fails or when the reply is
-- anything other than 'LocalNodeHello'.
localRegistryHello :: ProcessM ()
localRegistryHello =
  localRegistryPid >>= \lrpid ->
    roundtripQueryUnsafe PldAdmin lrpid LocalNodeHello >>= \res ->
      case res of
        Left n -> throw $ ConfigException $ "Can't talk to local node registry: " ++ show n
        Right LocalNodeHello -> return ()
        _ -> throw $ ConfigException $ "No response from local node registry"
-- | Ask the registry on the host of the given node which nodes are
-- registered under this node's network magic.
--
-- Returns 'Nothing' when the round trip fails or the registry replies with
-- anything other than a 'LocalNodeAnswer'.
localRegistryQueryNodes :: NodeId -> ProcessM (Maybe PeerInfo)
localRegistryQueryNodes nid = do
  cfg <- getConfig
  lrpid <- remoteRegistryPid nid
  let regMsg = LocalNodeQuery (cfgNetworkMagic cfg)
  res <- roundtripQueryUnsafe PldAdmin lrpid regMsg
  case res of
    Left _ts -> return Nothing
    Right (LocalNodeAnswer pi) -> return $ Just pi
    -- An unexpected reply used to crash with a pattern-match failure; treat
    -- it like a failed query.
    Right _ -> return Nothing
-- | Start a node that serves as the local node registry.
--
-- The node is created from the given configuration with its listen port,
-- network magic and role replaced by the registry's own values. Returns
-- 'QteNetworkError' if the listener cannot be started. Otherwise the
-- registry process is launched; with @waitforever@ set the call then blocks
-- in 'waitForThreads' and finally returns a 'QteOther' status, and without
-- it 'QteOK' is returned straight away.
startLocalRegistry :: Config -> Bool -> IO TransmitStatus
startLocalRegistry cfg waitforever = startit
  where
    regConfig =
      cfg
        { cfgListenPort = cfgLocalRegistryListenPort cfg
        , cfgNetworkMagic = localRegistryMagicMagic
        , cfgRole = localRegistryMagicRole
        }

    -- Serve one request and continue with the table it returns.
    handler tbl = receiveWait [roundtripResponse (registryCommand tbl)] >>= handler

    emptyNodeData = LocalNodeData {ldmRoles = Map.empty}

    -- Nodes registered for a role under a magic, if both keys exist.
    lookupNodes tbl magic role =
      Map.lookup magic tbl >>= \ldm -> Map.lookup role (ldmRoles ldm)

    -- Drop a node from a role's list; missing keys are left untouched.
    removeNode tbl magic role nid = Map.alter (fmap dropFromRole) magic tbl
      where
        dropFromRole ldm =
          ldm {ldmRoles = Map.alter (fmap (filter (/= nid))) role (ldmRoles ldm)}

    -- Add a node to a role's list, creating missing keys on the way and
    -- never adding the same node twice.
    insertNode tbl magic role nid = Map.alter upsert magic tbl
      where
        withNode Nothing = [nid]
        withNode (Just lst)
          | nid `elem` lst = lst
          | otherwise = nid : lst
        addToRole ldm =
          ldm {ldmRoles = Map.alter (Just . withNode) role (ldmRoles ldm)}
        upsert existing = Just . addToRole $ case existing of
          Nothing -> emptyNodeData
          Just ldm -> ldm

    -- Compute the reply and the updated table for one request.
    registryCommand tbl msg = case msg of
      LocalNodeQuery magic ->
        return (LocalNodeAnswer (maybe Map.empty ldmRoles (Map.lookup magic tbl)), tbl)
      LocalNodeUnregister magic role nid ->
        return (LocalNodeResponseOK, removeNode tbl magic role nid)
      LocalNodeRegister magic role nid
        | alreadyRegistered ->
            return (LocalNodeResponseError ("Multiple node registration for " ++ show nid ++ " as " ++ role), tbl)
        | otherwise ->
            return (LocalNodeResponseOK, insertNode tbl magic role nid)
        where
          alreadyRegistered = maybe False (elem nid) (lookupNodes tbl magic role)
      LocalNodeHello -> return (LocalNodeHello, tbl)
      _ -> return (LocalNodeResponseError "Unknown command", tbl)

    startit = do
      node <- initNode regConfig Remote.Reg.empty
      res <- try $ forkAndListenAndDeliver node regConfig :: IO (Either SomeException ())
      case res of
        Left e -> return $ QteNetworkError $ show e
        Right () -> do
          _ <-
            runLocalProcess
              node
              (startLoggingService >> adminRegister ServiceNodeRegistry >> handler Map.empty)
          if waitforever
            then waitForThreads node >> return (QteOther "Local registry process terminated")
            else return QteOK
-- | Build a closure from a function name and an environment value, which is
-- serialized with 'serialEncode'.
makeClosure :: (Typeable a,Serializable v) => String -> v -> ProcessM (Closure a)
makeClosure fun env =
  liftIO (serialEncode env) >>= \enc ->
    return (Closure fun enc)
-- | Turn a closure into one that names the payload-returning variant of the
-- same function, by appending @Pl@ to the function name. Only closures whose
-- name ends in @__impl@ can be converted; any other yields 'Nothing'.
makePayloadClosure :: Closure a -> Maybe (Closure Payload)
makePayloadClosure (Closure name arg)
  | "__impl" `isSuffixOf` name = Just (Closure (name ++ "Pl") arg)
  | otherwise = Nothing
-- | Look up the function named by a closure in the current lookup table,
-- without applying it. The closure's stored argument is ignored.
evaluateClosure :: (Typeable b) => Closure a -> ProcessM (Maybe (Payload -> b))
evaluateClosure (Closure name _) =
  getLookup >>= \node ->
    return (getEntryByIdent node name)
-- | Look up the function named by a closure and apply it to the closure's
-- stored argument.
--
-- The name is looked up three times: as a pure function, as an 'IO'
-- function and as a 'ProcessM' function. All three attempts are run in that
-- order and the first one that produced a value is returned; 'Nothing'
-- means no lookup succeeded.
invokeClosure :: (Typeable a) => Closure a -> ProcessM (Maybe a)
invokeClosure (Closure name arg) = do
  node <- getLookup
  res <- sequence [pureFun node, ioFun node, procFun node]
  return $ case catMaybes res of
    (a:_) -> Just a
    _ -> Nothing
  where
    -- Entry is a plain function: apply it directly.
    pureFun node = return $ fmap (\x -> x arg) (getEntryByIdent node name)

    -- Entry returns an IO action: run it via liftIO.
    ioFun node =
      maybe (return Nothing) (\x -> liftIO (x arg) >>= return . Just) (getEntryByIdent node name)

    -- Entry returns an action in the current monad: run it as is.
    procFun node =
      maybe (return Nothing) (\x -> x arg >>= return . Just) (getEntryByIdent node name)
-- | A FIFO queue held as two lists. The first list holds newly inserted
-- elements, newest first; the second holds elements in the order they leave
-- the queue.
data Queue a = Queue [a] [a]

-- | The empty queue.
queueMake :: Queue a
queueMake = Queue [] []

-- | Add one element at the back of the queue.
queueInsert :: Queue a -> a -> Queue a
queueInsert (Queue back front) x = Queue (x : back) front

-- | Add several elements at once. The list is prepended as-is onto the
-- newest-first list, so its head becomes the newest element of the queue.
queueInsertMulti :: Queue a -> [a] -> Queue a
queueInsertMulti (Queue back front) xs = Queue (xs ++ back) front

-- | All elements of the queue, oldest first.
queueToList :: Queue a -> [a]
queueToList (Queue back front) = front ++ reverse back

-- | Build a queue whose elements leave in list order.
queueFromList :: [a] -> Queue a
queueFromList = Queue []
-- | Run an action while holding one unit of the semaphore, and give the unit
-- back when the action finishes, whether normally or by exception.
--
-- The unit is acquired before the release handler is installed. Wrapping
-- the wait itself in 'finally' would signal the semaphore even when the
-- wait was interrupted, handing out a unit that was never taken.
withSem :: QSem -> IO a -> IO a
withSem sem f = waitQSem sem >> (f `finally` signalQSem sem)