module Control.Eff.Concurrent.Api.Observer
(
Observer(..)
, Observable(..)
, notifyObserver
, registerObserver
, forgetObserver
, SomeObserver(..)
, notifySomeObserver
, Observers()
, ObserverState
, manageObservers
, addObserver
, removeObserver
, notifyObservers
, CallbackObserver
, spawnCallbackObserver
, spawnLoggingObserver
)
where
import GHC.Stack
import Data.Dynamic
import Data.Set ( Set )
import qualified Data.Set as Set
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Client
import Control.Eff.Concurrent.Api.Server
import Control.Eff.Log
import Control.Eff.State.Strict
import Control.Lens
class (Typeable p, Observable o) => Observer p o where
observationMessage :: Server o -> Observation o -> Api p 'Asynchronous
class (Typeable o, Typeable (Observation o)) => Observable o where
data Observation o
registerObserverMessage :: SomeObserver o -> Api o 'Asynchronous
forgetObserverMessage :: SomeObserver o -> Api o 'Asynchronous
notifyObserver
:: ( SetMember Process (Process q) r
, Observable o
, Observer p o
, HasCallStack
, Member Interrupts r
)
=> SchedulerProxy q
-> Server p
-> Server o
-> Observation o
-> Eff r ()
notifyObserver px observer observed observation =
cast px observer (observationMessage observed observation)
registerObserver
:: ( SetMember Process (Process q) r
, Observable o
, Observer p o
, HasCallStack
, Member Interrupts r
)
=> SchedulerProxy q
-> Server p
-> Server o
-> Eff r ()
registerObserver px observer observed =
cast px observed (registerObserverMessage (SomeObserver observer))
forgetObserver
:: ( SetMember Process (Process q) r
, Observable o
, Observer p o
, Member Interrupts r
)
=> SchedulerProxy q
-> Server p
-> Server o
-> Eff r ()
forgetObserver px observer observed =
cast px observed (forgetObserverMessage (SomeObserver observer))
data SomeObserver o where
SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o
deriving instance Show (SomeObserver o)
instance Ord (SomeObserver o) where
compare (SomeObserver (Server o1)) (SomeObserver (Server o2)) =
compare o1 o2
instance Eq (SomeObserver o) where
(==) (SomeObserver (Server o1)) (SomeObserver (Server o2)) =
o1 == o2
notifySomeObserver
:: ( SetMember Process (Process q) r
, Observable o
, HasCallStack
, Member Interrupts r
)
=> SchedulerProxy q
-> Server o
-> Observation o
-> SomeObserver o
-> Eff r ()
notifySomeObserver px observed observation (SomeObserver observer) =
notifyObserver px observer observed observation
data Observers o =
Observers { _observers :: Set (SomeObserver o) }
type ObserverState o = State (Observers o)
observers :: Iso' (Observers o) (Set (SomeObserver o))
observers = iso _observers Observers
manageObservers :: Eff (ObserverState o ': r) a -> Eff r a
manageObservers = evalState (Observers Set.empty)
addObserver
:: (SetMember Process (Process q) r, Member (ObserverState o) r, Observable o)
=> SomeObserver o
-> Eff r ()
addObserver = modify . over observers . Set.insert
removeObserver
:: ( SetMember Process (Process q) r
, Member (ObserverState o) r
, Observable o
, Member Interrupts r
)
=> SomeObserver o
-> Eff r ()
removeObserver = modify . over observers . Set.delete
notifyObservers
:: forall o r q
. ( Observable o
, SetMember Process (Process q) r
, Member (ObserverState o) r
, Member Interrupts r
)
=> SchedulerProxy q
-> Observation o
-> Eff r ()
notifyObservers px observation = do
me <- asServer @o <$> self px
os <- view observers <$> get
mapM_ (notifySomeObserver px me observation) os
data CallbackObserver o
deriving Typeable
data instance Api (CallbackObserver o) r where
CbObserved :: (Typeable o, Typeable (Observation o)) =>
Server o -> Observation o -> Api (CallbackObserver o) 'Asynchronous
deriving Typeable
deriving instance Show (Observation o) => Show (Api (CallbackObserver o) r)
instance (Observable o) => Observer (CallbackObserver o) o where
observationMessage = CbObserved
spawnCallbackObserver
:: forall o r q
. ( SetMember Process (Process q) r
, Typeable o
, Show (Observation o)
, Observable o
, Member (Logs LogMessage) q
, Member Interrupts r
, HasCallStack
)
=> SchedulerProxy q
-> ( Server o
-> Observation o
-> Eff (InterruptableProcess q) ApiServerCmd
)
-> Eff r (Server (CallbackObserver o))
spawnCallbackObserver px onObserve = spawnServerWithEffects
px
(castHandler handleCastCbo)
id
where handleCastCbo (CbObserved fromSvr v) = onObserve fromSvr v
spawnLoggingObserver
:: forall o r q
. ( SetMember Process (Process q) r
, Typeable o
, Show (Observation o)
, Observable o
, Member (Logs LogMessage) q
, Member (Logs LogMessage) r
, Member Interrupts r
, HasCallStack
)
=> SchedulerProxy q
-> Eff r (Server (CallbackObserver o))
spawnLoggingObserver px = spawnCallbackObserver
px
(\s o ->
logDebug (show s ++ " OBSERVED: " ++ show o) >> return HandleNextRequest
)