-- | Implement Erlang style message passing concurrency. -- -- This handles the 'MessagePassing' and 'Process' effects, using -- 'STM.TQueue's and 'forkIO'. -- -- This aims to be a pragmatic implementation, so even logging is -- supported. -- -- At the core is a /main process/ that enters 'schedule' -- and creates all of the internal state stored in 'STM.TVar's -- to manage processes with message queues. -- -- The 'Eff' handler for 'Process' and 'MessagePassing' use -- are implemented and available through 'spawn'. -- module Control.Eff.Concurrent.Process.ForkIOScheduler ( schedule , defaultMain , defaultMainWithLogChannel , ProcEff , InterruptableProcEff , SchedulerIO , HasSchedulerIO , forkIoScheduler ) where import Control.Exception.Safe as Safe import qualified Control.Eff.ExceptionExtra as ExcExtra () import Control.Concurrent.STM as STM import Control.Concurrent (yield) import qualified Control.Concurrent.Async as Async import Control.Concurrent.Async (Async(..)) import Control.Eff import Control.Eff.Extend import Control.Eff.Concurrent.Process import Control.Eff.Lift import Control.Eff.Log import Control.Eff.Reader.Strict as Reader import Control.Lens import Control.Monad ( when , void , (>=>) ) import Control.Monad.Trans.Control (MonadBaseControl(..)) import Data.Default import Data.Dynamic import Data.Foldable import Data.Kind ( ) import Data.Map ( Map ) import qualified Data.Map as Map import Data.Set ( Set ) import qualified Data.Set as Set import Data.Maybe import Data.Sequence ( Seq(..) ) import qualified Data.Sequence as Seq import GHC.Stack import Text.Printf import System.Timeout -- * Process Types -- | A message queue of a process, contains the actual queue and maybe an -- exit reason. 
data MessageQ = MessageQ
  { _incomingMessages :: Seq Dynamic
    -- ^ The received, not yet consumed messages. 'enqueueMessageOtherProcess'
    -- appends new messages at the right end.
  , _shutdownRequests :: Maybe SomeExitReason
    -- ^ A pending shutdown or interrupt request, if any. A newer request
    -- overwrites an older one (see 'enqueueShutdownRequest').
  }

instance Default MessageQ where
  def = MessageQ def def

makeLenses ''MessageQ

-- | Return any '_shutdownRequests' from a 'MessageQ' in a 'TVar' and
-- reset the '_shutdownRequests' field to 'Nothing' in the 'TVar'.
--
-- The 'TVar' is only written when there actually was a request.
tryTakeNextShutdownRequestSTM :: TVar MessageQ -> STM (Maybe SomeExitReason)
tryTakeNextShutdownRequestSTM mqVar = do
  mq <- readTVar mqVar
  when (isJust (mq ^. shutdownRequests))
       (writeTVar mqVar (mq & shutdownRequests .~ Nothing))
  return (mq ^. shutdownRequests)

-- | Information about a process, needed to implement 'MessagePassing' and
-- 'Process' handlers. The message queue is a 'MessageQ' stored in a 'TVar'.
data ProcessInfo = ProcessInfo
  { _processId :: ProcessId
    -- ^ The id of this process.
  , _processState :: TVar ProcessState
    -- ^ The current 'ProcessState', starts as 'ProcessBooting'
    -- (see 'newProcessInfo').
  , _messageQ :: TVar MessageQ
    -- ^ Incoming messages and a pending shutdown request.
  , _processLinks :: TVar (Set ProcessId)
    -- ^ The ids of the processes this process is linked to.
  }

makeLenses ''ProcessInfo

-- * Scheduler Types

-- | Contains all process info'elements, as well as the state needed to
-- implement inter process communication.
data SchedulerState = SchedulerState
  { _nextPid :: TVar ProcessId
    -- ^ The id handed out to the next spawned process.
  , _processTable :: TVar (Map ProcessId ProcessInfo)
    -- ^ All processes that have been allocated and not yet cleaned up.
  , _processCancellationTable :: TVar (Map ProcessId (Async (ExitReason 'NoRecovery)))
    -- ^ The 'Async' handles used to cancel the processes on tear down.
  , _processMonitors :: TVar (Set (MonitorReference, ProcessId))
    -- ^ Set of monitors and monitor owners
  , _nextMonitorIndex :: TVar Int
    -- ^ The index used for the next 'MonitorReference'.
  }

makeLenses ''SchedulerState

-- | Add monitor: If the process is dead, enqueue a ProcessDown message into the
-- owners message queue
--
-- A fresh monitor index is always consumed. When @target@ equals @owner@
-- nothing is registered and nothing is enqueued; only the reference is
-- returned.
addMonitoring
  :: ProcessId -> ProcessId -> SchedulerState -> STM MonitorReference
addMonitoring target owner schedulerState = do
  mi <- readTVar (schedulerState ^. nextMonitorIndex)
  modifyTVar' (schedulerState ^. nextMonitorIndex) (+1)
  let mref = MonitorReference mi target
  when (target /= owner) $ do
    pt <- readTVar (schedulerState ^. processTable)
    if Map.member target pt
      then modifyTVar' (schedulerState ^. processMonitors)
                       (Set.insert (mref, owner))
      else
        let pdown = (ProcessDown mref (SomeExitReason (ProcessNotRunning target)))
        in  enqueueMessageOtherProcess owner (toDyn pdown) schedulerState
  return mref

-- | Remove every entry of the given 'MonitorReference' from the monitor set.
removeMonitoring :: MonitorReference -> SchedulerState -> STM ()
removeMonitoring mref schedulerState = modifyTVar'
  (schedulerState ^. processMonitors)
  (Set.filter (\(ref, _) -> ref /= mref))

-- | Send a 'ProcessDown' message to the owner of every monitor that watches
-- @downPid@ and remove these monitors.
--
-- The monitor set is split in a single pass instead of being re-filtered
-- once per triggered monitor.
triggerAndRemoveMonitor
  :: ProcessId -> SomeExitReason -> SchedulerState -> STM ()
triggerAndRemoveMonitor downPid reason schedulerState = do
  let monitorsVar = schedulerState ^. processMonitors
  (triggered, remaining) <-
    Set.partition ((== downPid) . monitoredProcess . fst)
      <$> readTVar monitorsVar
  -- Leave the TVar untouched when no monitor fired, so that the transaction
  -- does not write a variable it did not change.
  when (not (Set.null triggered)) (writeTVar monitorsVar remaining)
  traverse_ notifyOwner triggered
 where
  notifyOwner (mr, owner) =
    enqueueMessageOtherProcess owner (toDyn (ProcessDown mr reason)) schedulerState

-- * Process Implementation

instance Show ProcessInfo where
  show p = "process info: " ++ show (p ^. processId)

-- | Create a new 'ProcessInfo'
--
-- The process starts in state 'ProcessBooting' with an empty message queue
-- and no links.
newProcessInfo :: HasCallStack => ProcessId -> STM ProcessInfo
newProcessInfo a =
  ProcessInfo a <$> newTVar ProcessBooting <*> newTVar def <*> newTVar def

-- * Scheduler Implementation

-- | Create a new 'SchedulerState'
--
-- The first process id is @1@; all tables, the monitor set and the monitor
-- index start with their 'Default' value.
newSchedulerState :: HasCallStack => STM SchedulerState
newSchedulerState =
  SchedulerState
    <$> newTVar 1
    <*> newTVar def
    <*> newTVar def
    <*> newTVar def
    <*> newTVar def

-- | Create a new 'SchedulerState' run an IO action, catching all exceptions,
-- and when the actions returns, clean up and kill all processes.
withNewSchedulerState
  :: (HasLogging IO SchedulerIO, HasCallStack)
  => Eff SchedulerIO ()
  -> Eff LoggingAndIO ()
withNewSchedulerState mainProcessAction = Safe.bracketWithError
  (lift (atomically newSchedulerState))
  (\exceptions schedulerState -> do
    -- 'exceptions' holds the exception that aborted the action below, if any.
    traverse_
      (logError . ("scheduler setup crashed with: " ++) . Safe.displayException)
      exceptions
    logDebug "scheduler cleanup begin"
    runReader schedulerState tearDownScheduler
  )
  (\schedulerState -> do
    logDebug "scheduler loop entered"
    x <- runReader schedulerState mainProcessAction
    logDebug "scheduler loop returned"
    return x
  )
 where
  -- Cancel every process that is still registered, waiting at most five
  -- seconds for all cancellations together.
  tearDownScheduler = do
    schedulerState <- getSchedulerState
    let cancelTableVar = schedulerState ^. processCancellationTable
    -- cancel all processes
    allProcesses <- lift
      (atomically (readTVar cancelTableVar <* writeTVar cancelTableVar def))
    logNotice
      (  "cancelling processes: "
      ++ show (toListOf (ifolded . asIndex) allProcesses)
      )
    void
      (liftBaseWith
        (\runS -> do
          cancelled <- timeout
            5000000
            (  Async.mapConcurrently
                (\a -> do
                  Async.cancel a
                  runS
                    (logNotice ("process cancelled: " ++ show (asyncThreadId a)))
                )
                allProcesses
            >> runS (logNotice "all processes cancelled")
            )
          -- Do not swallow a timeout silently: report that some processes
          -- could not be cancelled in time.
          maybe (runS (logError "cancelling all processes timed out"))
                return
                cancelled
        )
      )

-- | The concrete list of 'Eff'ects of processes compatible with this scheduler.
-- This builds upon 'SchedulerIO'.
type ProcEff = ConsProcess SchedulerIO

-- | The concrete list of the effects, that the 'Process' uses
type InterruptableProcEff = InterruptableProcess SchedulerIO

-- | Type class constraint to indicate that an effect union contains the
-- effects required by every process and the scheduler implementation itself.
type HasSchedulerIO r = ( HasCallStack , Lifted IO r , SchedulerIO <:: r )

-- | The concrete list of 'Eff'ects for this scheduler implementation.
type SchedulerIO = ( Reader SchedulerState : LoggingAndIO)

-- | Basic effects: 'Logs' 'LogMessage' and 'Lift' IO
type LoggingAndIO = '[ Logs LogMessage , LogWriterReader LogMessage IO , Lift IO ]

-- | Start the message passing concurrency system then execute a 'Process' on
-- top of 'SchedulerIO' effect. All logging is sent to standard output.
defaultMain :: HasCallStack => Eff InterruptableProcEff () -> IO ()
defaultMain c = withAsyncLogChannel
  1024
  (multiMessageLogWriter ($ printLogMessage))
  (handleLoggingAndIO_ (schedule c))

-- | Start the message passing concurrency system then execute a 'Process' on
-- top of 'SchedulerIO' effect. Logging is handled by 'handleLoggingAndIO_'
-- using the given 'LogChannel'.
defaultMainWithLogChannel
  :: HasCallStack
  => Eff InterruptableProcEff ()
  -> LogChannel LogMessage
  -> IO ()
defaultMainWithLogChannel = handleLoggingAndIO_ . schedule

-- | A 'SchedulerProxy' for 'SchedulerIO'
forkIoScheduler :: SchedulerProxy SchedulerIO
forkIoScheduler = SchedulerProxy

-- ** MessagePassing execution

-- | Interpret the 'Process' effect requests of a single process, using the
-- given 'ProcessInfo' for the process id, state, message queue and links.
--
-- The 'Int' threaded through the handler is the next value returned for a
-- 'MakeReference' request; it starts at @0@.
handleProcess
  :: (HasLogging IO SchedulerIO, HasCallStack)
  => ProcessInfo
  -> Eff ProcEff (ExitReason 'NoRecovery)
  -> Eff SchedulerIO (ExitReason 'NoRecovery)
handleProcess myProcessInfo = handle_relay_s
  0
  (const return)
  (\ !nextRef !request k -> stepProcessInterpreter nextRef request k return)
 where
  myPid = myProcessInfo ^. processId

  myProcessStateVar = myProcessInfo ^. processState

  -- Publish the current 'ProcessState' of this process.
  setMyProcessState = lift . atomically . setMyProcessStateSTM
  -- DEBUG variant:
  -- setMyProcessState st = do
  --  oldSt <- lift (atomically (readTVar myProcessStateVar <* setMyProcessStateSTM st))
  --  logDebug ("state change: "++ show oldSt ++ " -> " ++ show st)

  setMyProcessStateSTM = writeTVar myProcessStateVar

  myMessageQVar = myProcessInfo ^. messageQ

  -- Resume the process: mark it as idle, yield to other threads and then
  -- invoke the continuation.
  kontinueWith
    :: forall s v a
     . HasCallStack
    => (s -> Arr SchedulerIO v a)
    -> s
    -> Arr SchedulerIO v a
  kontinueWith kontinue !nextRef !result = do
    setMyProcessState ProcessIdle
    lift yield
    kontinue nextRef result

  -- Finish the process: mark it as shutting down and invoke the exit
  -- continuation with the exit reason.
  diskontinueWith
    :: forall a
     . HasCallStack
    => Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> Arr SchedulerIO (ExitReason 'NoRecovery) a
  diskontinueWith diskontinue !reason = do
    setMyProcessState ProcessShuttingDown
    diskontinue reason

  -- Handle one request of the process. Pending shutdown and interrupt
  -- requests take precedence over the request itself.
  stepProcessInterpreter
    :: forall v a
     . HasCallStack
    => Int
    -> Process SchedulerIO v
    -> (Int -> Arr SchedulerIO v a)
    -> Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> Eff SchedulerIO a
  stepProcessInterpreter !nextRef !request kontinue diskontinue =
    -- handle process shutdown requests:
    -- 1. take the pending exit reason (this also clears it)
    -- 2. set process state to ProcessShuttingDown
    -- 3. a non-recoverable reason ends the process via 'diskontinue', a
    --    recoverable one is passed to 'kontinue' as 'Interrupted'
    --
    -- Without a pending request the request is interpreted normally.
    tryTakeNextShutdownRequest >>= maybe
      noShutdownRequested
      (either onShutdownRequested onInterruptRequested . fromSomeExitReason)
   where
    tryTakeNextShutdownRequest =
      lift (atomically (tryTakeNextShutdownRequestSTM myMessageQVar))

    onShutdownRequested shutdownRequest = do
      logDebug ("shutdown requested: " ++ show shutdownRequest)
      setMyProcessState ProcessShuttingDown
      interpretRequestAfterShutdownRequest (diskontinueWith diskontinue)
                                           shutdownRequest
                                           request

    onInterruptRequested interruptRequest = do
      logDebug ("interrupt requested: " ++ show interruptRequest)
      setMyProcessState ProcessShuttingDown
      interpretRequestAfterInterruptRequest (kontinueWith kontinue nextRef)
                                            (diskontinueWith diskontinue)
                                            interruptRequest
                                            request

    noShutdownRequested = do
      setMyProcessState ProcessBusy
      interpretRequest (kontinueWith kontinue)
                       (diskontinueWith diskontinue)
                       nextRef
                       request

  -- This gets no nextRef and may not pass a Left value to the continuation.
  -- This forces the caller to defer the process exit to the next request
  -- and hence ensures that the scheduler code cannot forget to allow the
  -- client code to react to a shutdown request.
  --
  -- Every request ends the process with the pending shutdown reason, except
  -- 'Shutdown' and a 'SendShutdown' to the process itself, which exit with
  -- the reason they carry.
  interpretRequestAfterShutdownRequest
    :: forall v a
     . HasCallStack
    => Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> (ExitReason 'NoRecovery)
    -> Process SchedulerIO v
    -> Eff SchedulerIO a
  interpretRequestAfterShutdownRequest diskontinue shutdownRequest = \case
    SendMessage _ _ -> diskontinue shutdownRequest
    SendInterrupt _ _ -> diskontinue shutdownRequest
    SendShutdown toPid r ->
      if toPid == myPid then diskontinue r else diskontinue shutdownRequest
    Spawn _ -> diskontinue shutdownRequest
    SpawnLink _ -> diskontinue shutdownRequest
    ReceiveSelectedMessage _ -> diskontinue shutdownRequest
    FlushMessages -> diskontinue shutdownRequest
    SelfPid -> diskontinue shutdownRequest
    MakeReference -> diskontinue shutdownRequest
    YieldProcess -> diskontinue shutdownRequest
    Shutdown r -> diskontinue r
    GetProcessState _ -> diskontinue shutdownRequest
    Monitor _ -> diskontinue shutdownRequest
    Demonitor _ -> diskontinue shutdownRequest
    Link _ -> diskontinue shutdownRequest
    Unlink _ -> diskontinue shutdownRequest

  -- Answer the current request with the pending interrupt instead of
  -- executing it. 'Shutdown' and a 'SendShutdown' to the process itself
  -- still end the process.
  interpretRequestAfterInterruptRequest
    :: forall v a
     . HasCallStack
    => Arr SchedulerIO v a
    -> Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> (ExitReason 'Recoverable)
    -> Process SchedulerIO v
    -> Eff SchedulerIO a
  interpretRequestAfterInterruptRequest kontinue diskontinue interruptRequest =
    \case
      SendMessage _ _ -> kontinue (Interrupted interruptRequest)
      SendInterrupt _ _ -> kontinue (Interrupted interruptRequest)
      SendShutdown toPid r ->
        if toPid == myPid
          then diskontinue r
          else kontinue (Interrupted interruptRequest)
      Spawn _ -> kontinue (Interrupted interruptRequest)
      SpawnLink _ -> kontinue (Interrupted interruptRequest)
      ReceiveSelectedMessage _ -> kontinue (Interrupted interruptRequest)
      FlushMessages -> kontinue (Interrupted interruptRequest)
      SelfPid -> kontinue (Interrupted interruptRequest)
      MakeReference -> kontinue (Interrupted interruptRequest)
      YieldProcess -> kontinue (Interrupted interruptRequest)
      Shutdown r -> diskontinue r
      GetProcessState _ -> kontinue (Interrupted interruptRequest)
      Monitor _ -> kontinue (Interrupted interruptRequest)
      Demonitor _ -> kontinue (Interrupted interruptRequest)
      Link _ -> kontinue (Interrupted interruptRequest)
      Unlink _ -> kontinue (Interrupted interruptRequest)

  -- Execute a request when no shutdown or interrupt is pending.
  interpretRequest
    :: forall v a
     . HasCallStack
    => (Int -> Arr SchedulerIO v a)
    -> Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> Int
    -> Process SchedulerIO v
    -> Eff SchedulerIO a
  interpretRequest kontinue diskontinue nextRef = \case
    SendMessage toPid msg ->
      interpretSend toPid msg >>= kontinue nextRef . ResumeWith
    -- An interrupt sent to oneself is delivered right away.
    SendInterrupt toPid msg ->
      if toPid == myPid
        then kontinue nextRef (Interrupted msg)
        else
          interpretSendShutdownOrInterrupt toPid (SomeExitReason msg)
            >>= kontinue nextRef . ResumeWith
    -- A shutdown sent to oneself ends the process right away.
    SendShutdown toPid msg ->
      if toPid == myPid
        then diskontinue msg
        else
          interpretSendShutdownOrInterrupt toPid (SomeExitReason msg)
            >>= kontinue nextRef . ResumeWith
    Spawn child ->
      spawnNewProcess Nothing child >>= kontinue nextRef . ResumeWith . fst
    SpawnLink child ->
      spawnNewProcess (Just myProcessInfo) child
        >>= kontinue nextRef . ResumeWith . fst
    ReceiveSelectedMessage f ->
      interpretReceive f >>= either diskontinue (kontinue nextRef)
    FlushMessages -> interpretFlush >>= kontinue nextRef
    SelfPid -> kontinue nextRef (ResumeWith myPid)
    -- Hand out the current reference and continue with the next one.
    MakeReference -> kontinue (nextRef + 1) (ResumeWith nextRef)
    YieldProcess -> kontinue nextRef (ResumeWith ())
    Shutdown r -> diskontinue r
    GetProcessState toPid ->
      interpretGetProcessState toPid >>= kontinue nextRef . ResumeWith
    Monitor target ->
      interpretMonitor target >>= kontinue nextRef . ResumeWith
    Demonitor ref ->
      interpretDemonitor ref >>= kontinue nextRef . ResumeWith
    Link toPid ->
      interpretLink toPid >>= kontinue nextRef . either Interrupted ResumeWith
    Unlink toPid -> interpretUnlink toPid >>= kontinue nextRef . ResumeWith
   where
    interpretMonitor !target = do
      setMyProcessState ProcessBusyMonitoring
      schedulerState <- getSchedulerState
      lift (atomically (addMonitoring target myPid schedulerState))

    interpretDemonitor !ref = do
      setMyProcessState ProcessBusyMonitoring
      schedulerState <- getSchedulerState
      lift (atomically (removeMonitoring ref schedulerState))

    -- Remove the link on both sides; the other side is skipped when the
    -- process is not in the process table.
    interpretUnlink !toPid = do
      setMyProcessState ProcessBusyUnlinking
      schedulerState <- getSchedulerState
      let procInfosVar = schedulerState^.processTable
      lift $ atomically $ do
        procInfos <- readTVar procInfosVar
        traverse_
          (\toProcInfo ->
            modifyTVar' (toProcInfo ^. processLinks) (Set.delete myPid)
          )
          (procInfos ^. at toPid)
        modifyTVar' (myProcessInfo ^. processLinks) (Set.delete toPid)

    -- Yields 'Nothing' when the process is not in the process table.
    interpretGetProcessState !toPid = do
      setMyProcessState ProcessBusy
      schedulerState <- getSchedulerState
      let procInfosVar = schedulerState^.processTable
      lift $ atomically $ do
        procInfos <- readTVar procInfosVar
        traverse (\toProcInfo -> readTVar (toProcInfo ^. processState))
                 (procInfos ^. at toPid)

    -- Add the link on both sides, or report 'LinkedProcessCrashed' when the
    -- other process is not in the process table.
    interpretLink !toPid = do
      setMyProcessState ProcessBusyLinking
      schedulerState <- getSchedulerState
      let procInfosVar = schedulerState^.processTable
      lift $ atomically $ do
        procInfos <- readTVar procInfosVar
        case procInfos ^. at toPid of
          Just toProcInfo -> do
            modifyTVar' (toProcInfo ^. processLinks) (Set.insert myPid)
            modifyTVar' (myProcessInfo ^. processLinks) (Set.insert toPid)
            return (Right ())
          Nothing -> return (Left (LinkedProcessCrashed toPid))

    interpretSend !toPid msg =
      setMyProcessState ProcessBusySending
        *>  getSchedulerState
        >>= lift . atomically . enqueueMessageOtherProcess toPid msg

    interpretSendShutdownOrInterrupt !toPid !msg =
      setMyProcessState
          (either (const ProcessBusySendingShutdown)
                  (const ProcessBusySendingInterrupt)
                  (fromSomeExitReason msg)
          )
        *>  getSchedulerState
        >>= lift . atomically . enqueueShutdownRequest toPid msg

    -- Return all queued messages and leave the queue empty.
    interpretFlush :: Eff SchedulerIO (ResumeProcess [Dynamic])
    interpretFlush = do
      setMyProcessState ProcessBusyReceiving
      lift $ atomically $ do
        myMessageQ <- readTVar myMessageQVar
        modifyTVar' myMessageQVar (incomingMessages .~ Seq.Empty)
        return (ResumeWith (toList (myMessageQ ^. incomingMessages)))

    -- Wait for the first message accepted by the selector. A pending
    -- shutdown or interrupt request is taken and returned instead; when
    -- neither is available the transaction retries.
    interpretReceive
      :: MessageSelector b
      -> Eff SchedulerIO (Either (ExitReason 'NoRecovery) (ResumeProcess b))
    interpretReceive f = do
      setMyProcessState ProcessBusyReceiving
      lift $ atomically $ do
        myMessageQ <- readTVar myMessageQVar
        case myMessageQ ^. shutdownRequests of
          Nothing ->
            case partitionMessages (myMessageQ ^. incomingMessages) Seq.Empty of
              Nothing -> retry
              Just (selectedMessage, otherMessages) -> do
                modifyTVar' myMessageQVar (incomingMessages .~ otherMessages)
                return (Right (ResumeWith selectedMessage))
          Just shutdownRequest -> do
            modifyTVar' myMessageQVar (shutdownRequests .~ Nothing)
            case fromSomeExitReason shutdownRequest of
              Left sr -> return (Left sr)
              Right ir -> return (Right (Interrupted ir))
     where
      -- Find the first message accepted by the selector and return it with
      -- the remaining messages in their original order.
      partitionMessages Seq.Empty _acc = Nothing
      partitionMessages (m :<| msgRest) acc = maybe
        (partitionMessages msgRest (acc :|> m))
        (\res -> Just (res, acc Seq.>< msgRest))
        (runMessageSelector f m)

-- | This is the main entry point to running a message passing concurrency
-- application. This function takes a 'Process' on top of the 'SchedulerIO'
-- effect; logging is provided through the 'HasLogging' constraint.
schedule
  :: (HasLogging IO SchedulerIO, HasCallStack)
  => Eff InterruptableProcEff ()
  -> Eff LoggingAndIO ()
schedule procEff =
  liftBaseWith
      (\runS -> Async.withAsync
        -- Run the scheduler with the main process in its own thread ...
        (runS $ withNewSchedulerState $ do
          (_, mainProcAsync) <- spawnNewProcess Nothing $ do
            logNotice "++++++++ main process started ++++++++"
            provideInterruptsShutdown procEff
            logNotice "++++++++ main process returned ++++++++"
          lift (void (Async.wait mainProcAsync))
        )
        -- ... and wait for it here.
        (\ast -> runS $ do
          a <- restoreM ast
          void $ lift (Async.wait a)
        )
      )
    >>= restoreM

-- | Allocate a new process, optionally link it to a parent process, and fork
-- a thread that interprets the given process action.
--
-- Returns the id of the new process and the 'Async' of its thread.
spawnNewProcess
  :: (HasLogging IO SchedulerIO, HasCallStack)
  => Maybe ProcessInfo
  -> Eff ProcEff ()
  -> Eff SchedulerIO (ProcessId, Async (ExitReason 'NoRecovery))
spawnNewProcess mlinkedParent mfa = do
  schedulerState <- getSchedulerState
  procInfo       <- allocateProcInfo schedulerState
  traverse_ (linkToParent procInfo) mlinkedParent
  procAsync <- doForkProc procInfo schedulerState
  return (procInfo ^. processId, procAsync)
 where
  -- Register the link in the link sets of both processes.
  linkToParent toProcInfo parent = do
    let toPid     = toProcInfo ^. processId
        parentPid = parent ^. processId
    lift $ atomically $ do
      modifyTVar' (toProcInfo ^. processLinks) (Set.insert parentPid)
      modifyTVar' (parent ^. processLinks)     (Set.insert toPid)

  -- Take the next process id and add a fresh 'ProcessInfo' to the process
  -- table, all in one transaction.
  allocateProcInfo schedulerState = lift
    (atomically
      (do
        let nextPidVar      = schedulerState ^. nextPid
            processInfosVar = schedulerState ^. processTable
        pid <- readTVar nextPidVar
        modifyTVar' nextPidVar (+1)
        procInfo <- newProcessInfo pid
        modifyTVar' processInfosVar (at pid ?~ procInfo)
        return procInfo
      )
    )

  -- Add the thread id and, unless one is already set, the process id to
  -- the log messages of the given action.
  logAppendProcInfo pid =
    let addProcessId =
          over lmProcessId (maybe (Just (printf "% 9s" (show pid))) Just)
    in  traverseLogMessages
          (traverse setLogMessageThreadId >=> traverse (return . addProcessId))

  -- Notify the monitors of an exited process, then dissolve all of its
  -- links. A linked process receives a 'LinkedProcessCrashed' shutdown
  -- request only when the exit severity is 'Crash'.
  triggerProcessLinksAndMonitors !pid !reason !linkSetVar = do
    schedulerState <- getSchedulerState
    lift $ atomically $ triggerAndRemoveMonitor pid
                                                (SomeExitReason reason)
                                                schedulerState
    let exitSeverity = toExitSeverity reason
        sendIt !linkedPid = do
          let msg = SomeExitReason (LinkedProcessCrashed pid)
          lift $ atomically $ do
            procInfos <- readTVar (schedulerState ^. processTable)
            case procInfos ^? at linkedPid . _Just of
              Nothing             -> return (Left linkedPid)
              Just linkedProcInfo -> do
                let linkedMsgQVar    = linkedProcInfo ^. messageQ
                    linkedLinkSetVar = linkedProcInfo ^. processLinks
                linkedLinkSet <- readTVar linkedLinkSetVar
                -- Only act when the other side still knows about the link.
                if Set.member pid linkedLinkSet
                  then do
                    writeTVar linkedLinkSetVar (Set.delete pid linkedLinkSet)
                    when (exitSeverity == Crash)
                         (modifyTVar' linkedMsgQVar (shutdownRequests ?~ msg))
                    return (Right linkedPid)
                  else return (Left linkedPid)
    -- Take and clear the own link set in one transaction.
    linkedPids <- lift
      (atomically
        (do
          linkSet <- readTVar linkSetVar
          writeTVar linkSetVar def
          return linkSet
        )
      )
    res <- traverse sendIt (toList linkedPids)
    traverse_
      ( logDebug
      . either (("linked process not found: " ++) . show)
               (("sent shutdown to linked process: " ++) . show)
      )
      res

  -- Fork the thread of the process. When the process ends, it is removed
  -- from the process table and from the cancellation table; its links and
  -- monitors are triggered with the exit reason.
  doForkProc
    :: ProcessInfo
    -> SchedulerState
    -> Eff SchedulerIO (Async (ExitReason 'NoRecovery))
  doForkProc procInfo schedulerState = restoreM =<< liftBaseWith
    (\inScheduler -> do
      let cancellationsVar = schedulerState ^. processCancellationTable
          processInfosVar  = schedulerState ^. processTable
          pid              = procInfo ^. processId
      procAsync <- Async.async
        (inScheduler
          (logAppendProcInfo
            pid
            (Safe.bracketWithError
              (logDebug "enter process")
              (\mExc () -> do
                lift
                  (atomically
                    (do
                      modifyTVar' processInfosVar  (at pid .~ Nothing)
                      modifyTVar' cancellationsVar (at pid .~ Nothing)
                    )
                  )
                -- A normal exit has already been reported by the action
                -- below; only an exception is reported here.
                traverse_
                  (\exc -> logExitAndTriggerLinksAndMonitors
                    (exitReasonFromException exc)
                    pid
                  )
                  mExc
              )
              (const
                (do
                  res <- handleProcess procInfo (mfa >> return ExitNormally)
                  logExitAndTriggerLinksAndMonitors res pid
                )
              )
            )
          )
        )
      -- The process might already have finished and cleaned up after
      -- itself. Registering its 'Async' then would leave a stale entry in
      -- the cancellation table, so only register it while the process is
      -- still in the process table.
      atomically $ do
        stillRegistered <- Map.member pid <$> readTVar processInfosVar
        when stillRegistered
             (modifyTVar' cancellationsVar (at pid ?~ procAsync))
      return procAsync
    )
   where
    -- A cancelled thread counts as 'Killed', everything else as an
    -- 'UnexpectedException'.
    exitReasonFromException exc = case Safe.fromException exc of
      Just Async.AsyncCancelled -> Killed
      Nothing                   -> UnexpectedException
        (prettyCallStack callStack)
        (Safe.displayException exc)

    logExitAndTriggerLinksAndMonitors reason pid = do
      triggerProcessLinksAndMonitors pid reason (procInfo ^. processLinks)
      logProcessExit reason
      return reason

-- * Scheduler Accessor

-- | Read the 'SchedulerState' from the 'Reader' effect.
getSchedulerState :: HasSchedulerIO r => Eff r SchedulerState
getSchedulerState = ask

-- | Append a message to the message queue of a process. Nothing happens
-- when the process is not in the process table.
enqueueMessageOtherProcess
  :: HasCallStack => ProcessId -> Dynamic -> SchedulerState -> STM ()
enqueueMessageOtherProcess toPid msg schedulerState = do
  procInfos <- readTVar (schedulerState ^. processTable)
  traverse_
    (\toProcInfo ->
      modifyTVar' (toProcInfo ^. messageQ) (incomingMessages %~ (:|> msg))
    )
    (procInfos ^. at toPid)

-- | Set the shutdown request of a process, replacing any pending request.
-- Nothing happens when the process is not in the process table.
enqueueShutdownRequest
  :: HasCallStack => ProcessId -> SomeExitReason -> SchedulerState -> STM ()
enqueueShutdownRequest toPid msg schedulerState = do
  procInfos <- readTVar (schedulerState ^. processTable)
  traverse_
    (\toProcInfo ->
      modifyTVar' (toProcInfo ^. messageQ) (shutdownRequests ?~ msg)
    )
    (procInfos ^. at toPid)