{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RecordWildCards #-}
module Control.Distributed.Process.Management.Internal.Agent where

import Control.Applicative
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
  ( TChan
  , newBroadcastTChanIO
  , readTChan
  , writeTChan
  , dupTChan
  )
import Control.Distributed.Process.Internal.Primitives
  ( receiveWait
  , matchAny
  , die
  , catches
  , Handler(..)
  )
import Control.Distributed.Process.Internal.CQueue
  ( enqueueSTM
  , CQueue
  )
import Control.Distributed.Process.Management.Internal.Types
  ( Fork
  )
import Control.Distributed.Process.Management.Internal.Trace.Tracer
  ( traceController
  )
import Control.Distributed.Process.Internal.Types
  ( Process
  , Message
  , Tracer(..)
  , LocalProcess(..)
  , ProcessId
  , forever'
  )
import Control.Exception (AsyncException(ThreadKilled), SomeException)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import GHC.Weak (Weak, deRefWeak)
import Prelude

--------------------------------------------------------------------------------
-- Agent Controller Implementation                                            --
--------------------------------------------------------------------------------

-- | A triple containing a configured tracer, weak pointer to the
-- agent controller's mailbox (CQueue) and an expression used to
-- instantiate new agents on the current node.
type AgentConfig =
  (Tracer, Weak (CQueue Message),
   (((TChan Message, TChan Message) -> Process ()) -> IO ProcessId))

-- | Starts a management agent for the current node. The agent process
-- must not crash or be killed, so we generally avoid publishing its
-- @ProcessId@ where possible.
--
-- Our process is also responsible for forwarding messages to the trace
-- controller, since having two /special processes/ handled via the
-- @LocalNode@ would be inelegant. We forward messages directly to the
-- trace controller's message queue, just as the @MxEventBus@ that's
-- set up on the @LocalNode@ forwards messages directly to us. This
-- optimises the code path for tracing and avoids overloading the
-- node controller's internal control plane with additional routing, at the
-- cost of a little more complexity and two cases where we break
-- encapsulation.
--
-- Once the tracer, the broadcast channel and the dead letter queue have
-- been started, the resulting 'AgentConfig' is published through the
-- supplied @MVar@ and the process enters its (endless) forwarding loop.
--
mxAgentController :: Fork
                  -> MVar AgentConfig
                  -> Process ()
mxAgentController forkProcess mv = do
    trc <- liftIO $ startTracing forkProcess
    sigbus <- liftIO newBroadcastTChanIO
    liftIO $ startDeadLetterQueue sigbus
    weakQueue <- processWeakQ <$> ask
    liftIO $ putMVar mv (trc, weakQueue, mxStartAgent forkProcess sigbus)
    go sigbus trc
  where
    go bus tracer = forever' $ do
      void $ receiveWait
        [ -- This is exactly what it appears to be: a "catch all" handler.
          -- Since mxNotify can potentially pass an unevaluated thunk to
          -- our mailbox, the dequeue (i.e., matchMessage) can fail and
          -- crash this process, which we DO NOT want. Alternatively,
          -- we handle IO exceptions here explicitly, since we don't want
          -- this process to ever crash, and the assumption we therefore
          -- make is thus:
          --
          -- 1. only ThreadKilled can tell this process to terminate
          -- 2. all other exceptions are invalid and should be ignored
          --
          -- The outcome of course, is that /bad/ calls to mxNotify
          -- (e.g., passing unevaluated thunks that will crash when
          -- they're eventually forced) are thus silently ignored.
          --
          matchAny (liftIO . broadcast bus tracer)
        ] `catches`
        [ Handler onAsyncException
        , Handler (\(_ :: SomeException) -> return ())
        ]

    -- The handler list is searched by exception /type/, so this handler
    -- is chosen for every 'AsyncException', not just 'ThreadKilled'. It
    -- must therefore be total: a partial match here would throw
    -- @PatternMatchFail@ from inside the handler and take the agent
    -- controller down, violating rule (2) above.
    onAsyncException :: AsyncException -> Process ()
    onAsyncException ThreadKilled = die "Killed"
    onAsyncException _            = return ()

-- | Forwards a message to the tracer's queue (if it can still be reached)
-- and to the broadcast channel.
broadcast :: TChan Message -> Tracer -> Message -> IO ()
broadcast ch tr msg = do
  tmQueue <- tracerQueue tr
  atomicBroadcast ch tmQueue msg

-- | Dereferences the weak pointer held by the given 'Tracer', yielding
-- @Nothing@ if the queue it refers to is no longer available.
tracerQueue :: Tracer -> IO (Maybe (CQueue Message))
tracerQueue (Tracer _ wQ) = deRefWeak wQ

-- | Writes the message to the broadcast channel and, when a tracer queue
-- is supplied, enqueues it there first. Both writes are performed within
-- a single STM transaction.
atomicBroadcast :: TChan Message
                -> Maybe (CQueue Message)
                -> Message
                -> IO ()
atomicBroadcast ch Nothing  msg = atomically $ writeTChan ch msg
atomicBroadcast ch (Just q) msg =
  -- putStrLn $ "broadcasting " ++ (show msg)
  atomically $ enqueueSTM q msg >> writeTChan ch msg

-- | Forks a new process in which an mxAgent is run. The handler is given
-- a pair consisting of the shared broadcast channel and a freshly
-- duplicated copy of it (obtained via 'dupTChan').
--
mxStartAgent :: Fork
             -> TChan Message
             -> ((TChan Message, TChan Message) -> Process ())
             -> IO ProcessId
mxStartAgent fork chan handler = do
  chan' <- atomically (dupTChan chan)
  let proc = handler (chan, chan')
  fork proc

-- | Start the tracer controller. Blocks until the forked trace controller
-- has handed back (via the @MVar@) the weak reference to its queue.
--
startTracing :: Fork -> IO Tracer
startTracing forkProcess = do
  mv  <- newEmptyMVar
  pid <- forkProcess $ traceController mv
  wQ  <- takeMVar mv
  return $ Tracer pid wQ

-- | Start a dead letter (agent) queue.
--
-- If no agents are registered on the system, the management
-- event bus will fill up and its data won't be GC'ed until someone
-- comes along and reads from the broadcast channel (via dupTChan
-- of course). This is effectively a leak, so to mitigate it, we
-- start a /dead letter queue/ that drains the event bus continuously,
-- thus ensuring if there are no other consumers that we won't use
-- up heap space unnecessarily.
--
startDeadLetterQueue :: TChan Message
                     -> IO ()
startDeadLetterQueue sigbus = do
  chan' <- atomically (dupTChan sigbus)
  void $ forkIO $ forever' $ do
    void $ atomically $ readTChan chan'