{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LiberalTypeSynonyms #-} {-# LANGUAGE Rank2Types #-} {-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE FunctionalDependencies #-} -- | Types used throughout the ManagedProcess framework module Control.Distributed.Process.ManagedProcess.Internal.Types ( -- * Exported data types InitResult(..) , GenProcess() , runProcess , lift , liftIO , ProcessState(..) , State , Queue , Limit , Condition(..) , ProcessAction(..) , ProcessReply(..) , Action , Reply , ActionHandler , CallHandler , CastHandler , StatelessHandler , DeferredCallHandler , StatelessCallHandler , InfoHandler , ChannelHandler , StatelessChannelHandler , InitHandler , ShutdownHandler , ExitState(..) , isCleanShutdown , exitState , TimeoutHandler , UnhandledMessagePolicy(..) , ProcessDefinition(..) , Priority(..) , DispatchPriority(..) , DispatchFilter(..) , Filter(..) -- , Check(..) , PrioritisedProcessDefinition(..) , RecvTimeoutPolicy(..) , ControlChannel(..) , newControlChan , ControlPort(..) , channelControlPort , Dispatcher(..) , ExternDispatcher(..) , DeferredDispatcher(..) , ExitSignalDispatcher(..) , MessageMatcher(..) , ExternMatcher(..) , Message(..) , CallResponse(..) , CallId , CallRef(..) , CallRejected(..) , makeRef , caller , rejectToCaller , recipient , tag , initCall , unsafeInitCall , waitResponse ) where import Control.Concurrent.STM (STM) import Control.Distributed.Process hiding (Message, mask, finally, liftIO) import qualified Control.Distributed.Process as P (Message, liftIO) import Control.Distributed.Process.Serializable import Control.Distributed.Process.Extras ( Recipient(..) , ExitReason(..) , Addressable , Resolvable(..) , Routable(..) , NFSerializable ) import Control.Distributed.Process.ManagedProcess.Internal.PriorityQueue ( PriorityQ ) import Control.Distributed.Process.Extras.Internal.Types ( resolveOrDie ) import Control.Distributed.Process.Extras.Time import Control.Distributed.Process.ManagedProcess.Timer (Timer, TimerKey) import Control.DeepSeq (NFData(..)) import Control.Monad.Fix (MonadFix) import Control.Monad.Catch ( catch , throwM , uninterruptibleMask , mask , finally , MonadThrow , MonadCatch , MonadMask ) import qualified Control.Monad.Catch as Catch ( catch , throwM ) import Control.Monad.IO.Class (MonadIO) import qualified Control.Monad.State.Strict as ST ( MonadState , StateT , get , lift , runStateT ) import Data.Binary hiding (decode) import Data.Map.Strict (Map) import Data.Typeable (Typeable) import Data.IORef (IORef) import Prelude hiding (init) import GHC.Generics -------------------------------------------------------------------------------- -- API -- -------------------------------------------------------------------------------- -- | wrapper for a @MonitorRef@ type CallId = MonitorRef -- | Wraps a consumer of the call API newtype CallRef a = CallRef { unCaller :: (Recipient, CallId) } deriving (Eq, Show, Typeable, Generic) -- | Retrieve the @Recipient@ for a @CallRef@. recipient :: CallRef a -> Recipient recipient = fst . unCaller -- | Retrieve the @CallId@ for a @CallRef@. tag :: CallRef a -> CallId tag = snd . unCaller instance Binary (CallRef a) where instance NFData (CallRef a) where rnf (CallRef x) = rnf x `seq` () -- | Creates a @CallRef@ for the given @Recipient@ and @CallId@ makeRef :: Recipient -> CallId -> CallRef a makeRef r c = CallRef (r, c) -- | @Message@ type used internally by the call, cast, and rpcChan APIs. data Message a b = CastMessage a | CallMessage a (CallRef b) | ChanMessage a (SendPort b) deriving (Typeable, Generic) -- | Retrieve the @Recipient@ from a @Message@. If the supplied message is -- a /cast/ or /chan/ message will evaluate to @Nothing@, otherwise @Just ref@. caller :: forall a b . Message a b -> Maybe Recipient caller (CallMessage _ ref) = Just $ recipient ref caller _ = Nothing -- | Reject a /call/ message with the supplied string. Sends @CallRejected@ to -- the recipient if the input is a @CallMessage@, otherwise has no side effects. rejectToCaller :: forall a b . Message a b -> String -> Process () rejectToCaller (CallMessage _ ref) m = sendTo ref (CallRejected m (tag ref)) rejectToCaller _ _ = return () instance (Serializable a, Serializable b) => Binary (Message a b) where instance (NFSerializable a, NFSerializable b) => NFData (Message a b) where rnf (CastMessage a) = rnf a `seq` () rnf (CallMessage a b) = rnf a `seq` rnf b `seq` () rnf (ChanMessage a b) = rnf a `seq` rnf b `seq` () deriving instance (Eq a, Eq b) => Eq (Message a b) deriving instance (Show a, Show b) => Show (Message a b) -- | Response type for the call API data CallResponse a = CallResponse a CallId deriving (Typeable, Generic) instance Serializable a => Binary (CallResponse a) instance NFSerializable a => NFData (CallResponse a) where rnf (CallResponse a c) = rnf a `seq` rnf c `seq` () deriving instance Eq a => Eq (CallResponse a) deriving instance Show a => Show (CallResponse a) -- | Sent to a consumer of the /call/ API when a server filter expression -- explicitly rejects an incoming call message. data CallRejected = CallRejected String CallId deriving (Typeable, Generic, Show, Eq) instance Binary CallRejected where instance NFData CallRejected where instance Resolvable (CallRef a) where resolve (CallRef (r, _)) = resolve r instance Routable (CallRef a) where sendTo (CallRef (c, _)) = sendTo c unsafeSendTo (CallRef (c, _)) = unsafeSendTo c -- | Return type for and 'InitHandler' expression. data InitResult s = InitOk s Delay {- ^ a successful initialisation, initial state and timeout -} | InitStop String {- ^ failed initialisation and the reason, this will result in an error -} | InitIgnore {- ^ the process has decided not to continue starting - this is not an error -} deriving (Typeable) -- | Represent a max-backlog from RecvTimeoutPolicy type Limit = Maybe Int -- | Internal priority queue, used by prioritised processes. type Queue = PriorityQ Int P.Message -- | Map from @TimerKey@ to @(Timer, Message)@. type TimerMap = Map TimerKey (Timer, P.Message) -- | Internal state of a prioritised process loop. data ProcessState s = ProcessState { timeoutSpec :: RecvTimeoutPolicy , procDef :: ProcessDefinition s , procPrio :: [DispatchPriority s] , procFilters :: [DispatchFilter s] , usrTimeout :: Delay , sysTimeout :: Timer , usrTimers :: TimerMap , internalQ :: Queue , procState :: s } -- | Prioritised process state, held as an @IORef@. type State s = IORef (ProcessState s) -- | StateT based monad for prioritised process loops. newtype GenProcess s a = GenProcess { unManaged :: ST.StateT (State s) Process a } deriving ( Functor , Monad , ST.MonadState (State s) , MonadIO , MonadFix , Typeable , Applicative ) instance forall s . MonadThrow (GenProcess s) where throwM = lift . Catch.throwM instance forall s . MonadCatch (GenProcess s) where catch p h = do pSt <- ST.get -- we can throw away our state since it is always accessed via an IORef (a, _) <- lift $ Catch.catch (runProcess pSt p) (runProcess pSt . h) return a instance forall s . MonadMask (GenProcess s) where mask p = do pSt <- ST.get lift $ mask $ \restore -> do (a, _) <- runProcess pSt (p (liftRestore restore)) return a where liftRestore restoreP = \p2 -> do ourSTate <- ST.get (a', _) <- lift $ restoreP $ runProcess ourSTate p2 return a' uninterruptibleMask p = do pSt <- ST.get (a, _) <- lift $ uninterruptibleMask $ \restore -> runProcess pSt (p (liftRestore restore)) return a where liftRestore restoreP = \p2 -> do ourSTate <- ST.get (a', _) <- lift $ restoreP $ runProcess ourSTate p2 return a' -- | Run an action in the @GenProcess@ monad. runProcess :: State s -> GenProcess s a -> Process (a, State s) runProcess state proc = ST.runStateT (unManaged proc) state -- | Lift an action in the @Process@ monad to @GenProcess@. lift :: Process a -> GenProcess s a lift p = GenProcess $ ST.lift p -- | Lift an IO action directly into @GenProcess@, @liftIO = lift . Process.LiftIO@. liftIO :: IO a -> GenProcess s a liftIO = lift . P.liftIO -- | The action taken by a process after a handler has run and its updated state. -- See "Control.Distributed.Process.ManagedProcess.Server.continue" -- "Control.Distributed.Process.ManagedProcess.Server.timeoutAfter" -- "Control.Distributed.Process.ManagedProcess.Server.hibernate" -- "Control.Distributed.Process.ManagedProcess.Server.stop" -- "Control.Distributed.Process.ManagedProcess.Server.stopWith" -- -- Also see "Control.Distributed.Process.Management.Priority.act" and -- "Control.Distributed.Process.ManagedProcess.Priority.runAfter". -- -- And other actions. This type should not be used directly. data ProcessAction s = ProcessSkip | ProcessActivity (GenProcess s ()) -- ^ run the given activity | ProcessExpression (GenProcess s (ProcessAction s)) -- ^ evaluate an expression | ProcessContinue s -- ^ continue with (possibly new) state | ProcessTimeout Delay s -- ^ timeout if no messages are received | ProcessHibernate TimeInterval s -- ^ hibernate for /delay/ | ProcessStop ExitReason -- ^ stop the process, giving @ExitReason@ | ProcessStopping s ExitReason -- ^ stop the process with @ExitReason@, with updated state | ProcessBecome (ProcessDefinition s) s -- ^ changes the current process definition -- | Returned from handlers for the synchronous 'call' protocol, encapsulates -- the reply data /and/ the action to take after sending the reply. A handler -- can return @NoReply@ if they wish to ignore the call. data ProcessReply r s = ProcessReply r (ProcessAction s) | ProcessReject String (ProcessAction s) -- TODO: can we use a functional dependency here? | NoReply (ProcessAction s) -- | Wraps a predicate that is used to determine whether or not a handler -- is valid based on some combination of the current process state, the -- type and/or value of the input message or both. data Condition s m = Condition (s -> m -> Bool) -- ^ predicated on the process state /and/ the message | State (s -> Bool) -- ^ predicated on the process state only | Input (m -> Bool) -- ^ predicated on the input message only {- class Check c s m | s m -> c where -- data Checker c :: * -> * -> * -- apply :: s -> m -> Checker c s m -> Bool apply :: s -> m -> c -> Bool instance Check (Condition s m) s m where -- data Checker (Condition s m) s m = CheckCond (Condition s m) apply s m (Condition f) = f s m apply s _ (State f) = f s apply _ m (Input f) = f m instance Check (s -> m -> Bool) s m where -- data Checker (s -> m -> Bool) s m = CheckF (s -> m -> Bool) apply s m f = f s m -} -- | Informs a /shutdown handler/ of whether it is running due to a clean -- shutdown, or in response to an unhandled exception. data ExitState s = CleanShutdown s -- ^ given when an ordered shutdown is underway | LastKnown s {- ^ given due to an unhandled exception, passing the last known state -} -- | @True@ if the @ExitState@ is @CleanShutdown@, otherwise @False@. isCleanShutdown :: ExitState s -> Bool isCleanShutdown (CleanShutdown _) = True isCleanShutdown _ = False -- | Evaluates to the @s@ state datum in the given @ExitState@. exitState :: ExitState s -> s exitState (CleanShutdown s) = s exitState (LastKnown s) = s -- | An action (server state transition) in the @Process@ monad type Action s = Process (ProcessAction s) -- | An action (server state transition) causing a reply to a caller, in the -- @Process@ monad type Reply b s = Process (ProcessReply b s) -- | An expression used to handle a message type ActionHandler s a = s -> a -> Action s -- | An expression used to handle a message and providing a reply type CallHandler s a b = s -> a -> Reply b s -- | An expression used to ignore server state during handling type StatelessHandler s a = a -> (s -> Action s) -- | An expression used to handle a /call/ message where the reply is deferred -- via the 'CallRef' type DeferredCallHandler s a b = CallRef b -> CallHandler s a b -- | An expression used to handle a /call/ message ignoring server state type StatelessCallHandler s a b = CallRef b -> a -> Reply b s -- | An expression used to handle a /cast/ message type CastHandler s a = ActionHandler s a -- | An expression used to handle an /info/ message type InfoHandler s a = ActionHandler s a -- | An expression used to handle a /channel/ message type ChannelHandler s a b = SendPort b -> ActionHandler s a -- | An expression used to handle a /channel/ message in a stateless process type StatelessChannelHandler s a b = SendPort b -> StatelessHandler s a -- | An expression used to initialise a process with its state type InitHandler a s = a -> Process (InitResult s) -- | An expression used to handle process termination type ShutdownHandler s = ExitState s -> ExitReason -> Process () -- | An expression used to handle process timeouts type TimeoutHandler s = ActionHandler s Delay -- dispatching to implementation callbacks -- | Provides a means for servers to listen on a separate, typed /control/ -- channel, thereby segregating the channel from their regular -- (and potentially busy) mailbox. newtype ControlChannel m = ControlChannel { unControl :: (SendPort (Message m ()), ReceivePort (Message m ())) } -- | Creates a new 'ControlChannel'. newControlChan :: (Serializable m) => Process (ControlChannel m) newControlChan = fmap ControlChannel newChan -- | The writable end of a 'ControlChannel'. -- newtype ControlPort m = ControlPort { unPort :: SendPort (Message m ()) } deriving (Show) deriving instance (Serializable m) => Binary (ControlPort m) instance Eq (ControlPort m) where a == b = unPort a == unPort b -- | Obtain an opaque expression for communicating with a 'ControlChannel'. -- channelControlPort :: ControlChannel m -> ControlPort m channelControlPort cc = ControlPort $ fst $ unControl cc -- | Given as the result of evaluating a "DispatchFilter". This type is intended -- for internal use. For an API for working with filters, -- see "Control.Distributed.Process.ManagedProcess.Priority". data Filter s = FilterOk s | FilterSafe s | forall m . (Show m) => FilterReject m s | FilterSkip s | FilterStop s ExitReason -- | Provides dispatch from a variety of inputs to a typed filter handler. data DispatchFilter s = forall a b . (Serializable a, Serializable b) => FilterApi { apiFilter :: s -> Message a b -> Process (Filter s) } | forall a . (Serializable a) => FilterAny { anyFilter :: s -> a -> Process (Filter s) } | FilterRaw { rawFilter :: s -> P.Message -> Process (Maybe (Filter s)) } | FilterState { stateFilter :: s -> Process (Maybe (Filter s)) } -- | Provides dispatch from cast and call messages to a typed handler. data Dispatcher s = forall a b . (Serializable a, Serializable b) => Dispatch { dispatch :: s -> Message a b -> Process (ProcessAction s) } | forall a b . (Serializable a, Serializable b) => DispatchIf { dispatch :: s -> Message a b -> Process (ProcessAction s) , dispatchIf :: s -> Message a b -> Bool } -- | Provides dispatch for channels and STM actions data ExternDispatcher s = forall a b . (Serializable a, Serializable b) => DispatchCC -- control channel dispatch { channel :: ReceivePort (Message a b) , dispatchChan :: s -> Message a b -> Process (ProcessAction s) } | forall a . (Serializable a) => DispatchSTM -- arbitrary STM actions { stmAction :: STM a , dispatchStm :: s -> a -> Process (ProcessAction s) , matchStm :: Match P.Message , matchAnyStm :: forall m . (P.Message -> m) -> Match m } -- | Provides dispatch for any input, returns 'Nothing' for unhandled messages. data DeferredDispatcher s = DeferredDispatcher { dispatchInfo :: s -> P.Message -> Process (Maybe (ProcessAction s)) } -- | Provides dispatch for any exit signal - returns 'Nothing' for unhandled exceptions data ExitSignalDispatcher s = ExitSignalDispatcher { dispatchExit :: s -> ProcessId -> P.Message -> Process (Maybe (ProcessAction s)) } -- | Defines the means of dispatching inbound messages to a handler class MessageMatcher d where matchDispatch :: UnhandledMessagePolicy -> s -> d s -> Match (ProcessAction s) instance MessageMatcher Dispatcher where matchDispatch _ s (Dispatch d) = match (d s) matchDispatch _ s (DispatchIf d cond) = matchIf (cond s) (d s) instance MessageMatcher ExternDispatcher where matchDispatch _ s (DispatchCC c d) = matchChan c (d s) matchDispatch _ s (DispatchSTM c d _ _) = matchSTM c (d s) -- | Defines the means of dispatching messages from external channels (e.g. -- those defined in terms of "ControlChannel", and STM actions) to a handler. class ExternMatcher d where matchExtern :: UnhandledMessagePolicy -> s -> d s -> Match P.Message matchMapExtern :: forall m s . UnhandledMessagePolicy -> s -> (P.Message -> m) -> d s -> Match m instance ExternMatcher ExternDispatcher where matchExtern _ _ (DispatchCC c _) = matchChan c (return . unsafeWrapMessage) matchExtern _ _ (DispatchSTM _ _ m _) = m matchMapExtern _ _ f (DispatchCC c _) = matchChan c (return . f . unsafeWrapMessage) matchMapExtern _ _ f (DispatchSTM _ _ _ p) = p f -- | Priority of a message, encoded as an @Int@ newtype Priority a = Priority { getPrio :: Int } -- | Dispatcher for prioritised handlers data DispatchPriority s = PrioritiseCall { prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message)) } | PrioritiseCast { prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message)) } | PrioritiseInfo { prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message)) } -- | For a 'PrioritisedProcessDefinition', this policy determines for how long -- the /receive loop/ should continue draining the process' mailbox before -- processing its received mail (in priority order). -- -- If a prioritised /managed process/ is receiving a lot of messages (into its -- /real/ mailbox), the server might never get around to actually processing its -- inputs. This (mandatory) policy provides a guarantee that eventually (i.e., -- after a specified number of received messages or time interval), the server -- will stop removing messages from its mailbox and process those it has already -- received. -- data RecvTimeoutPolicy = RecvMaxBacklog Int | RecvTimer TimeInterval deriving (Typeable) -- | A @ProcessDefinition@ decorated with @DispatchPriority@ for certain -- input domains. data PrioritisedProcessDefinition s = PrioritisedProcessDefinition { processDef :: ProcessDefinition s , priorities :: [DispatchPriority s] , filters :: [DispatchFilter s] , recvTimeout :: RecvTimeoutPolicy } -- | Policy for handling unexpected messages, i.e., messages which are not -- sent using the 'call' or 'cast' APIs, and which are not handled by any of the -- 'handleInfo' handlers. data UnhandledMessagePolicy = Terminate -- ^ stop immediately, giving @ExitOther "UnhandledInput"@ as the reason | DeadLetter ProcessId -- ^ forward the message to the given recipient | Log -- ^ log messages, then behave identically to @Drop@ | Drop -- ^ dequeue and then drop/ignore the message deriving (Show, Eq) -- | Stores the functions that determine runtime behaviour in response to -- incoming messages and a policy for responding to unhandled messages. data ProcessDefinition s = ProcessDefinition { apiHandlers :: [Dispatcher s] -- ^ functions that handle call/cast messages , infoHandlers :: [DeferredDispatcher s] -- ^ functions that handle non call/cast messages , externHandlers :: [ExternDispatcher s] -- ^ functions that handle control channel and STM inputs , exitHandlers :: [ExitSignalDispatcher s] -- ^ functions that handle exit signals , timeoutHandler :: TimeoutHandler s -- ^ a function that handles timeouts , shutdownHandler :: ShutdownHandler s -- ^ a function that is run just before the process exits , unhandledMessagePolicy :: UnhandledMessagePolicy -- ^ how to deal with unhandled messages } -- note [rpc calls] -- One problem with using plain expect/receive primitives to perform a -- synchronous (round trip) call is that a reply matching the expected type -- could come from anywhere! The Call.hs module uses a unique integer tag to -- distinguish between inputs but this is easy to forge, and forces all callers -- to maintain a tag pool, which is quite onerous. -- -- Here, we use a private (internal) tag based on a 'MonitorRef', which is -- guaranteed to be unique per calling process (in the absence of mallicious -- peers). This is handled throughout the roundtrip, such that the reply will -- either contain the CallId (i.e., the ame 'MonitorRef' with which we're -- tracking the server process) or we'll see the server die. -- -- Of course, the downside to all this is that the monitoring and receiving -- clutters up your mailbox, and if your mailbox is extremely full, could -- incur delays in delivery. The callAsync function provides a neat -- work-around for that, relying on the insulation provided by Async. -- TODO: Generify this /call/ API and use it in Call.hs to avoid tagging -- TODO: the code below should be moved elsewhere. Maybe to Client.hs? -- | The send part of the /call/ client-server interaction. The resulting -- "CallRef" can be used to identify the corrolary response message (if one is -- sent by the server), and is unique to this /call-reply/ pair. initCall :: forall s a b . (Addressable s, Serializable a, Serializable b) => s -> a -> Process (CallRef b) initCall sid msg = do pid <- resolveOrDie sid "initCall: unresolveable address " mRef <- monitor pid self <- getSelfPid let cRef = makeRef (Pid self) mRef in do sendTo pid (CallMessage msg cRef :: Message a b) return cRef -- | Version of @initCall@ that utilises "unsafeSendTo". unsafeInitCall :: forall s a b . ( Addressable s , NFSerializable a , NFSerializable b ) => s -> a -> Process (CallRef b) unsafeInitCall sid msg = do pid <- resolveOrDie sid "unsafeInitCall: unresolveable address " mRef <- monitor pid self <- getSelfPid let cRef = makeRef (Pid self) mRef in do unsafeSendTo pid (CallMessage msg cRef :: Message a b) return cRef -- | Wait on the server's response after an "initCall" has been previously been sent. -- -- This function does /not/ trap asynchronous exceptions. waitResponse :: forall b. (Serializable b) => Maybe TimeInterval -> CallRef b -> Process (Maybe (Either ExitReason b)) waitResponse mTimeout cRef = let (_, mRef) = unCaller cRef matchers = [ matchIf (\((CallResponse _ ref) :: CallResponse b) -> ref == mRef) (\((CallResponse m _) :: CallResponse b) -> return (Right m)) , matchIf (\((CallRejected _ ref)) -> ref == mRef) (\(CallRejected s _) -> return (Left $ ExitOther $ s)) , matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mRef) (\(ProcessMonitorNotification _ _ r) -> return (Left (err r))) ] err r = ExitOther $ show r in case mTimeout of (Just ti) -> finally (receiveTimeout (asTimeout ti) matchers) (unmonitor mRef) Nothing -> finally (fmap Just (receiveWait matchers)) (unmonitor mRef)