{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE PatternGuards #-} {-# LANGUAGE BangPatterns #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE Rank2Types #-} -- | This is the @Process@ implementation of a /managed process/ module Control.Distributed.Process.ManagedProcess.Internal.GenProcess ( recvLoop , precvLoop , currentTimeout , systemTimeout , drainTimeout , processState , processDefinition , processFilters , processUnhandledMsgPolicy , processQueue , gets , getAndModifyState , modifyState , setUserTimeout , setProcessState , GenProcess , peek , push , enqueue , dequeue , addUserTimer , removeUserTimer , eval , act , runAfter , evalAfter ) where import Control.Applicative (liftA2) import Control.Distributed.Process ( match , matchAny , matchMessage , handleMessage , handleMessageIf , receiveTimeout , receiveWait , forward , catchesExit , catchExit , die , unsafeWrapMessage , Process , ProcessId , Match ) import qualified Control.Distributed.Process as P ( liftIO ) import Control.Distributed.Process.Internal.Types ( Message(..) , ProcessExitException(..) ) import Control.Distributed.Process.ManagedProcess.Server ( handleCast , handleExitIf , stop , continue ) import Control.Distributed.Process.ManagedProcess.Timer ( Timer(timerDelay) , TimerKey , TimedOut(..) , delayTimer , startTimer , stopTimer , matchTimeout , matchKey , matchRun ) import Control.Distributed.Process.ManagedProcess.Internal.Types hiding (Message) import qualified Control.Distributed.Process.ManagedProcess.Internal.PriorityQueue as Q ( empty , dequeue , enqueue , peek , toList ) import Control.Distributed.Process.Extras ( ExitReason(..) , Shutdown(..) ) import qualified Control.Distributed.Process.Extras.SystemLog as Log import Control.Distributed.Process.Extras.Time import Control.Distributed.Process.Serializable (Serializable) import Control.Monad (void) import Control.Monad.Catch ( mask_ , catch , throwM , mask , SomeException ) import qualified Control.Monad.State.Strict as ST ( get ) import Data.IORef (newIORef, atomicModifyIORef') import Data.Maybe (fromJust) import qualified Data.Map.Strict as Map ( size , insert , delete , lookup , empty , foldrWithKey ) -------------------------------------------------------------------------------- -- Priority Mailbox Handling -- -------------------------------------------------------------------------------- type Safe = Bool -- | Evaluate the given function over the @ProcessState s@ for the caller, and -- return the result. gets :: forall s a . (ProcessState s -> a) -> GenProcess s a gets f = ST.get >>= \(s :: State s) -> liftIO $ do atomicModifyIORef' s $ \(s' :: ProcessState s) -> (s', f s' :: a) -- | Modify our state. modifyState :: (ProcessState s -> ProcessState s) -> GenProcess s () modifyState f = ST.get >>= \s -> liftIO $ mask_ $ do atomicModifyIORef' s $ \s' -> (f s', ()) -- | Modify our state and return a value (potentially from it). getAndModifyState :: (ProcessState s -> (ProcessState s, a)) -> GenProcess s a getAndModifyState f = ST.get >>= \s -> liftIO $ mask_ $ do atomicModifyIORef' s $ \s' -> f s' -- | Set the current process state. setProcessState :: s -> GenProcess s () setProcessState st' = modifyState $ \st@ProcessState{..} -> st { procState = st' } -- | Set the mailbox draining timer. setDrainTimeout :: Timer -> GenProcess s () setDrainTimeout t = modifyState $ \st@ProcessState{..} -> st { sysTimeout = t } -- | Set the user timeout applied whilst a prioritised process loop is in -- a blocking receive. setUserTimeout :: Delay -> GenProcess s () setUserTimeout d = modifyState $ \st@ProcessState{..} -> st { usrTimeout = d } -- | Add a /user timer/, bound to the given datum. addUserTimer :: Timer -> Message -> GenProcess s TimerKey addUserTimer t m = getAndModifyState $ \st@ProcessState{..} -> let sz = Map.size usrTimers tk = sz + 1 in (st { usrTimers = (Map.insert tk (t, m) usrTimers) }, tk) -- | Remove a /user timer/, for the given key. removeUserTimer :: TimerKey -> GenProcess s () removeUserTimer i = modifyState $ \st@ProcessState{..} -> st { usrTimers = (Map.delete i usrTimers) } -- | Consume the timer with the given @TimerKey@. The timer is removed from the -- @ProcessState@ and given to the supplied expression, whose evaluation is given -- back to the caller. consumeTimer :: forall s a . TimerKey -> (Message -> GenProcess s a) -> GenProcess s a consumeTimer k f = do mt <- gets usrTimers let tm = Map.lookup k mt let ut = Map.delete k mt modifyState $ \st@ProcessState{..} -> st { usrTimers = ut } case tm of Nothing -> lift $ die $ "GenProcess.consumeTimer - InvalidTimerKey" Just (_, m) -> f m -- | The @ProcessDefinition@ for the current loop. processDefinition :: GenProcess s (ProcessDefinition s) processDefinition = gets procDef -- | The list of prioritisers for the current loop. processPriorities :: GenProcess s ([DispatchPriority s]) processPriorities = gets procPrio -- | The list of filters for the current loop. processFilters :: GenProcess s ([DispatchFilter s]) processFilters = gets procFilters -- | Evaluates to the user defined state for the currently executing server loop. processState :: GenProcess s s processState = gets procState -- | Evaluates to the @UnhandledMessagePolicy@ for the current loop. processUnhandledMsgPolicy :: GenProcess s UnhandledMessagePolicy processUnhandledMsgPolicy = gets (unhandledMessagePolicy . procDef) -- | Returns a /read only view/ on the internal priority queue. processQueue :: GenProcess s [Message] processQueue = gets internalQ >>= return . Q.toList -- | The @Timer@ for the system timeout. See @drainTimeout@. systemTimeout :: GenProcess s Timer systemTimeout = gets sysTimeout -- | The policy for the system timeout. This is used to determine how the loop -- should limit the time spent draining the /real/ process mailbox into our -- internal priority queue. timeoutPolicy :: GenProcess s RecvTimeoutPolicy timeoutPolicy = gets timeoutSpec -- | The @Delay@ for the @drainTimeout@. drainTimeout :: GenProcess s Delay drainTimeout = gets (timerDelay . sysTimeout) -- | The current (user supplied) timeout. currentTimeout :: GenProcess s Delay currentTimeout = gets usrTimeout -- | Update and store the internal priority queue. updateQueue :: (Queue -> Queue) -> GenProcess s () updateQueue f = modifyState $ \st@ProcessState{..} -> st { internalQ = f internalQ } -- | Evaluate any matching /info handler/ with the supplied datum after waiting -- for at least @TimeInterval@. The process state (for the resulting @Action s@) -- is also given and the process loop will go on as per @Server.continue@. -- -- Informally, evaluating this expression (such that the @Action@ is given as the -- result of a handler or filter) will ensure that the supplied message (datum) -- is availble for processing no sooner than @TimeInterval@. -- -- Currently, this expression creates an @Action@ that triggers immediate -- evaluation in the process loop before continuing with the given state. The -- process loop stores a /user timeout/ for the given time interval, which is -- trigerred like a wait/drain timeout. This implementation is subject to change. evalAfter :: forall s m . (Serializable m) => TimeInterval -> m -> s -> Action s evalAfter d m s = act $ runAfter d m >> setProcessState s -- | Produce an @Action s@ that, if it is the result of a handler, will cause the -- server loop to evaluate the supplied expression. This is given in the @GenProcess@ -- monad, which is intended for internal use only. act :: forall s . GenProcess s () -> Action s act = return . ProcessActivity {-# WARNING act "This interface is intended for internal use only" #-} -- | Evaluate an expression in the 'GenProcess' monad. eval :: forall s . GenProcess s (ProcessAction s) -> Action s eval = return . ProcessExpression -- | Starts a timer and adds it as a /user timeout/. runAfter :: forall s m . (Serializable m) => TimeInterval -> m -> GenProcess s () runAfter d m = do t <- lift $ startTimer (Delay d) void $ addUserTimer t (unsafeWrapMessage m) {-# WARNING runAfter "This interface is intended for internal use only" #-} -------------------------------------------------------------------------------- -- Internal Priority Queue -- -------------------------------------------------------------------------------- -- | Dequeue a message from the internal priority queue. dequeue :: GenProcess s (Maybe Message) dequeue = getAndModifyState $ \st -> do let pq = internalQ st case Q.dequeue pq of Nothing -> (st, Nothing) Just (m, q') -> (st { internalQ = q' }, Just m) -- | Peek at the next available message in the internal priority queue, without -- removing it. peek :: GenProcess s (Maybe Message) peek = getAndModifyState $ \st -> do let pq = internalQ st (st, Q.peek pq) -- | Push a message to the head of the internal priority queue. push :: forall s . Message -> GenProcess s () push m = do st <- processState enqueueMessage st [ PrioritiseInfo { prioritise = (\_ m' -> return $ Just ((101 :: Int), m')) :: s -> Message -> Process (Maybe (Int, Message)) } ] m -- | Enqueue a message to the back of the internal priority queue. enqueue :: forall s . Message -> GenProcess s () enqueue m = do st <- processState enqueueMessage st [] m -- | Enqueue a message in the internal priority queue. The given message will be -- evaluated by all the supplied prioritisers, and if none match it, then it will -- be assigned the lowest possible priority (i.e. put at the back of the queue). enqueueMessage :: forall s . s -> [DispatchPriority s] -> Message -> GenProcess s () enqueueMessage s [] m' = enqueueMessage s [ PrioritiseInfo { prioritise = (\_ m -> return $ Just ((-1 :: Int), m)) :: s -> Message -> Process (Maybe (Int, Message)) } ] m' enqueueMessage s (p:ps) m' = let checkPrio = prioritise p s in do (lift $ checkPrio m') >>= doEnqueue s ps m' where doEnqueue :: s -> [DispatchPriority s] -> Message -> Maybe (Int, Message) -> GenProcess s () doEnqueue s' ps' msg Nothing = enqueueMessage s' ps' msg doEnqueue _ _ _ (Just (i, m)) = updateQueue (Q.enqueue (i * (-1 :: Int)) m) -------------------------------------------------------------------------------- -- Process Loop Implementations -- -------------------------------------------------------------------------------- -- | Maps handlers to a dynamic action that can take place outside of a -- expect/recieve block. This is used by the prioritised process loop. class DynMessageHandler d where dynHandleMessage :: UnhandledMessagePolicy -> s -> d s -> Message -> Process (Maybe (ProcessAction s)) instance DynMessageHandler Dispatcher where dynHandleMessage _ s (Dispatch d) msg = handleMessage msg (d s) dynHandleMessage _ s (DispatchIf d c) msg = handleMessageIf msg (c s) (d s) instance DynMessageHandler ExternDispatcher where dynHandleMessage _ s (DispatchCC _ d) msg = handleMessage msg (d s) dynHandleMessage _ s (DispatchSTM _ d _ _) msg = handleMessage msg (d s) instance DynMessageHandler DeferredDispatcher where dynHandleMessage _ s (DeferredDispatcher d) = d s -- | Maps filters to an action that can take place outside of a -- expect/recieve block. class DynFilterHandler d where dynHandleFilter :: s -> d s -> Message -> Process (Maybe (Filter s)) instance DynFilterHandler DispatchFilter where dynHandleFilter s (FilterApi d) msg = handleMessage msg (d s) dynHandleFilter s (FilterAny d) msg = handleMessage msg (d s) dynHandleFilter s (FilterRaw d) msg = d s msg dynHandleFilter s (FilterState d) _ = d s -- | Prioritised process loop. -- -- Evaluating this function will cause the caller to enter a server loop, -- constantly reading messages from its mailbox (and/or other supplied control -- planes) and passing these to handler functions in the supplied process -- definition. Only when it is determined that the server process should -- terminate - either by the handlers deciding to stop the process, or by an -- unhandled exit signal or other form of failure condition (e.g. synchronous or -- asynchronous exceptions). -- -- ensureIOManagerIsRunning before evaluating this loop... -- precvLoop :: PrioritisedProcessDefinition s -> s -> Delay -> Process ExitReason precvLoop ppDef pState recvDelay = do st <- P.liftIO $ newIORef $ ProcessState { timeoutSpec = recvTimeout ppDef , sysTimeout = delayTimer Infinity , usrTimeout = recvDelay , internalQ = Q.empty , procState = pState , procDef = processDef ppDef , procPrio = priorities ppDef , procFilters = filters ppDef , usrTimers = Map.empty } mask $ \restore -> do res <- catch (fmap Right $ restore $ loop st) (\(e :: SomeException) -> return $ Left e) -- res could be (Left ex), so we restore process state & def from our IORef ps <- P.liftIO $ atomicModifyIORef' st $ \s' -> (s', s') let st' = procState ps pd = procDef ps sh = shutdownHandler pd case res of Right (exitReason, _) -> do restore $ sh (CleanShutdown st') exitReason return exitReason Left ex -> do -- we'll attempt to run the exit handler with the original state restore $ sh (LastKnown st') (ExitOther $ show ex) throwM ex where loop st' = catchExit (runProcess st' recvQueue) (\_ (r :: ExitReason) -> return (r, st')) recvQueue :: GenProcess s ExitReason recvQueue = do pd <- processDefinition let ex = trapExit:(exitHandlers $ pd) let exHandlers = map (\d' -> (dispatchExit d')) ex catch (drainMailbox >> processNext >>= nextAction) (\(e :: ProcessExitException) -> handleExit exHandlers e >>= nextAction) where handleExit :: [(s -> ProcessId -> Message -> Process (Maybe (ProcessAction s)))] -> ProcessExitException -> GenProcess s (ProcessAction s) handleExit [] ex = throwM ex handleExit (h:hs) ex@(ProcessExitException pid msg) = do r <- processState >>= \s -> lift $ h s pid msg case r of Nothing -> handleExit hs ex Just p -> return p nextAction :: ProcessAction s -> GenProcess s ExitReason nextAction ac | ProcessExpression expr <- ac = expr >>= nextAction | ProcessActivity act' <- ac = act' >> recvQueue | ProcessSkip <- ac = recvQueue | ProcessContinue ps' <- ac = recvQueueAux ps' | ProcessTimeout d ps' <- ac = setUserTimeout d >> recvQueueAux ps' | ProcessStop xr <- ac = return xr | ProcessStopping ps' xr <- ac = setProcessState ps' >> return xr | ProcessHibernate d' s' <- ac = (lift $ block d') >> recvQueueAux s' | ProcessBecome pd' ps' <- ac = do modifyState $ \st@ProcessState{..} -> st { procDef = pd', procState = ps' } -- liftIO $ putStrLn "modified process def" recvQueue | otherwise {- compiler foo -} = return $ ExitOther "IllegalState" recvQueueAux st = setProcessState st >> recvQueue -- TODO: at some point we should re-implement our state monad in terms of -- mkWeakIORef instead of a full IORef. At that point, we can implement hiberation -- in the following terms: -- 1. the user defines (at some level, perhaps outside of this API) some -- means for writing a process' state to a backing store -- NB: this could be /persistent/, or a file, or database, etc... -- 2. when we enter hibernation, we do the following: -- (a) write the process state to the chosen backing store -- (b) evaluate yield (telling the RTS we're willing to give up our time slice) -- (c) enter a blocking receiveWait with no state on our stack... -- [NB] presumably at this point our state will be eligible for GC -- (d) when we finally receive a message, reboot the process thus: -- (i) read our state back from the given backing store -- (ii) call a user defined function to rebuild the state if custom -- actions need to be taken (e.g. they might've stored something -- like an STM TVar and need to request a new one from some -- well known service or registry - alt. they might want to -- /replay/ actions to rebuild their state as an FSM might) -- (iii) re-enter the recv loop and immediately processNext -- -- This will give roughly the same semantics as erlang's hibernate/3, although -- the RTS does GC globally rather than per-thread, but that might change in -- some future release (who knows!?). -- -- Also, this gives us the ability to migrate process state across remote -- boundaries. Not only can a process be moved in this way, if we generalise -- the mechanism to move a serialised closure, we can migrate the whole process -- and its state as well. The main difference here (with ordinary use of -- @Closure@ et al for moving processes around, is that we do not insist -- on the process state being serializable, simply that they provide a -- function to read+write the state, and a (state -> state) function to be -- called during rehydration if custom actions need to be taken. -- processNext :: GenProcess s (ProcessAction s) processNext = do (up, pf) <- gets $ liftA2 (,) (unhandledMessagePolicy . procDef) procFilters case pf of [] -> consumeMessage _ -> filterMessage (filterNext False up pf Nothing) consumeMessage = applyNext dequeue processApply filterMessage = applyNext peek filterNext :: Safe -> UnhandledMessagePolicy -> [DispatchFilter s] -> Maybe (Filter s) -> Message -> GenProcess s (ProcessAction s) filterNext isSafe mp' fs mf msg | Just (FilterSafe s') <- mf = filterNext True mp' fs (Just $ FilterOk s') msg | Just (FilterSkip s') <- mf = setProcessState s' >> dequeue >> return ProcessSkip | Just (FilterStop s' r) <- mf = return $ ProcessStopping s' r | isSafe , Just (FilterOk s') <- mf , [] <- fs = do setProcessState s' act' <- processApply msg dequeue >> return act' | Just (FilterOk s') <- mf , [] <- fs = setProcessState s' >> applyNext dequeue processApply | Nothing <- mf, [] <- fs = applyNext dequeue processApply | Just (FilterOk s') <- mf , (f:fs') <- fs = do setProcessState s' act' <- lift $ dynHandleFilter s' f msg filterNext isSafe mp' fs' act' msg | Just (FilterReject _ s') <- mf = do setProcessState s' >> dequeue >>= lift . applyPolicy mp' s' . fromJust | Nothing <- mf {- filter didn't apply to the input type -} , (f:fs') <- fs = processState >>= \s' -> do lift (dynHandleFilter s' f msg) >>= \a -> filterNext isSafe mp' fs' a msg applyNext :: (GenProcess s (Maybe Message)) -> (Message -> GenProcess s (ProcessAction s)) -> GenProcess s (ProcessAction s) applyNext queueOp handler = do next <- queueOp case next of Nothing -> drainOrTimeout Just msg -> handler msg processApply msg = do (def, pState) <- gets $ liftA2 (,) procDef procState let pol = unhandledMessagePolicy def apiMatchers = map (dynHandleMessage pol pState) (apiHandlers def) infoMatchers = map (dynHandleMessage pol pState) (infoHandlers def) extMatchers = map (dynHandleMessage pol pState) (externHandlers def) shutdown' = dynHandleMessage pol pState shutdownHandler' ms' = (shutdown':extMatchers) ++ apiMatchers ++ infoMatchers -- liftIO $ putStrLn $ "we have " ++ (show $ (length apiMatchers, length infoMatchers)) ++ " handlers" processApplyAux ms' pol pState msg processApplyAux [] p' s' m' = lift $ applyPolicy p' s' m' processApplyAux (h:hs) p' s' m' = do attempt <- lift $ h m' case attempt of Nothing -> processApplyAux hs p' s' m' Just act' -> return act' drainMailbox :: GenProcess s () drainMailbox = do -- see note [timer handling whilst draining the process' mailbox] ps <- processState pd <- processDefinition pp <- processPriorities ut <- gets usrTimers let ts = Map.foldrWithKey (\k (t, _) ms -> ms ++ matchKey k t) [] ut let ms = ts ++ (matchAny (return . Right) : (mkMatchers ps pd)) timerAcc <- timeoutPolicy >>= \spec -> case spec of RecvTimer _ -> return Nothing RecvMaxBacklog cnt -> return $ Just cnt -- see note [handling async exceptions during non-blocking reads] -- Also note that we only use the system timeout here, dropping into the -- user timeout only if we end up in a blocking read on the mailbox. -- mask_ $ do tt <- maybeStartTimer drainAux ps pp timerAcc (ms ++ matchTimeout tt) (lift $ stopTimer tt) >>= setDrainTimeout drainAux :: s -> [DispatchPriority s] -> Limit -> [Match (Either TimedOut Message)] -> GenProcess s () drainAux ps' pp' maxbq ms = do (cnt, m) <- scanMailbox maxbq ms case m of Nothing -> return () Just (Right m') -> do enqueueMessage ps' pp' m' drainAux ps' pp' cnt ms Just (Left TimedOut) -> return () Just (Left (Yield i)) -> -- we saw a user defined timer fire, and will have an associated message... -- this is a bit complex, we have to enqueue the message and remove the timer -- the latter part of which is handled for us by consumeTimer consumeTimer i push >> drainAux ps' pp' cnt ms maybeStartTimer :: GenProcess s Timer maybeStartTimer = do tp <- timeoutPolicy t <- case tp of RecvTimer d -> (lift $ startTimer $ Delay d) _ -> return $ delayTimer Infinity setDrainTimeout t return t scanMailbox :: Limit -> [Match (Either TimedOut Message)] -> GenProcess s (Limit, Maybe (Either TimedOut Message)) scanMailbox lim ms | Just 0 <- lim = return (lim, Just $ Left TimedOut) | Just c <- lim = do {- non-blocking read on our mailbox, any external inputs, plus whatever match specs the TimeoutManager gives -} lift $ fmap (Just (c - 1), ) (receiveTimeout 0 ms) | otherwise = lift $ fmap (lim, ) (receiveTimeout 0 ms) -- see note [timer handling whilst draining the process' mailbox] drainOrTimeout :: GenProcess s (ProcessAction s) drainOrTimeout = do pd <- processDefinition ps <- processState ud <- currentTimeout mr <- mkMatchRunners let ump = unhandledMessagePolicy pd hto = timeoutHandler pd matches = mr ++ ((matchMessage return):map (matchExtern ump ps) (externHandlers pd)) recv = case ud of Infinity -> lift $ fmap Just (receiveWait matches) NoDelay -> lift $ receiveTimeout 0 matches Delay i -> lift $ receiveTimeout (asTimeout i) matches -- see note [masking async exceptions during recv] mask $ \restore -> recv >>= \r -> case r of Nothing -> restore $ lift $ hto ps ud Just m -> do pp <- processPriorities enqueueMessage ps pp m -- Returning @ProcessSkip@ simply causes us to go back into -- listening mode until we hit RecvTimeoutPolicy restore $ return ProcessSkip mkMatchRunners :: GenProcess s [Match Message] mkMatchRunners = do ut <- gets usrTimers fn <- mkRunner let ms = Map.foldrWithKey (\k (t, _) ms' -> ms' ++ matchRun fn k t) [] ut return ms mkRunner :: GenProcess s (TimerKey -> Process Message) mkRunner = do st <- ST.get let fn = \k -> do (m, _) <- runProcess st (consumeTimer k return) return m return fn mkMatchers :: s -> ProcessDefinition s -> [Match (Either TimedOut Message)] mkMatchers st df = map (matchMapExtern (unhandledMessagePolicy df) st toRight) (externHandlers df) toRight :: Message -> Either TimedOut Message toRight = Right -- note [handling async exceptions during non-blocking reads] -- Our golden rule is that if we've dequeued any kind of Message at all -- from the process mailbox (or input channels), we must not /lose/ it -- if an asynchronous exception arrives. We therefore mask when we perform a -- non-blocking scan on the mailbox, and whilst we enqueue messages. -- -- If an initial scan of the mailbox yields no data, we fall back to making -- a blocking read; See note [masking async exceptions during recv]. -- -- Once messages have been safely moved from the mailbox to our priority queue, -- we restore the masking state whilst running handlers. -- -- note [timer handling whilst draining the process' mailbox] -- To prevent a DOS vector - and quite a likely accidental one at that - we do not -- sit draining the mailbox indefinitely, since continuous reading would thus -- leave us unable to process any inputs and we'd eventually run out of memory. -- Instead, the PrioritisedProcessDefinition holds a RecvTimeoutPolicy which can -- hold either a max-messages-processed limit or a timeout value. Using whichever -- policy is provided, drainMessageQueue will stop attempting to receive new mail -- either once the message count limit is exceeded or the timer expires, at which -- point we go back to processNext. -- note [masking async exceptions during recv] -- Reading the process' mailbox is mask'ed anyway, however this only -- covers dequeue on the underlying CQueue, such that either before -- the dequeue takes place, or after (during evaluation of the result, -- or execution of the discovered @Match@ for the message), we can still -- be terminated by an asynchronous exception. This is wrong, from the -- perspective of a managed process, since in the case of an exit signal -- we might handle the exception, at which point we've dequeued and -- subsequently lost a message. -- -- Masking recv then, prevents this from happening, and is relatively -- safe, because we know the following (having written all the handlers -- explicitly ourselves): -- -- 1. each handler does nothing more than return the underlying message -- 2. in the most complex case, we have @Left . unsafeWrapMessage@ or -- @fmap Right readSTM thing@ inside of @matchSTM@ -- 3. We should not, therefore, introduce any uninterruptible behaviour -- 4. We cannot, however, be certain that this holds true for decoding -- (and subsequent calls into Binary and/or Bytestrings), so at best -- we can mask, but not uninterruptibleMask -- -- NB: According to /qnikst/, atomicModifyIORef' does not require us to -- use uninterruptibleMask anyway, so this is fine... -- -------------------------------------------------------------------------------- -- Ordinary/Blocking Mailbox Handling -- -------------------------------------------------------------------------------- -- TODO: wrap recvLoop in the same exception handling as precvLoop -- notably, we need to ensure the shutdownHandler runs even in the face -- of exceptions, and it would be useful/good IMO to pass an IORef for -- the state, so we can have a decent LastKnown value for it -- | Managed process loop. -- -- Evaluating this function will cause the caller to enter a server loop, -- constantly reading messages from its mailbox (and/or other supplied control -- planes) and passing these to handler functions in the supplied process -- definition. Only when it is determined that the server process should -- terminate - either by the handlers deciding to stop the process, or by an -- unhandled exit signal or other form of failure condition (e.g. synchronous or -- asynchronous exceptions). -- recvLoop :: ProcessDefinition s -> s -> Delay -> Process ExitReason recvLoop pDef pState recvDelay = let p = unhandledMessagePolicy pDef handleTimeout = timeoutHandler pDef handleStop = shutdownHandler pDef shutdown' = matchDispatch p pState shutdownHandler' extMatchers = map (matchDispatch p pState) (externHandlers pDef) matchers = extMatchers ++ (map (matchDispatch p pState) (apiHandlers pDef)) ex' = (trapExit:(exitHandlers pDef)) ms' = (shutdown':matchers) ++ matchAux p pState (infoHandlers pDef) in do ac <- catchesExit (processReceive ms' handleTimeout pState recvDelay) (map (\d' -> (dispatchExit d') pState) ex') case ac of ProcessSkip -> recvLoop pDef pState recvDelay -- TODO: handle differently... (ProcessContinue s') -> recvLoop pDef s' recvDelay (ProcessTimeout t' s') -> recvLoop pDef s' t' (ProcessHibernate d' s') -> block d' >> recvLoop pDef s' recvDelay (ProcessStop r) -> handleStop (LastKnown pState) r >> return (r :: ExitReason) (ProcessStopping s' r) -> handleStop (LastKnown s') r >> return (r :: ExitReason) (ProcessBecome d' s') -> recvLoop d' s' recvDelay (ProcessActivity _) -> die $ "recvLoop.InvalidState - ProcessActivityNotSupported" (ProcessExpression _) -> die $ "recvLoop.InvalidState - ProcessExpressionNotSupported" where matchAux :: UnhandledMessagePolicy -> s -> [DeferredDispatcher s] -> [Match (ProcessAction s)] matchAux p ps ds = [matchAny (auxHandler (applyPolicy p ps) ps ds)] auxHandler :: (Message -> Process (ProcessAction s)) -> s -> [DeferredDispatcher s] -> Message -> Process (ProcessAction s) auxHandler policy _ [] msg = policy msg auxHandler policy st (d:ds :: [DeferredDispatcher s]) msg | length ds > 0 = let dh = dispatchInfo d in do -- NB: we *do not* want to terminate/dead-letter messages until -- we've exhausted all the possible info handlers m <- dh st msg case m of Nothing -> auxHandler policy st ds msg Just act' -> return act' -- but here we *do* let the policy kick in | otherwise = let dh = dispatchInfo d in do m <- dh st msg case m of Nothing -> policy msg Just act' -> return act' processReceive :: [Match (ProcessAction s)] -> TimeoutHandler s -> s -> Delay -> Process (ProcessAction s) processReceive ms handleTimeout st d = do next <- recv ms d case next of Nothing -> handleTimeout st d Just pa -> return pa recv :: [Match (ProcessAction s)] -> Delay -> Process (Maybe (ProcessAction s)) recv matches d' = case d' of Infinity -> receiveWait matches >>= return . Just NoDelay -> receiveTimeout 0 matches Delay t' -> receiveTimeout (asTimeout t') matches -------------------------------------------------------------------------------- -- Utilities -- -------------------------------------------------------------------------------- -- an explicit 'cast' giving 'Shutdown' will stop the server gracefully shutdownHandler' :: Dispatcher s shutdownHandler' = handleCast (\_ Shutdown -> stop $ ExitNormal) -- @(ProcessExitException from ExitShutdown)@ will stop the server gracefully trapExit :: ExitSignalDispatcher s trapExit = handleExitIf (\_ e -> e == ExitShutdown) (\_ _ (r :: ExitReason) -> stop r) block :: TimeInterval -> Process () block i = void $ receiveTimeout (asTimeout i) [ match (\(_ :: TimedOut) -> return ()) ] applyPolicy :: UnhandledMessagePolicy -> s -> Message -> Process (ProcessAction s) applyPolicy p s m = case p of Terminate -> stop $ ExitOther "UnhandledInput" DeadLetter pid -> forward m pid >> continue s Drop -> continue s Log -> logIt >> continue s where logIt = Log.report Log.info Log.logChannel $ "Unhandled Gen Input Message: " ++ (show m)