module Control.Distributed.Process.ManagedProcess.Internal.GenProcess
(recvLoop, precvLoop) where
import Control.Applicative ((<$>))
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM hiding (check)
import Control.Distributed.Process hiding (call, Message)
import qualified Control.Distributed.Process as P (Message)
import Control.Distributed.Process.ManagedProcess.Server
import Control.Distributed.Process.ManagedProcess.Internal.Types
import Control.Distributed.Process.Extras.Internal.Queue.PriorityQ
( PriorityQ
, enqueue
, dequeue
)
import qualified Control.Distributed.Process.Extras.Internal.Queue.PriorityQ as PriorityQ
( empty
)
import Control.Distributed.Process.Extras
( ExitReason(..)
, Shutdown(..)
)
import qualified Control.Distributed.Process.Extras.SystemLog as Log
import Control.Distributed.Process.Extras.Time
import Control.Distributed.Process.Extras.Timer
( cancelTimer
, runAfter
, TimerRef
)
import Control.Monad (void)
import Prelude hiding (init)
-- | Internal buffer used by the prioritised receive loop: a priority queue of
-- raw messages keyed by an 'Int' priority.
type Queue = PriorityQ Int P.Message
-- | A receive delay paired with, when a timer was started for it, that
-- timer's reference and an 'STM' action used to observe the timer's signal
-- (see 'startTimer' and 'pollTimer').
type TimeoutSpec = (Delay, Maybe (TimerRef, STM ()))
-- | Outcome of checking the receive timer (see 'checkTimer').
data TimeoutAction s
  = Stop s ExitReason -- ^ stop, with this state and exit reason
  | Go Delay s        -- ^ carry on, with this delay and state
-- | Entry point of the prioritised receive loop.
--
-- Every API handler of the underlying process definition is inspected first;
-- if one of them is a 'DispatchCC' the process calls 'die' with
-- @ExitOther "IllegalControlChannel"@. Otherwise a timer is started for
-- @recvDelay@ and control passes to 'recvQueue' with an empty queue.
precvLoop :: PrioritisedProcessDefinition s -> s -> Delay -> Process ExitReason
precvLoop ppDef pState recvDelay = do
  void $ mapM_ rejectControlChannel (apiHandlers (processDef ppDef))
  timer <- startTimer recvDelay
  recvQueue ppDef pState timer PriorityQ.empty
  where
    -- Control-channel dispatchers are refused; any other handler is accepted.
    rejectControlChannel handler =
      case handler of
        DispatchCC _ _ -> die $ ExitOther "IllegalControlChannel"
        _              -> return ()
-- | One step of the prioritised receive loop.
--
-- Runs @processNext@ for the definition @p@, state @s@, timer spec @t@ and
-- internal queue @q@, then hands the resulting action, delay and queue to
-- @nextAction@, which either loops (through @recvQueueAux@) or runs the
-- shutdown handler and returns the 'ExitReason'.
recvQueue :: PrioritisedProcessDefinition s
          -> s
          -> TimeoutSpec
          -> Queue
          -> Process ExitReason
recvQueue p s t q =
  -- An exit signal carrying an 'ExitReason' that escapes processNext is
  -- turned into a ProcessStop, with the queue left as it was on entry.
  let pDef = processDef p
      ps = priorities p
  in do (ac, d, q') <- catchExit (processNext pDef ps s t q)
                                 (\_ (r :: ExitReason) ->
                                   return (ProcessStop r, Infinity, q))
        nextAction ac d q'
  where
    -- Acts on a 'ProcessAction': the first three alternatives keep looping,
    -- the stop alternatives run the shutdown handler and return the reason.
    -- Hibernate blocks for d' and then resumes with the previous delay d.
    -- NOTE(review): the ProcessStop alternative passes the `s` bound by the
    -- enclosing recvQueue, also when nextAction is reached from
    -- recvQueueAux's exit handler (where the current state is `pState`) --
    -- confirm that this is intended.
    nextAction ac d q'
      | ProcessContinue s' <- ac = recvQueueAux p (priorities p) s' d q'
      | ProcessTimeout t' s' <- ac = recvQueueAux p (priorities p) s' t' q'
      | ProcessHibernate d' s' <- ac = block d' >> recvQueueAux p (priorities p) s' d q'
      | ProcessStop r <- ac = (shutdownHandler $ processDef p) s r >> return r
      | ProcessStopping s' r <- ac = (shutdownHandler $ processDef p) s' r >> return r
      | otherwise = die "IllegalState"
    -- Starts a fresh timer for `delay`, moves pending mailbox messages into
    -- the queue and re-enters recvQueue. If an exit signal carrying an
    -- 'ExitReason' is caught, it is wrapped as a message and offered to the
    -- exit handlers (trapExit first, then the definition's own), and the
    -- resulting action goes back through nextAction.
    recvQueueAux ppDef prioritizers pState delay queue =
      let ex = (trapExit:(exitHandlers $ processDef ppDef))
          eh = map (\d' -> (dispatchExit d') pState) ex
      in (do t' <- startTimer delay
             mq <- drainMessageQueue pState prioritizers queue
             recvQueue ppDef pState t' mq)
         `catchExit`
         (\pid (reason :: ExitReason) -> do
             let pd = processDef ppDef
             let ps = pState
             let pq = queue
             let em = unsafeWrapMessage reason
             (a, d, q') <- findExitHandlerOrStop pd ps pq eh pid em
             nextAction a d q')
    -- Tries the exit handlers in order and returns the action of the first
    -- one that yields a result, always with delay Infinity and the queue
    -- unchanged. With no handler left, the message is unwrapped as an
    -- 'ExitReason' and becomes a ProcessStop; if it cannot be unwrapped the
    -- process dies with "InvalidExitHandler".
    findExitHandlerOrStop :: ProcessDefinition s
                          -> s
                          -> Queue
                          -> [ProcessId -> P.Message -> Process (Maybe (ProcessAction s))]
                          -> ProcessId
                          -> P.Message
                          -> Process (ProcessAction s, Delay, Queue)
    findExitHandlerOrStop _ _ pq [] _ er = do
      mEr <- unwrapMessage er :: Process (Maybe ExitReason)
      case mEr of
        Nothing -> die "InvalidExitHandler"
        Just er' -> return (ProcessStop er', Infinity, pq)
    findExitHandlerOrStop pd ps pq (eh:ehs) pid er = do
      mAct <- eh pid er
      case mAct of
        Nothing -> findExitHandlerOrStop pd ps pq ehs pid er
        Just pa -> return (pa, Infinity, pq)
    -- Checks the timer first. On Stop the result is ProcessStopping with the
    -- delay taken from the timer spec. On Go the head of the queue is applied
    -- to the handlers (with the exit handlers installed around the call), or,
    -- when the queue is empty, drainOrTimeout waits for input.
    processNext def ps' pState tSpec queue =
      let ex = (trapExit:(exitHandlers def))
          h = timeoutHandler def in do
        timedOut <- checkTimer pState tSpec h
        case timedOut of
          Stop s' r -> return $ (ProcessStopping s' r, (fst tSpec), queue)
          Go t' s' -> do
            case (dequeue queue) of
              Nothing -> do
                drainOrTimeout s' t' queue ps' h
              Just (m', q') -> do
                act <- catchesExit (processApply def s' m')
                                   (map (\d' -> (dispatchExit d') s') ex)
                return (act, t', q')
    -- Builds the handler list for one message: the shutdown handler first,
    -- then the API handlers, then the info handlers.
    processApply def pState msg =
      let pol = unhandledMessagePolicy def
          apiMatchers = map (dynHandleMessage pol pState) (apiHandlers def)
          infoMatchers = map (dynHandleMessage pol pState) (infoHandlers def)
          shutdown' = dynHandleMessage pol pState shutdownHandler'
          ms' = (shutdown':apiMatchers) ++ infoMatchers
      in processApplyAux ms' pol pState msg
    -- Tries the handlers in order; the first one producing an action wins and
    -- the unhandled-message policy applies when none does.
    processApplyAux [] p' s' m' = applyPolicy p' s' m'
    processApplyAux (h:hs) p' s' m' = do
      attempt <- h m'
      case attempt of
        Nothing -> processApplyAux hs p' s' m'
        Just act -> return act
    -- Waits for any message according to `delay`. If none arrives the timeout
    -- handler h decides the action; otherwise the message is enqueued and the
    -- loop continues with the state unchanged.
    drainOrTimeout pState delay queue ps' h = do
      let matches = [ matchMessage return ]
          recv = case delay of
                   Infinity -> receiveWait matches >>= return . Just
                   NoDelay -> receiveTimeout 0 matches
                   Delay i -> receiveTimeout (asTimeout i) matches in do
        r <- recv
        case r of
          Nothing -> h pState delay >>= \act -> return $ (act, delay, queue)
          Just m -> do
            queue' <- enqueueMessage pState ps' m queue
            return $ (ProcessContinue pState, delay, queue')
-- | Moves messages from the mailbox into the internal queue, one at a time,
-- until a zero-timeout receive finds nothing; the accumulated queue is then
-- returned. Each message is placed via 'enqueueMessage'.
drainMessageQueue :: s -> [DispatchPriority s] -> Queue -> Process Queue
drainMessageQueue pState priorities' queue =
  receiveTimeout 0 [ matchMessage return ] >>= maybe (return queue) keepDraining
  where
    -- Enqueue the message just received, then look for the next one.
    keepDraining msg =
      enqueueMessage pState priorities' msg queue
        >>= drainMessageQueue pState priorities'
-- | Adds a message to the internal queue.
--
-- The prioritisers are consulted in order; the first one that yields a
-- priority decides where the message it returns is enqueued. When no
-- prioritiser yields a result, the message is enqueued with priority @1@.
enqueueMessage :: s
               -> [DispatchPriority s]
               -> P.Message
               -> Queue
               -> Process Queue
enqueueMessage _ [] m' q = return (enqueue (1 :: Int) m' q)
enqueueMessage s (p:ps) m' q = do
  verdict <- prioritise p s m'
  case verdict of
    Nothing          -> enqueueMessage s ps m' q
    Just (prio, msg) -> return (enqueue prio msg q)
-- | Receive loop for a plain (non-prioritised) 'ProcessDefinition'.
--
-- Each iteration builds the match list (shutdown handler first, then the API
-- handlers, then a single catch-all match that walks the info handlers),
-- waits for a message or timeout via 'processReceive' with the exit handlers
-- installed, and acts on the resulting 'ProcessAction':
--
-- * continue / timeout: loop with the new state (and, for timeout, the new
--   delay);
--
-- * hibernate: 'block' for the interval, then loop with the original delay;
--
-- * stop / stopping: run the shutdown handler and return the 'ExitReason'.
recvLoop :: ProcessDefinition s -> s -> Delay -> Process ExitReason
recvLoop pDef pState recvDelay =
  let p = unhandledMessagePolicy pDef
      handleTimeout = timeoutHandler pDef
      handleStop = shutdownHandler pDef
      shutdown' = matchDispatch p pState shutdownHandler'
      matchers = map (matchDispatch p pState) (apiHandlers pDef)
      ex' = (trapExit:(exitHandlers pDef))
      ms' = (shutdown':matchers) ++ matchAux p pState (infoHandlers pDef)
  in do
    ac <- catchesExit (processReceive ms' handleTimeout pState recvDelay)
                      (map (\d' -> (dispatchExit d') pState) ex')
    case ac of
      (ProcessContinue s') -> recvLoop pDef s' recvDelay
      (ProcessTimeout t' s') -> recvLoop pDef s' t'
      (ProcessHibernate d' s') -> block d' >> recvLoop pDef s' recvDelay
      (ProcessStop r) -> handleStop pState r >> return (r :: ExitReason)
      (ProcessStopping s' r) -> handleStop s' r >> return (r :: ExitReason)
  where
    -- A single catch-all match that tries the info handlers in order and
    -- falls back to the unhandled-message policy.
    matchAux :: UnhandledMessagePolicy
             -> s
             -> [DeferredDispatcher s]
             -> [Match (ProcessAction s)]
    matchAux p ps ds = [matchAny (auxHandler (applyPolicy p ps) ps ds)]

    -- Offers the message to each deferred dispatcher in turn; the first one
    -- producing an action wins. Once the list is exhausted the policy
    -- decides. The empty-list equation covers the "last handler declined"
    -- case, so no length check is needed while recursing.
    auxHandler :: (P.Message -> Process (ProcessAction s))
               -> s
               -> [DeferredDispatcher s]
               -> P.Message
               -> Process (ProcessAction s)
    auxHandler policy _ [] msg = policy msg
    auxHandler policy st (d:ds) msg =
      dispatchInfo d st msg >>= maybe (auxHandler policy st ds msg) return
-- | Waits for one of the given matches according to the delay. When 'recv'
-- yields nothing the timeout handler is run with the current state and
-- delay; otherwise the matched handler's action is returned.
processReceive :: [Match (ProcessAction s)]
               -> TimeoutHandler s
               -> s
               -> Delay
               -> Process (ProcessAction s)
processReceive ms handleTimeout st d =
  recv ms d >>= maybe (handleTimeout st d) return
-- | Receives using the given matches, choosing the primitive by delay:
-- 'Infinity' uses 'receiveWait' (so the result is always 'Just'), 'NoDelay'
-- uses a zero timeout, and @Delay t@ uses the timeout obtained from
-- 'asTimeout'.
recv :: [Match (ProcessAction s)]
     -> Delay
     -> Process (Maybe (ProcessAction s))
recv matches Infinity   = Just <$> receiveWait matches
recv matches NoDelay    = receiveTimeout 0 matches
recv matches (Delay t') = receiveTimeout (asTimeout t') matches
-- | Builds the 'TimeoutSpec' for a delay.
--
-- For @Delay t@ an empty 'TMVar' is created and 'runAfter' is asked to fill
-- it after @t@; the spec carries the timer reference and an 'STM' action that
-- reads that variable. Any other delay yields a spec without a timer.
startTimer :: Delay -> Process TimeoutSpec
startTimer d@(Delay t) = do
  sig <- liftIO newEmptyTMVarIO
  tref <- runAfter t (liftIO (atomically (putTMVar sig ())))
  return (d, Just (tref, readTMVar sig))
startTimer d = return (d, Nothing)
-- | Polls the timer described by the spec and decides how the loop proceeds.
--
-- If the timer has not signalled, the result is 'Go' with the spec's delay
-- and the unchanged state. If it has, the timeout handler is run and its
-- action is translated: a timeout action supplies a new delay and state, the
-- stop actions become 'Stop', hibernate blocks first and then resumes with
-- the spec's delay, and continue resumes with the spec's delay.
checkTimer :: s
           -> TimeoutSpec
           -> TimeoutHandler s
           -> Process (TimeoutAction s)
checkTimer pState spec handler = do
  expired <- pollTimer spec
  if expired
    then handler pState delay >>= afterTimeout
    else resume pState
  where
    delay = fst spec

    -- Carry on with the delay taken from the timer spec.
    resume st = return $ Go delay st

    afterTimeout act =
      case act of
        ProcessTimeout t' s'   -> return $ Go t' s'
        ProcessStop r          -> return $ Stop pState r
        ProcessStopping s' r   -> return $ Stop s' r
        ProcessHibernate d' s' -> block d' >> resume s'
        ProcessContinue s'     -> resume s'
-- | Reports whether the spec's timer has signalled.
--
-- A spec without a timer never has. Otherwise the timer is cancelled and the
-- signal action is attempted once: the result is 'True' if it completes and
-- 'False' if it would have to retry.
pollTimer :: TimeoutSpec -> Process Bool
pollTimer (_, Nothing) = return False
pollTimer (_, Just (tref, sig)) = do
  cancelTimer tref
  liftIO $ atomically $ (const True <$> sig) `orElse` return False
-- | Cast handler that reacts to a 'Shutdown' message by stopping with
-- 'ExitNormal', whatever the current state is.
shutdownHandler' :: Dispatcher s
shutdownHandler' = handleCast onShutdown
  where
    onShutdown _ Shutdown = stop ExitNormal
-- | Exit-signal handler for signals whose reason is an 'ExitReason': the
-- state and the first handler argument are ignored and the process stops
-- with that same reason.
trapExit :: ExitSignalDispatcher s
trapExit = handleExit stopWith
  where
    stopWith _ _ reason = stop (reason :: ExitReason)
-- | Suspends the calling thread with 'threadDelay' for the given interval,
-- converted with 'asTimeout'.
block :: TimeInterval -> Process ()
block = liftIO . threadDelay . asTimeout
-- | Applies the unhandled-message policy to a message no handler accepted.
--
-- * 'Terminate': stop with @ExitOther "UnhandledInput"@.
--
-- * 'DeadLetter': forward the message to the given process, then continue.
--
-- * 'Drop': continue, discarding the message.
--
-- * 'Log': report the message on the log channel at info level, then
--   continue.
applyPolicy :: UnhandledMessagePolicy
            -> s
            -> P.Message
            -> Process (ProcessAction s)
applyPolicy Terminate _ _ = stop $ ExitOther "UnhandledInput"
applyPolicy (DeadLetter pid) s m = forward m pid >> continue s
applyPolicy Drop s _ = continue s
applyPolicy Log s m = do
  Log.report Log.info Log.logChannel $ "Unhandled Gen Input Message: " ++ (show m)
  continue s