-- | Observers and observer registries.
--
-- An 'Observer' pairs a 'Server' with a function that turns an observation
-- into an optional asynchronous 'Api' message for that server. Observers are
-- added to and removed from a registry with 'registerObserver' and
-- 'forgetObserver'; the registry side handles those requests with
-- 'handleObserverRegistration', keeps the registered observers in the
-- 'ObserverState' effect (discharged by 'manageObservers'), and notifies them
-- through 'observed'.
module Control.Eff.Concurrent.Api.Observer
( Observer(..)
, Api(RegisterObserver, ForgetObserver, Observed)
, registerObserver
, forgetObserver
, handleObservations
, toObserver
, toObserverFor
, ObserverRegistry
, ObserverState
, handleObserverRegistration
, manageObservers
, observed
)
where
import Control.DeepSeq (NFData(rnf))
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Client
import Control.Eff.Concurrent.Api.Server
import Control.Eff.State.Strict
import Control.Eff.Log
import Control.Lens
import Data.Data (typeOf)
import Data.Dynamic
import Data.Foldable
import Data.Proxy
import Data.Set ( Set )
import qualified Data.Set as Set
import Data.Text ( pack )
import Data.Typeable ( typeRep )
import Data.Type.Pretty
import GHC.Stack
-- | A receiver of observations of type @o@.
--
-- The type @p@ of the receiving server's API is hidden inside the
-- constructor; only the observation type @o@ is visible from the outside.
data Observer o where
  -- | Combine a conversion from an observation into an optional asynchronous
  -- message for the API @p@ with the 'Server' that receives that message.
  Observer
    :: ( PrettyTypeShow (ToPretty p)
       , Show (Server p)
       , Typeable p
       , Typeable o
       , NFData o
       , NFData (Api p 'Asynchronous)
       )
    => (o -> Maybe (Api p 'Asynchronous))
    -> Server p
    -> Observer o
-- | Pretty type rendering of @'Observer' o@: the text @"observing"@ combined
-- (via '<:>') with the pretty form of @o@, wrapped in 'PrettyParens'.
type instance ToPretty (Observer o)
  = PrettyParens ("observing" <:> ToPretty o)
-- | Evaluates both fields of the constructor: first the message conversion
-- function, then the receiving server.
instance (NFData o) => NFData (Observer o) where
  rnf (Observer toMessage receiver) =
    rnf toMessage `seq` rnf receiver
-- | Renders the type representation of the observation type @o@, followed by
-- the text @" observer: "@ and the shown receiving server. The output is put
-- in parentheses when the surrounding precedence is 10 or higher.
instance Show (Observer o) where
  showsPrec prec (Observer _ receiver) =
    showParen (prec >= 10) $
      shows (typeRep (Proxy :: Proxy o))
        . showString " observer: "
        . shows receiver
-- | Observers are ordered solely by what 'fromServer' yields for their
-- receiving servers; the conversion function takes no part in the ordering.
instance Ord (Observer o) where
  compare (Observer _ lhs) (Observer _ rhs) =
    view fromServer lhs `compare` view fromServer rhs
-- | Two observers are equal when 'fromServer' yields equal values for their
-- receiving servers; the conversion function is ignored.
instance Eq (Observer o) where
  Observer _ lhs == Observer _ rhs =
    view fromServer lhs == view fromServer rhs
-- | Ask an observer registry to add an observer.
--
-- The request is a 'RegisterObserver' message sent to the registry server
-- with 'cast'.
registerObserver
  :: ( SetMember Process (Process q) r
     , HasCallStack
     , Member Interrupts r
     , Typeable o
     , NFData o
     , PrettyTypeShow (ToPretty o)
     )
  => Observer o
  -> Server (ObserverRegistry o)
  -> Eff r ()
registerObserver observer observerRegistry = cast observerRegistry request
 where
  request = RegisterObserver observer
-- | Ask an observer registry to remove an observer.
--
-- The request is a 'ForgetObserver' message sent to the registry server
-- with 'cast'.
forgetObserver
  :: ( SetMember Process (Process q) r
     , HasCallStack
     , Member Interrupts r
     , Typeable o
     , NFData o
     , PrettyTypeShow (ToPretty o)
     )
  => Observer o
  -> Server (ObserverRegistry o)
  -> Eff r ()
forgetObserver observer observerRegistry = cast observerRegistry request
 where
  request = ForgetObserver observer
-- | The API of an observer process: a single asynchronous message.
data instance Api (Observer o) r where
  -- | Carries one observation of type @o@.
  Observed
    :: o
    -> Api (Observer o) 'Asynchronous
  deriving Typeable
-- | Evaluating an 'Observed' message evaluates the observation it carries.
instance NFData o => NFData (Api (Observer o) 'Asynchronous) where
  rnf = \case
    Observed observation -> rnf observation
-- | Build a 'MessageCallback' for the @'Observer' o@ API from a function that
-- consumes observations.
--
-- The callback is built with 'handleCasts'; the payload of every 'Observed'
-- message is passed to the given function, whose result becomes the
-- callback's result.
handleObservations
  :: ( HasCallStack
     , Typeable o
     , SetMember Process (Process q) r
     , NFData (Observer o)
     )
  => (o -> Eff r (CallbackResult 'Recoverable))
  -> MessageCallback (Observer o) r
handleObservations k = handleCasts $ \case
  Observed observation -> k observation
-- | Turn a server of the @'Observer' o@ API into an 'Observer' that receives
-- every observation wrapped in 'Observed'.
toObserver
  :: ( NFData o
     , Typeable o
     , NFData (Api (Observer o) 'Asynchronous)
     , PrettyTypeShow (ToPretty o)
     )
  => Server (Observer o)
  -> Observer o
toObserver server = toObserverFor Observed server
-- | Turn a server of an arbitrary API @a@ into an 'Observer', given a
-- function that wraps an observation into an asynchronous message of that
-- API. The resulting observer produces a message for every observation (the
-- conversion never yields 'Nothing').
toObserverFor
  :: ( Typeable a
     , PrettyTypeShow (ToPretty a)
     , NFData (Api a 'Asynchronous)
     , Typeable o
     , NFData o
     )
  => (o -> Api a 'Asynchronous)
  -> Server a
  -> Observer o
toObserverFor wrapper server =
  Observer (\observation -> Just (wrapper observation)) server
-- | Index type (without constructors) for the API of a process that manages
-- observers of observations of type @o@.
data ObserverRegistry o

-- | Pretty type rendering of @'ObserverRegistry' o@: the text
-- @"observer registry"@ combined (via '<:>') with the pretty form of @o@,
-- wrapped in 'PrettyParens'.
type instance ToPretty (ObserverRegistry o)
  = PrettyParens ("observer registry" <:> ToPretty o)
-- | The API of an observer registry: two asynchronous messages, each carrying
-- the 'Observer' it is about.
data instance Api (ObserverRegistry o) r where
  -- | Request to add the given observer.
  RegisterObserver
    :: NFData o
    => Observer o
    -> Api (ObserverRegistry o) 'Asynchronous
  -- | Request to remove the given observer.
  ForgetObserver
    :: NFData o
    => Observer o
    -> Api (ObserverRegistry o) 'Asynchronous
  deriving Typeable
-- | Evaluating a registry message evaluates the 'Observer' it carries; the
-- required 'NFData' evidence comes from the matched constructor.
instance NFData (Api (ObserverRegistry o) r) where
  rnf = \case
    RegisterObserver observer -> rnf observer
    ForgetObserver   observer -> rnf observer
-- | A 'MessageCallback' that serves the 'ObserverRegistry' API.
--
-- For both messages the handler reads the current 'Observers' state, writes a
-- debug log line containing the type of the observer and the number of
-- observers /before/ the change, stores the updated set and answers with
-- 'AwaitNext'. 'RegisterObserver' inserts the observer into the set,
-- 'ForgetObserver' deletes it.
handleObserverRegistration
  :: forall o q r
   . ( HasCallStack
     , Typeable o
     , SetMember Process (Process q) r
     , Member (ObserverState o) r
     , Member Logs r
     )
  => MessageCallback (ObserverRegistry o) r
handleObserverRegistration = handleCasts $ \case
  RegisterObserver ob -> do
    os <- get @(Observers o)
    logDebug (logLine "registering " ob os)
    put (os & observers %~ Set.insert ob)
    pure AwaitNext
  ForgetObserver ob -> do
    os <- get @(Observers o)
    logDebug (logLine "forgetting " ob os)
    put (os & observers %~ Set.delete ob)
    pure AwaitNext
 where
  -- Assemble the debug message: the given prefix, the type of the observer,
  -- and the size of the observer set as it is in the given state.
  logLine prefix ob os =
    prefix
      <> pack (show (typeOf ob))
      <> " current number of observers: "
      <> pack (show (Set.size (os ^. observers)))
-- | Discharge the 'ObserverState' effect of a computation by running it with
-- 'evalState', starting from an empty set of observers.
manageObservers :: Eff (ObserverState o ': r) a -> Eff r a
manageObservers action = evalState noObservers action
 where
  noObservers = Observers Set.empty
-- | The state of an observer registry: the set of registered observers.
newtype Observers o = Observers
  { _observers :: Set (Observer o)
  }

-- | The 'State' effect that holds the 'Observers' of observations of type
-- @o@.
type ObserverState o = State (Observers o)
-- | Isomorphism between the 'Observers' wrapper and the set it wraps.
observers :: Iso' (Observers o) (Set (Observer o))
observers = iso unwrap wrap
 where
  unwrap = _observers
  wrap   = Observers
-- | Report an observation to all observers currently held in the
-- 'ObserverState'.
--
-- For each registered 'Observer' the observation is passed to its conversion
-- function; when that yields a message, the message is sent to the
-- observer's server with 'cast', otherwise nothing is sent to that observer.
observed
  :: forall o r q
   . ( SetMember Process (Process q) r
     , PrettyTypeShow (ToPretty o)
     , Member (ObserverState o) r
     , Member Interrupts r
     )
  => o
  -> Eff r ()
observed observation = do
  registered <- view observers <$> get
  for_ registered $ \(Observer toMessage receiver) ->
    -- The inner traversal runs over a 'Maybe': zero or one message is cast.
    for_ (toMessage observation) (cast receiver)