module Control.Eff.Concurrent.Protocol.Observer
( Observer(..)
, TangibleObserver
, Pdu(RegisterObserver, ForgetObserver, Observed)
, registerObserver
, forgetObserver
, handleObservations
, toObserver
, toObserverFor
, ObserverRegistry
, ObserverState
, Observers()
, emptyObservers
, handleObserverRegistration
, manageObservers
, observed
)
where
import Control.DeepSeq (NFData(rnf))
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Client
import Control.Eff.State.Strict
import Control.Eff.Log
import Control.Lens
import Data.Data (typeOf)
import Data.Dynamic
import Data.Foldable
import Data.Proxy
import Data.Set ( Set )
import qualified Data.Set as Set
import Data.Text ( pack )
import Data.Typeable ( typeRep )
import Data.Type.Pretty
import GHC.Stack
data Observer o where
Observer
:: ( Tangible o
, TangiblePdu p 'Asynchronous
, Tangible (Endpoint p)
, Typeable p
)
=> (o -> Maybe (Pdu p 'Asynchronous)) -> Endpoint p -> Observer o
type TangibleObserver o =
( Tangible o, TangiblePdu (Observer o) 'Asynchronous)
type instance ToPretty (Observer o) =
PrettyParens ("observing" <:> ToPretty o)
instance (NFData o) => NFData (Observer o) where
rnf (Observer k s) = rnf k `seq` rnf s
instance Show (Observer o) where
showsPrec d (Observer _ p) = showParen
(d >= 10)
(shows (typeRep (Proxy :: Proxy o)) . showString " observer: " . shows p)
instance Ord (Observer o) where
compare (Observer _ s1) (Observer _ s2) =
compare (s1 ^. fromEndpoint) (s2 ^. fromEndpoint)
instance Eq (Observer o) where
(==) (Observer _ s1) (Observer _ s2) =
(==) (s1 ^. fromEndpoint) (s2 ^. fromEndpoint)
registerObserver
:: ( SetMember Process (Process q) r
, HasCallStack
, Member Interrupts r
, TangibleObserver o
, EmbedProtocol x (ObserverRegistry o)
, TangiblePdu x 'Asynchronous
)
=> Observer o
-> Endpoint x
-> Eff r ()
registerObserver observer observerRegistry =
cast observerRegistry (RegisterObserver observer)
forgetObserver
:: ( SetMember Process (Process q) r
, HasCallStack
, Member Interrupts r
, Typeable o
, NFData o
, EmbedProtocol x (ObserverRegistry o)
, TangiblePdu x 'Asynchronous
)
=> Observer o
-> Endpoint x
-> Eff r ()
forgetObserver observer observerRegistry =
cast observerRegistry (ForgetObserver observer)
data instance Pdu (Observer o) r where
Observed :: o -> Pdu (Observer o) 'Asynchronous
deriving Typeable
instance NFData o => NFData (Pdu (Observer o) 'Asynchronous) where
rnf (Observed o) = rnf o
instance Show o => Show (Pdu (Observer o) r) where
showsPrec d (Observed o) = showParen (d>=10) (showString "observered: " . shows o)
handleObservations
:: (HasCallStack, Typeable o, SetMember Process (Process q) r, NFData (Observer o))
=> (o -> Eff r ())
-> Pdu (Observer o) 'Asynchronous -> Eff r ()
handleObservations k (Observed r) = k r
toObserver :: TangibleObserver o => Endpoint (Observer o) -> Observer o
toObserver = toObserverFor Observed
toObserverFor
:: (TangibleObserver o, Typeable a, TangiblePdu a 'Asynchronous)
=> (o -> Pdu a 'Asynchronous)
-> Endpoint a
-> Observer o
toObserverFor wrapper = Observer (Just . wrapper)
data ObserverRegistry o
deriving Typeable
type instance ToPretty (ObserverRegistry o) =
PrettyParens ("observer registry" <:> ToPretty o)
data instance Pdu (ObserverRegistry o) r where
RegisterObserver :: NFData o => Observer o -> Pdu (ObserverRegistry o) 'Asynchronous
ForgetObserver :: NFData o => Observer o -> Pdu (ObserverRegistry o) 'Asynchronous
deriving Typeable
instance NFData (Pdu (ObserverRegistry o) r) where
rnf (RegisterObserver o) = rnf o
rnf (ForgetObserver o) = rnf o
instance Show (Pdu (ObserverRegistry o) r) where
showsPrec d (RegisterObserver o) = showParen (d >= 10) (showString "register observer: " . showsPrec 11 o)
showsPrec d (ForgetObserver o) = showParen (d >= 10) (showString "forget observer: " . showsPrec 11 o)
handleObserverRegistration
:: forall o q r
. ( HasCallStack
, Typeable o
, SetMember Process (Process q) r
, Member (ObserverState o) r
, Member Logs r
)
=> Pdu (ObserverRegistry o) 'Asynchronous -> Eff r ()
handleObserverRegistration = \case
RegisterObserver ob -> do
os <- get @(Observers o)
logDebug ("registering "
<> pack (show (typeOf ob))
<> " current number of observers: "
<> pack (show (Set.size (view observers os))))
put (over observers (Set.insert ob) os)
ForgetObserver ob -> do
os <- get @(Observers o)
logDebug ("forgetting "
<> pack (show (typeOf ob))
<> " current number of observers: "
<> pack (show (Set.size (view observers os))))
put (over observers (Set.delete ob) os)
manageObservers :: Eff (ObserverState o ': r) a -> Eff r a
manageObservers = evalState (Observers Set.empty)
emptyObservers :: Observers o
emptyObservers = Observers Set.empty
newtype Observers o =
Observers { _observers :: Set (Observer o) }
deriving (Eq, Ord, Typeable, Show, NFData)
type ObserverState o = State (Observers o)
observers :: Iso' (Observers o) (Set (Observer o))
observers = iso _observers Observers
observed
:: forall o r q
. ( SetMember Process (Process q) r
, Member (ObserverState o) r
, Member Interrupts r
, TangibleObserver o
)
=> o
-> Eff r ()
observed observation = do
os <- view observers <$> get
mapM_ notifySomeObserver os
where
notifySomeObserver (Observer messageFilter receiver) =
traverse_ (cast receiver) (messageFilter observation)