-- | Observer Effects -- -- This module supports the implementation of observerRegistry and observables. Expected use -- case is event propagation. -- -- The observable event sources and the observers are usually server processes for a -- protocol that embeds the 'ObserverRegistry' and 'Observer' 'Pdu's respectively. -- -- A generic FIFO queue based observer can be found in "Control.Eff.Concurrent.Protocol.Observer.Queue". -- -- @since 0.16.0 module Control.Eff.Concurrent.Protocol.Observer ( Observer(..) , ObservationSink() , IsObservable , CanObserve , Pdu(RegisterObserver, ForgetObserver, Observed) , registerObserver , forgetObserver , forgetObserverUnsafe , ObserverRegistry(..) , ObserverRegistryState , observerRegistryNotify , evalObserverRegistryState , emptyObserverRegistry , observerRegistryHandlePdu , observerRegistryRemoveProcess ) where import Control.DeepSeq ( NFData(rnf) ) import Control.Eff import Control.Eff.Concurrent.Misc import Control.Eff.Concurrent.Process import Control.Eff.Concurrent.Protocol import Control.Eff.Concurrent.Protocol.Client import Control.Eff.Concurrent.Protocol.Wrapper (Request(Cast)) import Control.Eff.State.Strict import Control.Eff.Log import Control.Lens import Control.Monad import Data.Dynamic import Data.Kind import Data.Semigroup import Data.Map ( Map ) import qualified Data.Map as Map import Data.Text ( pack ) import Data.Type.Pretty import GHC.Generics import GHC.Stack -- * Observers -- ** Observables -- | A /protocol/ to communicate 'Observed' events from a sources to many sinks. -- -- A sink is any process that serves a protocol with a 'Pdu' instance that embeds -- the 'Observer' Pdu via an 'HasPduPrism' instance. -- -- This type has /dual use/, for one it serves as type-index for 'Pdu', i.e. -- 'HasPdu' respectively, and secondly it contains an 'ObservationSink' and -- a 'MonitorReference'. -- -- The 'ObservationSink' is used to serialize and send the 'Observed' events, -- while the 'ProcessId' serves as key for internal maps. -- -- @since 0.28.0 newtype Observer event = MkObserver (Arg ProcessId (ObservationSink event)) deriving (Eq, Ord, Typeable) instance NFData (Observer event) where rnf (MkObserver (Arg x y)) = rnf x `seq` rnf y instance Typeable event => Show (Observer event) where showsPrec d (MkObserver (Arg x (MkObservationSink _ m))) = showParen (d>=10) (showString "observer: " . showSTypeable @event . showString " ". showsPrec 10 x . showChar ' ' . showsPrec 10 m ) type instance ToPretty (Observer event) = PrettyParens ("observing" <:> ToPretty event) instance (Tangible event) => HasPdu (Observer event) where data Pdu (Observer event) r where Observed :: event -> Pdu (Observer event) 'Asynchronous deriving Typeable instance NFData event => NFData (Pdu (Observer event) r) where rnf (Observed event) = rnf event instance Show event => Show (Pdu (Observer event) r) where showsPrec d (Observed event) = showParen (d >= 10) (showString "observered: " . showsPrec 10 event) -- | The Information necessary to wrap an 'Observed' event to a process specific -- message, e.g. the embedded 'Observer' 'Pdu' instance, and the 'MonitorReference' of -- the destination process. -- -- @since 0.28.0 data ObservationSink event = MkObservationSink { _observerSerializer :: Serializer (Pdu (Observer event) 'Asynchronous) , _observerMonitorReference :: MonitorReference } deriving (Generic, Typeable) instance NFData (ObservationSink event) where rnf (MkObservationSink s p) = s `seq` rnf p -- | Convenience type alias. -- -- @since 0.28.0 type IsObservable eventSource event = ( Tangible event , Embeds eventSource (ObserverRegistry event) , HasPdu eventSource ) -- | Convenience type alias. -- -- @since 0.28.0 type CanObserve eventSink event = ( Tangible event , Embeds eventSink (Observer event) , HasPdu eventSink ) -- | And an 'Observer' to the set of recipients for all observations reported by 'observerRegistryNotify'. -- Note that the observerRegistry are keyed by the observing process, i.e. a previous entry for the process -- contained in the 'Observer' is overwritten. If you want multiple entries for a single process, just -- combine several filter functions. -- -- @since 0.16.0 registerObserver :: forall event eventSink eventSource r q . ( HasCallStack , HasProcesses r q , IsObservable eventSource event , Tangible (Pdu eventSource 'Asynchronous) , Tangible (Pdu eventSink 'Asynchronous) , CanObserve eventSink event ) => Endpoint eventSource -> Endpoint eventSink -> Eff r () registerObserver eventSource eventSink = cast eventSource (RegisterObserver serializer (eventSink ^. fromEndpoint)) where serializer = MkSerializer ( toStrictDynamic . Cast . embedPdu @eventSink @(Observer event) @( 'Asynchronous ) ) -- | Send the 'ForgetObserver' message -- -- @since 0.16.0 forgetObserver :: forall event eventSink eventSource r q . ( HasProcesses r q , HasCallStack , Tangible (Pdu eventSource 'Asynchronous) , Tangible (Pdu eventSink 'Asynchronous) , IsObservable eventSource event , CanObserve eventSink event ) => Endpoint eventSource -> Endpoint eventSink -> Eff r () forgetObserver eventSource eventSink = forgetObserverUnsafe @event @eventSource eventSource (eventSink ^. fromEndpoint) -- | Send the 'ForgetObserver' message, use a raw 'ProcessId' as parameter. -- -- @since 0.28.0 forgetObserverUnsafe :: forall event eventSource r q . ( HasProcesses r q , HasCallStack , Tangible (Pdu eventSource 'Asynchronous) , IsObservable eventSource event ) => Endpoint eventSource -> ProcessId -> Eff r () forgetObserverUnsafe eventSource eventSink = cast eventSource (ForgetObserver @event eventSink) -- ** Observer Support Functions -- * Managing Observers -- | A protocol for managing 'Observer's, encompassing registration and de-registration of -- 'Observer's. -- -- @since 0.28.0 data ObserverRegistry (event :: Type) = MkObserverRegistry { _observerRegistry :: Map ProcessId (ObservationSink event) } deriving Typeable type instance ToPretty (ObserverRegistry event) = PrettyParens ("observer registry" <:> ToPretty event) instance (Tangible event) => HasPdu (ObserverRegistry event) where -- | Protocol for managing observers. This can be added to any server for any number of different observation types. -- The functions 'evalObserverRegistryState' and 'observerRegistryHandlePdu' are used to include observer handling; -- -- @since 0.16.0 data instance Pdu (ObserverRegistry event) r where -- | This message denotes that the given 'Observer' should receive observations until 'ForgetObserver' is -- received. -- -- @since 0.28.0 RegisterObserver :: Serializer (Pdu (Observer event) 'Asynchronous) -> ProcessId -> Pdu (ObserverRegistry event) 'Asynchronous -- | This message denotes that the given 'Observer' should not receive observations anymore. -- -- @since 0.16.1 ForgetObserver :: ProcessId -> Pdu (ObserverRegistry event) 'Asynchronous -- -- | This message denotes that a monitored process died -- -- -- -- @since 0.28.0 -- ObserverMightBeDown :: MonitorReference -> Pdu (ObserverRegistry event) ( 'Synchronous Bool) deriving Typeable instance NFData (Pdu (ObserverRegistry event) r) where rnf (RegisterObserver ser pid) = rnf ser `seq` rnf pid rnf (ForgetObserver pid) = rnf pid instance Typeable event => Show (Pdu (ObserverRegistry event) r) where showsPrec d (RegisterObserver ser pid) = showParen (d >= 10) (showString "register observer: " . shows ser . showChar ' ' . shows pid) showsPrec d (ForgetObserver p) = showParen (d >= 10) (showString "forget observer: " . shows p) -- ** Protocol for integrating 'ObserverRegistry' into processes. -- | Provide the implementation for the 'ObserverRegistry' Protocol, this handled 'RegisterObserver' and 'ForgetObserver' -- messages. It also adds the 'ObserverRegistryState' constraint to the effect list. -- -- @since 0.28.0 observerRegistryHandlePdu :: forall event q r . ( HasCallStack , Typeable event , HasProcesses r q , Member (ObserverRegistryState event) r , Member Logs r ) => Pdu (ObserverRegistry event) 'Asynchronous -> Eff r () observerRegistryHandlePdu = \case RegisterObserver ser pid -> do monRef <- monitor pid let sink = MkObservationSink ser monRef observer = MkObserver (Arg pid sink) modify @(ObserverRegistry event) (over observerRegistry (Map.insert pid sink)) os <- get @(ObserverRegistry event) logDebug ( "registered " <> pack (show observer) <> " current number of observers: " -- TODO put this info into the process details <> pack (show (Map.size (view observerRegistry os)))) ForgetObserver ob -> do wasRemoved <- observerRegistryRemoveProcess @event ob unless wasRemoved $ logDebug (pack (show ("unknown observer " ++ show ob))) -- | Remove the entry in the 'ObserverRegistry' for the 'ProcessId' -- and return 'True' if there was an entry, 'False' otherwise. -- -- @since 0.28.0 observerRegistryRemoveProcess :: forall event q r . ( HasCallStack , Typeable event , HasProcesses r q , Member (ObserverRegistryState event) r , Member Logs r ) => ProcessId -> Eff r Bool observerRegistryRemoveProcess ob = do mSink <- view (observerRegistry . at ob) <$> get @(ObserverRegistry event) modify @(ObserverRegistry event) (observerRegistry . at ob .~ Nothing) os <- get @(ObserverRegistry event) maybe (pure False) (foundIt os) mSink where foundIt os sink@(MkObservationSink _ monRef) = do demonitor monRef logDebug ( "removed: " <> (pack $ show $ MkObserver $ Arg ob sink) <> " current number of observers: " <> pack (show (Map.size (view observerRegistry os)))) pure True -- | Keep track of registered 'Observer's. -- -- Handle the 'ObserverRegistryState' effect, i.e. run 'evalState' on an 'emptyObserverRegistry'. -- -- @since 0.28.0 evalObserverRegistryState :: HasCallStack => Eff (ObserverRegistryState event ': r) a -> Eff r a evalObserverRegistryState = evalState emptyObserverRegistry -- | The empty 'ObserverRegistryState' -- -- @since 0.28.0 emptyObserverRegistry :: ObserverRegistry event emptyObserverRegistry = MkObserverRegistry Map.empty -- | Alias for the effect that contains the observers managed by 'evalObserverRegistryState' type ObserverRegistryState event = State (ObserverRegistry event) -- | An 'Iso' for the 'Map' used internally. observerRegistry :: Iso' (ObserverRegistry event) (Map ProcessId (ObservationSink event)) observerRegistry = iso _observerRegistry MkObserverRegistry -- | Report an observation to all observers. -- The process needs to 'evalObserverRegistryState' and to 'observerRegistryHandlePdu'. -- -- @since 0.28.0 observerRegistryNotify :: forall event r q . ( HasProcesses r q , Member (ObserverRegistryState event) r , Tangible event , HasCallStack ) => event -> Eff r () observerRegistryNotify observation = do os <- view observerRegistry <$> get mapM_ notifySomeObserver (Map.assocs os) where notifySomeObserver (destination, (MkObservationSink serializer _)) = sendAnyMessage destination (runSerializer serializer (Observed observation))