module Control.Eff.Concurrent.Protocol.Observer
( Observer(..)
, TangibleObserver
, Pdu(RegisterObserver, ForgetObserver, Observed)
, registerObserver
, forgetObserver
, handleObservations
, toObserver
, toObserverFor
, ObserverRegistry
, ObserverState
, Observers()
, emptyObservers
, handleObserverRegistration
, manageObservers
, observed
)
where
import Control.DeepSeq (NFData(rnf))
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Client
import Control.Eff.State.Strict
import Control.Eff.Log
import Control.Lens
import Data.Data (typeOf)
import Data.Dynamic
import Data.Foldable
import Data.Kind
import Data.Proxy
import Data.Set ( Set )
import qualified Data.Set as Set
import Data.Text ( pack )
import Data.Typeable ( typeRep )
import Data.Type.Pretty
import GHC.Stack
-- | An observer of values of type @o@.
--
-- The constructor hides the protocol @p@ of the receiver: it stores a
-- function that turns an @o@ into an optional asynchronous 'Pdu' of @p@,
-- together with the 'Endpoint' of @p@ that the message is meant for.
data Observer o where
  Observer
    :: ( Tangible o
       , IsPdu p 'Asynchronous
       , Tangible (Endpoint p)
       , Typeable p
       )
    => (o -> Maybe (Pdu p 'Asynchronous)) -> Endpoint p -> Observer o
-- | Constraint alias combining @'Tangible' o@ with an asynchronous
-- 'IsPdu' instance for @'Observer' o@.
type TangibleObserver o =
  ( Tangible o, IsPdu (Observer o) 'Asynchronous)
-- | Type-level pretty form of @'Observer' o@: the label @"observing"@
-- combined via '<:>' with the pretty form of @o@, wrapped in 'PrettyParens'.
type instance ToPretty (Observer o) =
  PrettyParens ("observing" <:> ToPretty o)
-- | Applies 'rnf' to the message filter function first and to the
-- receiving endpoint second.
instance (NFData o) => NFData (Observer o) where
  rnf (Observer messageFilter receiver) =
    seq (rnf messageFilter) (rnf receiver)
-- | Shows the type representation of @o@, followed by the text
-- @" observer: "@ and the receiving endpoint. The filter function is not
-- shown. The output is parenthesised when the precedence is at least 10.
instance Show (Observer o) where
  showsPrec d (Observer _ receiver) =
    showParen (d >= 10)
      $ shows (typeRep (Proxy :: Proxy o))
      . showString " observer: "
      . shows receiver
-- | Observers are ordered solely by what 'fromEndpoint' yields for their
-- endpoints; the filter functions take no part in the comparison.
instance Ord (Observer o) where
  compare (Observer _ left) (Observer _ right) =
    view fromEndpoint left `compare` view fromEndpoint right
-- | Two observers are equal when 'fromEndpoint' yields equal values for
-- their endpoints; the filter functions are not compared.
instance Eq (Observer o) where
  Observer _ left == Observer _ right =
    view fromEndpoint left == view fromEndpoint right
-- | Send a 'RegisterObserver' message carrying the given 'Observer' to the
-- given registry endpoint, using 'cast'.
registerObserver
  :: ( SetMember Process (Process q) r
     , HasCallStack
     , Member Interrupts r
     , TangibleObserver o
     , EmbedProtocol x (ObserverRegistry o)
     , IsPdu x 'Asynchronous
     )
  => Observer o
  -> Endpoint x
  -> Eff r ()
registerObserver newObserver registry =
  cast registry $ RegisterObserver newObserver
-- | Send a 'ForgetObserver' message carrying the given 'Observer' to the
-- given registry endpoint, using 'cast'.
forgetObserver
  :: ( SetMember Process (Process q) r
     , HasCallStack
     , Member Interrupts r
     , Typeable o
     , NFData o
     , EmbedProtocol x (ObserverRegistry o)
     , IsPdu x 'Asynchronous
     )
  => Observer o
  -> Endpoint x
  -> Eff r ()
forgetObserver oldObserver registry =
  cast registry $ ForgetObserver oldObserver
-- | The protocol spoken to an 'Observer': its only message, 'Observed',
-- is asynchronous and carries a single value of type @o@.
instance (NFData o, Show o, Typeable o, Typeable r) => IsPdu (Observer o) r where
  data instance Pdu (Observer o) r where
    -- | Carries one observation of type @o@.
    Observed :: o -> Pdu (Observer o) 'Asynchronous
    deriving Typeable
-- | Evaluating an 'Observed' message applies 'rnf' to the observation
-- it carries.
instance NFData o => NFData (Pdu (Observer o) r) where
  rnf = \case
    Observed observation -> rnf observation
-- | Renders an 'Observed' message as the text @"observed: "@ followed by
-- the shown observation; parenthesised when the precedence is at least 10.
instance Show o => Show (Pdu (Observer o) r) where
  showsPrec d (Observed o) =
    -- The label used to be misspelled as "observered: ".
    showParen (d >= 10) (showString "observed: " . shows o)
-- | Unwrap an 'Observed' message and pass the observation it carries to
-- the given callback.
handleObservations
  :: (HasCallStack, Typeable o, SetMember Process (Process q) r, NFData (Observer o))
  => (o -> Eff r ())
  -> Pdu (Observer o) 'Asynchronous -> Eff r ()
handleObservations onObservation = \case
  Observed observation -> onObservation observation
-- | Build an 'Observer' from an endpoint whose protocol @p@ embeds
-- @'Observer' o@: every observation is wrapped in 'Observed' and then
-- converted with 'embedPdu' into a message of @p@. Delegates to
-- 'toObserverFor'.
toObserver
  :: forall o p
   . ( IsPdu p 'Asynchronous
     , EmbedProtocol p (Observer o)
     , TangibleObserver o
     )
  => Endpoint p
  -> Observer o
toObserver receiver =
  toObserverFor (\observation -> embedPdu @p (Observed observation)) receiver
-- | Build an 'Observer' from a function that converts every observation
-- into a message of protocol @a@, and the endpoint that should receive
-- these messages. The resulting filter always yields a message ('Just').
toObserverFor
  :: (TangibleObserver o, Typeable a, IsPdu a 'Asynchronous)
  => (o -> Pdu a 'Asynchronous)
  -> Endpoint a
  -> Observer o
toObserverFor wrapper receiver =
  Observer (\observation -> Just (wrapper observation)) receiver
-- | A type without constructors that serves as the protocol index for the
-- messages 'RegisterObserver' and 'ForgetObserver'; @o@ is the type of the
-- observations.
data ObserverRegistry (o :: Type)
  deriving Typeable
-- | Type-level pretty form of @'ObserverRegistry' o@: the label
-- @"observer registry"@ combined via '<:>' with the pretty form of @o@,
-- wrapped in 'PrettyParens'.
type instance ToPretty (ObserverRegistry o) =
  PrettyParens ("observer registry" <:> ToPretty o)
-- | The protocol of an observer registry. Both messages are asynchronous
-- and carry the 'Observer' they refer to.
instance (Typeable o, Typeable r) => IsPdu (ObserverRegistry o) r where
  data instance Pdu (ObserverRegistry o) r where
    -- | Message carrying an 'Observer' to be registered;
    -- 'handleObserverRegistration' inserts it into the set of observers.
    RegisterObserver :: NFData o => Observer o -> Pdu (ObserverRegistry o) 'Asynchronous
    -- | Message carrying an 'Observer' to be forgotten;
    -- 'handleObserverRegistration' deletes it from the set of observers.
    ForgetObserver :: NFData o => Observer o -> Pdu (ObserverRegistry o) 'Asynchronous
    deriving Typeable
-- | Evaluating a registry message applies 'rnf' to the 'Observer' that
-- the message carries.
instance NFData (Pdu (ObserverRegistry o) r) where
  rnf = \case
    RegisterObserver carried -> rnf carried
    ForgetObserver carried -> rnf carried
-- | Renders a registry message as a label (@"register observer: "@ or
-- @"forget observer: "@) followed by the observer shown at precedence 11;
-- parenthesised when the precedence is at least 10.
instance Show (Pdu (ObserverRegistry o) r) where
  showsPrec d (RegisterObserver carried) =
    showParen (d >= 10)
      $ showString "register observer: "
      . showsPrec 11 carried
  showsPrec d (ForgetObserver carried) =
    showParen (d >= 10)
      $ showString "forget observer: "
      . showsPrec 11 carried
-- | Apply a registry message to the 'ObserverState'.
--
-- 'RegisterObserver' inserts the carried observer into the set of
-- observers, 'ForgetObserver' deletes it. Before the state is changed, a
-- debug message is logged that contains the type of the observer and the
-- number of observers as it was prior to the change.
handleObserverRegistration
  :: forall o q r
   . ( HasCallStack
     , Typeable o
     , SetMember Process (Process q) r
     , Member (ObserverState o) r
     , Member Logs r
     )
  => Pdu (ObserverRegistry o) 'Asynchronous -> Eff r ()
handleObserverRegistration (RegisterObserver newObserver) = do
  current <- get @(Observers o)
  let registered = view observers current
  logDebug $ mconcat
    [ "registering "
    , pack (show (typeOf newObserver))
    , " current number of observers: "
    , pack (show (Set.size registered))
    ]
  put (Observers (Set.insert newObserver registered))
handleObserverRegistration (ForgetObserver oldObserver) = do
  current <- get @(Observers o)
  let registered = view observers current
  logDebug $ mconcat
    [ "forgetting "
    , pack (show (typeOf oldObserver))
    , " current number of observers: "
    , pack (show (Set.size registered))
    ]
  put (Observers (Set.delete oldObserver registered))
-- | Run an action that uses the 'ObserverState' effect, starting with no
-- registered observers, and return only the result of the action.
manageObservers :: Eff (ObserverState o ': r) a -> Eff r a
manageObservers = evalState emptyObservers
-- | An 'Observers' value that contains no observer at all.
emptyObservers :: Observers o
emptyObservers = Observers mempty
-- | Wrapper around a 'Set' of 'Observer's for observations of type @o@.
-- This is the value kept in the 'ObserverState' effect.
newtype Observers o =
  Observers { _observers :: Set (Observer o) }
  deriving (Eq, Ord, Typeable, Show, NFData)
-- | Alias for the 'State' effect that holds the 'Observers' of @o@.
type ObserverState o = State (Observers o)
-- | Isomorphism between 'Observers' and the 'Set' of 'Observer's that it
-- wraps.
observers :: Iso' (Observers o) (Set (Observer o))
observers = iso (\(Observers registered) -> registered) Observers
-- | Report an observation to all observers found in the 'ObserverState'.
--
-- For every registered observer its filter function is applied to the
-- observation. When the filter yields a message, that message is sent to
-- the observer's endpoint using 'cast'; when it yields 'Nothing', nothing
-- is sent to that observer.
observed
  :: forall o r q
   . ( SetMember Process (Process q) r
     , Member (ObserverState o) r
     , Member Interrupts r
     , TangibleObserver o
     )
  => o
  -> Eff r ()
observed observation = do
  registry <- get
  for_ (view observers registry) notify
 where
  notify (Observer toMessage receiver) =
    for_ (toMessage observation) (cast receiver)