{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE PatternGuards #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE EmptyDataDecls #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE ImpredicativeTypes #-} {-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} -- | Internal Exchange Implementation module Control.Distributed.Process.Execution.Exchange.Internal ( Exchange(..) , Message(..) , ExchangeType(..) , startExchange , startSupervised , startSupervisedRef , runExchange , post , postMessage , configureExchange , createMessage , applyHandlers ) where import Control.Concurrent.MVar (MVar, takeMVar, putMVar, newEmptyMVar) import Control.DeepSeq (NFData) import Control.Distributed.Process ( Process , ProcessMonitorNotification(..) , ProcessId , liftIO , spawnLocal , unsafeWrapMessage ) import qualified Control.Distributed.Process as P (Message, link) import Control.Distributed.Process.Serializable hiding (SerializableDict) import Control.Distributed.Process.Extras.Internal.Types ( Resolvable(..) ) import Control.Distributed.Process.Extras.Internal.Primitives ( Linkable(..) ) import Control.Distributed.Process.ManagedProcess ( channelControlPort , handleControlChan , handleInfo , handleRaw , continue , defaultProcess , InitHandler , InitResult(..) , ProcessAction , ProcessDefinition(..) , ControlChannel , ControlPort ) import qualified Control.Distributed.Process.ManagedProcess as MP ( chanServe ) import Control.Distributed.Process.ManagedProcess.UnsafeClient ( sendControlMessage ) import Control.Distributed.Process.Supervisor (SupervisorPid) import Control.Distributed.Process.Extras.Time (Delay(Infinity)) import Data.Binary import Data.Typeable (Typeable) import GHC.Generics import Prelude hiding (drop) {- [design notes] Messages are sent to exchanges and forwarded to clients. 
An exchange is parameterised by its routing mechanism, which is responsible for
maintaining its own client state and selecting the clients to which messages
are forwarded.

-}

-- | Opaque handle to an exchange.
--
data Exchange = Exchange
  { pid   :: !ProcessId                       -- ^ id of the exchange process
  , cchan :: !(ControlPort ControlMessage)    -- ^ control port used to reach it
  , xType :: !String                          -- ^ name of the exchange type
  } deriving (Typeable, Generic, Eq)

instance Binary Exchange where

-- An exchange is shown as its type name and process id, separated by a colon.
instance Show Exchange where
  show ex = concat [xType ex, ":", show (pid ex)]

-- An exchange handle always resolves to the process id it carries.
instance Resolvable Exchange where
  resolve ex = return (Just (pid ex))

{-
instance Observable Exchange MonitorRef ProcessMonitorNotification where
  observe   = P.monitor . pid
  unobserve = P.unmonitor
  observableFrom ref (ProcessMonitorNotification ref' _ r) =
    return $ if ref' == ref then Just r else Nothing
-}

-- Linking to an exchange links to its underlying process.
instance Linkable Exchange where
  linkTo ex = P.link (pid ex)

-- All traffic towards an exchange travels over its control channel; this
-- helper pushes a single control message through the handle's control port.
sendCtrlMsg :: Exchange -> ControlMessage -> Process ()
sendCtrlMsg ex = sendControlMessage (cchan ex)

-- | The envelope in which data is posted to an exchange. Besides the
-- underlying payload, it carries a routing key and a list of (key, value)
-- headers.
data Message = Message
  { key     :: !String              -- ^ a /routing key/ for the payload
  , headers :: ![(String, String)]  -- ^ arbitrary key-value headers
  , payload :: !P.Message           -- ^ the underlying @Message@ payload
  } deriving (Typeable, Generic, Show)

instance Binary Message where
instance NFData Message where

-- The two kinds of traffic an exchange process accepts on its control
-- channel: an arbitrary configuration payload, or a posted 'Message'.
data ControlMessage
  = Configure !P.Message
  | Post      !Message
  deriving (Typeable, Generic)

instance Binary ControlMessage where
instance NFData ControlMessage where

-- | Different exchange types are defined using record syntax.
-- The 'configureEx' and 'routeEx' API functions are called during the exchange
-- lifecycle when incoming traffic arrives. Configuration messages are
-- completely arbitrary types and the exchange type author is entirely
-- responsible for decoding them.
-- Messages posted to the exchange (see the
-- 'Message' data type) are passed to the 'routeEx' API function along with the
-- exchange type's own internal state. Both API functions return a new
-- (potentially updated) state and run in the @Process@ monad.
--
data ExchangeType s = ExchangeType
  { name        :: String
    -- ^ name of the exchange type; copied into the 'Exchange' handle
  , state       :: s
    -- ^ the exchange type's own internal state
  , configureEx :: s -> P.Message -> Process s
    -- ^ applied to the current state and each configuration message
  , routeEx     :: s -> Message -> Process s
    -- ^ applied to the current state and each posted 'Message'
  }

--------------------------------------------------------------------------------
-- Starting/Running an Exchange                                               --
--------------------------------------------------------------------------------

-- | Starts an /exchange process/ with the given 'ExchangeType'.
startExchange :: forall s. ExchangeType s -> Process Exchange
startExchange = doStart Nothing

-- | Starts an exchange as part of a supervision tree, returning the process
-- id of the exchange together with the 'Exchange' handle wrapped as a raw
-- message.
--
-- Example:
--
-- > childSpec = toChildStart $ startSupervisedRef exType
--
startSupervisedRef :: forall s . ExchangeType s
                   -> SupervisorPid
                   -> Process (ProcessId, P.Message)
startSupervisedRef t s = do
  ex <- startSupervised t s
  return (pid ex, unsafeWrapMessage ex)

-- | Starts an exchange as part of a supervision tree. The exchange process
-- links to the given supervisor before it starts serving.
--
-- Example:
--
-- > childSpec = toChildStart $ startSupervised exType
--
startSupervised :: forall s . ExchangeType s
                -> SupervisorPid
                -> Process Exchange
startSupervised t s = doStart (Just s) t

-- Spawns the exchange process (optionally linking it to a supervisor first)
-- and waits for it to publish its control port, which is then packaged with
-- the new process id and the exchange type's name into an 'Exchange' handle.
doStart :: Maybe SupervisorPid -> ExchangeType s -> Process Exchange
doStart mSp t = do
  portVar <- liftIO newEmptyMVar
  exPid   <- spawnLocal (maybeLink mSp >> runExchange t portVar)
  -- The port is written by 'processDefinition' once the server is set up.
  -- NOTE(review): this blocks until that write happens; presumably the
  -- spawned process cannot die before then - confirm.
  port    <- liftIO $ takeMVar portVar
  return $ Exchange exPid port (name t)
  where
    maybeLink = maybe (return ()) P.link

-- | Runs the exchange server loop in the calling process. The control port of
-- the server is written to the supplied 'MVar' once the process definition has
-- been built.
runExchange :: forall s.
               ExchangeType s
            -> MVar (ControlPort ControlMessage)
            -> Process ()
runExchange t tc = MP.chanServe t exInit (processDefinition t tc)

-- The server state is the exchange type itself; initialisation always
-- succeeds and uses an infinite delay.
exInit :: forall s. InitHandler (ExchangeType s) (ExchangeType s)
exInit t = return $ InitOk t Infinity

--------------------------------------------------------------------------------
-- Client Facing API                                                          --
--------------------------------------------------------------------------------

-- | Posts an arbitrary 'Serializable' datum to an /exchange/. The raw datum is
-- wrapped in the 'Message' data type, with its 'key' set to @""@ and its
-- 'headers' to @[]@.
post :: Serializable a => Exchange -> a -> Process ()
post ex msg = postMessage ex $ Message "" [] (unsafeWrapMessage msg)

-- | Posts a 'Message' to an /exchange/. The message is forced to weak head
-- normal form before it is handed to the control channel.
postMessage :: Exchange -> Message -> Process ()
postMessage ex msg = msg `seq` sendCtrlMsg ex (Post msg)

-- | Sends an arbitrary 'Serializable' datum to an /exchange/, for use as a
-- configuration change - see 'configureEx' for details.
configureExchange :: Serializable m => Exchange -> m -> Process ()
configureExchange e m = sendCtrlMsg e $ Configure (unsafeWrapMessage m)

-- | Utility for creating a 'Message' datum from its 'key', 'headers' and
-- 'payload'.
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
createMessage k h m = Message k h $ unsafeWrapMessage m

-- | Utility for custom exchange type authors - evaluates a set of primitive
-- message handlers from left to right, returning the first which evaluates
-- to @Just a@, or the initial @e@ value if all the handlers yield @Nothing@.
-- Handlers after the first match are not run.
applyHandlers :: a
              -> P.Message
              -> [P.Message -> Process (Maybe a)]
              -> Process a
applyHandlers e _ []     = return e
applyHandlers e m (f:fs) = f m >>= maybe (applyHandlers e m fs) return

--------------------------------------------------------------------------------
-- Process Definition/State & API Handlers                                    --
--------------------------------------------------------------------------------

-- Builds the managed process definition for an exchange. As a side effect the
-- control port derived from the control channel is published through the
-- 'MVar', which is what unblocks 'doStart'.
processDefinition :: forall s.
                     ExchangeType s
                  -> MVar (ControlPort ControlMessage)
                  -> ControlChannel ControlMessage
                  -> Process (ProcessDefinition (ExchangeType s))
processDefinition _ tc cc = do
  liftIO $ putMVar tc $ channelControlPort cc
  return $ defaultProcess {
      apiHandlers  = [ handleControlChan cc handleControlMessage ]
    , infoHandlers = [ handleInfo handleMonitor
                     , handleRaw convertToCC
                     ]
    } :: Process (ProcessDefinition (ExchangeType s))

-- Monitor notifications are handed to the exchange type as configuration
-- messages.
handleMonitor :: forall s.
                 ExchangeType s
              -> ProcessMonitorNotification
              -> Process (ProcessAction (ExchangeType s))
handleMonitor ex m = handleControlMessage ex (Configure (unsafeWrapMessage m))

-- Any other raw message is treated as if it had been posted with an empty
-- routing key and no headers.
convertToCC :: forall s.
               ExchangeType s
            -> P.Message
            -> Process (ProcessAction (ExchangeType s))
convertToCC ex msg = handleControlMessage ex (Post $ Message "" [] msg)

-- Dispatches a control message to the matching exchange type callback and
-- continues serving with the state that callback returns.
handleControlMessage :: forall s.
                        ExchangeType s
                     -> ControlMessage
                     -> Process (ProcessAction (ExchangeType s))
handleControlMessage ex cm = do
  s' <- case cm of
          Configure msg -> configureEx ex (state ex) msg
          Post msg      -> routeEx ex (state ex) msg
  continue $ ex { state = s' }