module Control.Eff.Concurrent.Protocol.Observer
( Observer(..)
, ObservationSink()
, IsObservable
, CanObserve
, Pdu(RegisterObserver, ForgetObserver, Observed)
, registerObserver
, forgetObserver
, forgetObserverUnsafe
, ObserverRegistry(..)
, ObserverRegistryState
, observerRegistryNotify
, evalObserverRegistryState
, emptyObserverRegistry
, observerRegistryHandlePdu
, observerRegistryRemoveProcess
)
where
import Control.DeepSeq ( NFData(rnf) )
import Control.Eff
import Control.Eff.Concurrent.Misc
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Client
import Control.Eff.Concurrent.Protocol.Wrapper (Request(Cast))
import Control.Eff.State.Strict
import Control.Eff.Log
import Control.Lens
import Control.Monad
import Data.Dynamic
import Data.Kind
import Data.Semigroup
import Data.Map ( Map )
import qualified Data.Map as Map
import Data.Text ( pack )
import Data.Type.Pretty
import GHC.Generics
import GHC.Stack
newtype Observer event =
MkObserver (Arg ProcessId (ObservationSink event))
deriving (Eq, Ord, Typeable)
instance NFData (Observer event) where
rnf (MkObserver (Arg x y)) = rnf x `seq` rnf y
instance Typeable event => Show (Observer event) where
showsPrec d (MkObserver (Arg x (MkObservationSink _ m))) =
showParen (d>=10) (showString "observer: " . showSTypeable @event . showString " ". showsPrec 10 x . showChar ' ' . showsPrec 10 m )
type instance ToPretty (Observer event) =
PrettyParens ("observing" <:> ToPretty event)
instance (Tangible event) => HasPdu (Observer event) where
data Pdu (Observer event) r where
Observed :: event -> Pdu (Observer event) 'Asynchronous
deriving Typeable
instance NFData event => NFData (Pdu (Observer event) r) where
rnf (Observed event) = rnf event
instance Show event => Show (Pdu (Observer event) r) where
showsPrec d (Observed event) =
showParen (d >= 10) (showString "observered: " . showsPrec 10 event)
data ObservationSink event =
MkObservationSink
{ _observerSerializer :: Serializer (Pdu (Observer event) 'Asynchronous)
, _observerMonitorReference :: MonitorReference
}
deriving (Generic, Typeable)
instance NFData (ObservationSink event) where
rnf (MkObservationSink s p) = s `seq` rnf p
type IsObservable eventSource event =
( Tangible event
, Embeds eventSource (ObserverRegistry event)
, HasPdu eventSource
)
type CanObserve eventSink event =
( Tangible event
, Embeds eventSink (Observer event)
, HasPdu eventSink
)
registerObserver
:: forall event eventSink eventSource r q .
( HasCallStack
, HasProcesses r q
, IsObservable eventSource event
, Tangible (Pdu eventSource 'Asynchronous)
, Tangible (Pdu eventSink 'Asynchronous)
, CanObserve eventSink event
)
=> Endpoint eventSource
-> Endpoint eventSink
-> Eff r ()
registerObserver eventSource eventSink =
cast eventSource (RegisterObserver serializer (eventSink ^. fromEndpoint))
where
serializer =
MkSerializer
( toStrictDynamic
. Cast
. embedPdu @eventSink @(Observer event) @( 'Asynchronous )
)
forgetObserver
:: forall event eventSink eventSource r q .
( HasProcesses r q
, HasCallStack
, Tangible (Pdu eventSource 'Asynchronous)
, Tangible (Pdu eventSink 'Asynchronous)
, IsObservable eventSource event
, CanObserve eventSink event
)
=> Endpoint eventSource
-> Endpoint eventSink
-> Eff r ()
forgetObserver eventSource eventSink =
forgetObserverUnsafe @event @eventSource eventSource (eventSink ^. fromEndpoint)
forgetObserverUnsafe
:: forall event eventSource r q .
( HasProcesses r q
, HasCallStack
, Tangible (Pdu eventSource 'Asynchronous)
, IsObservable eventSource event
)
=> Endpoint eventSource
-> ProcessId
-> Eff r ()
forgetObserverUnsafe eventSource eventSink =
cast eventSource (ForgetObserver @event eventSink)
data ObserverRegistry (event :: Type) = MkObserverRegistry
{ _observerRegistry :: Map ProcessId (ObservationSink event) }
deriving Typeable
type instance ToPretty (ObserverRegistry event) =
PrettyParens ("observer registry" <:> ToPretty event)
instance (Tangible event) => HasPdu (ObserverRegistry event) where
data instance Pdu (ObserverRegistry event) r where
RegisterObserver :: Serializer (Pdu (Observer event) 'Asynchronous) -> ProcessId -> Pdu (ObserverRegistry event) 'Asynchronous
ForgetObserver :: ProcessId -> Pdu (ObserverRegistry event) 'Asynchronous
deriving Typeable
instance NFData (Pdu (ObserverRegistry event) r) where
rnf (RegisterObserver ser pid) = rnf ser `seq` rnf pid
rnf (ForgetObserver pid) = rnf pid
instance Typeable event => Show (Pdu (ObserverRegistry event) r) where
showsPrec d (RegisterObserver ser pid) = showParen (d >= 10) (showString "register observer: " . shows ser . showChar ' ' . shows pid)
showsPrec d (ForgetObserver p) = showParen (d >= 10) (showString "forget observer: " . shows p)
observerRegistryHandlePdu
:: forall event q r
. ( HasCallStack
, Typeable event
, HasProcesses r q
, Member (ObserverRegistryState event) r
, Member Logs r
)
=> Pdu (ObserverRegistry event) 'Asynchronous -> Eff r ()
observerRegistryHandlePdu = \case
RegisterObserver ser pid -> do
monRef <- monitor pid
let sink = MkObservationSink ser monRef
observer = MkObserver (Arg pid sink)
modify @(ObserverRegistry event) (over observerRegistry (Map.insert pid sink))
os <- get @(ObserverRegistry event)
logDebug ( "registered "
<> pack (show observer)
<> " current number of observers: "
<> pack (show (Map.size (view observerRegistry os))))
ForgetObserver ob -> do
wasRemoved <- observerRegistryRemoveProcess @event ob
unless wasRemoved $
logDebug (pack (show ("unknown observer " ++ show ob)))
observerRegistryRemoveProcess
:: forall event q r
. ( HasCallStack
, Typeable event
, HasProcesses r q
, Member (ObserverRegistryState event) r
, Member Logs r
)
=> ProcessId -> Eff r Bool
observerRegistryRemoveProcess ob = do
mSink <- view (observerRegistry . at ob) <$> get @(ObserverRegistry event)
modify @(ObserverRegistry event) (observerRegistry . at ob .~ Nothing)
os <- get @(ObserverRegistry event)
maybe
(pure False)
(foundIt os)
mSink
where
foundIt os sink@(MkObservationSink _ monRef) = do
demonitor monRef
logDebug ( "removed: "
<> (pack $ show $ MkObserver $ Arg ob sink)
<> " current number of observers: "
<> pack (show (Map.size (view observerRegistry os))))
pure True
evalObserverRegistryState :: HasCallStack => Eff (ObserverRegistryState event ': r) a -> Eff r a
evalObserverRegistryState = evalState emptyObserverRegistry
emptyObserverRegistry :: ObserverRegistry event
emptyObserverRegistry = MkObserverRegistry Map.empty
type ObserverRegistryState event = State (ObserverRegistry event)
observerRegistry :: Iso' (ObserverRegistry event) (Map ProcessId (ObservationSink event))
observerRegistry = iso _observerRegistry MkObserverRegistry
observerRegistryNotify
:: forall event r q
. ( HasProcesses r q
, Member (ObserverRegistryState event) r
, Tangible event
, HasCallStack
)
=> event
-> Eff r ()
observerRegistryNotify observation = do
os <- view observerRegistry <$> get
mapM_ notifySomeObserver (Map.assocs os)
where
notifySomeObserver (destination, (MkObservationSink serializer _)) =
sendAnyMessage destination (runSerializer serializer (Observed observation))