module Control.Eff.Concurrent.Api.Observer
( Observer(..)
, Api(RegisterObserver, ForgetObserver, Observed)
, registerObserver
, forgetObserver
, handleObservations
, toObserver
, toObserverFor
, ObserverRegistry
, ObserverState
, handleObserverRegistration
, manageObservers
, observed
)
where
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Client
import Control.Eff.Concurrent.Api.Server
import Control.Eff.State.Strict
import Control.Lens
import Data.Dynamic
import Data.Foldable
import Data.Proxy
import Data.Set ( Set )
import qualified Data.Set as Set
import Data.Typeable ( typeRep )
import GHC.Stack
-- | Something that can be notified about observations of type @o@.
--
-- The API type @p@ of the wrapped 'Server' does not appear in the result
-- type, so it is hidden from users of an 'Observer' value.
data Observer o where
-- | Combine a function that turns an observation into 'Maybe' an
-- asynchronous 'Api' message for @p@ with the 'Server' of @p@ that the
-- message is meant for. The constructor captures @Show (Server p)@,
-- @Typeable p@ and @Typeable o@.
Observer
:: (Show (Server p), Typeable p, Typeable o)
=> (o -> Maybe (Api p 'Asynchronous)) -> Server p -> Observer o
-- | Shows the type representation of the observation type @o@, followed by
-- the text @" observer: "@ and the wrapped server. The message filter is not
-- shown. The output is parenthesised when the precedence is at least 10.
instance Show (Observer o) where
  showsPrec prec (Observer _ server) =
    showParen (prec >= 10) $
      shows (typeRep (Proxy :: Proxy o))
        . showString " observer: "
        . shows server
-- | Orders observers solely by the value seen through 'fromServer' on the
-- wrapped servers; the message filter takes no part in the comparison.
--
-- NOTE(review): the two servers may belong to different API types, so two
-- observers for different APIs compare as equal whenever their 'fromServer'
-- values are equal -- confirm that this is intended.
instance Ord (Observer o) where
  Observer _ lhs `compare` Observer _ rhs =
    view fromServer lhs `compare` view fromServer rhs
-- | Two observers are equal when the values seen through 'fromServer' on
-- their wrapped servers are equal; the message filter is ignored.
instance Eq (Observer o) where
  Observer _ lhs == Observer _ rhs =
    view fromServer lhs == view fromServer rhs
-- | 'cast' a 'RegisterObserver' message carrying the given 'Observer' to the
-- given 'ObserverRegistry' server.
registerObserver
  :: ( SetMember Process (Process q) r
     , HasCallStack
     , Member Interrupts r
     , Typeable o
     )
  => Observer o
  -> Server (ObserverRegistry o)
  -> Eff r ()
registerObserver newObserver registry = cast registry registration
 where
  registration = RegisterObserver newObserver
-- | 'cast' a 'ForgetObserver' message carrying the given 'Observer' to the
-- given 'ObserverRegistry' server.
forgetObserver
  :: ( SetMember Process (Process q) r
     , HasCallStack
     , Member Interrupts r
     , Typeable o
     )
  => Observer o
  -> Server (ObserverRegistry o)
  -> Eff r ()
forgetObserver oldObserver registry = cast registry removal
 where
  removal = ForgetObserver oldObserver
-- | The 'Api' messages for the 'Observer' type.
data instance Api (Observer o) r where
-- | An 'Asynchronous' message carrying a single observation of type @o@.
Observed :: o -> Api (Observer o) 'Asynchronous
-- | Build a 'MessageCallback' for the 'Observer' API with 'handleCasts': each
-- 'Observed' message is unwrapped and its payload is passed to the given
-- function, whose 'CallbackResult' becomes the result of the callback.
handleObservations
  :: (HasCallStack, Typeable o, SetMember Process (Process q) r)
  => (o -> Eff r CallbackResult)
  -> MessageCallback (Observer o) r
handleObservations onObservation =
  handleCasts (\(Observed observation) -> onObservation observation)
-- | Turn a 'Server' of the 'Observer' API into an 'Observer' by wrapping
-- every observation in 'Observed' (via 'toObserverFor').
toObserver :: Typeable o => Server (Observer o) -> Observer o
toObserver server = toObserverFor Observed server
-- | Build an 'Observer' from a function that converts an observation into an
-- asynchronous 'Api' message of type @a@, and the 'Server' of @a@ to deliver
-- it to. The resulting filter always yields 'Just' a message.
toObserverFor
  :: (Typeable a, Typeable o)
  => (o -> Api a 'Asynchronous)
  -> Server a
  -> Observer o
toObserverFor wrapper receiver =
  Observer (\observation -> Just (wrapper observation)) receiver
-- | A type without constructors, indexed by the observation type @o@. It is
-- used only at the type level: as the index of an 'Api' data instance and as
-- the parameter of the 'Server' that 'registerObserver' and 'forgetObserver'
-- send to.
data ObserverRegistry o
-- | The 'Api' messages for 'ObserverRegistry'; both are 'Asynchronous'.
data instance Api (ObserverRegistry o) r where
-- | Carries the 'Observer' to be added to the registry.
RegisterObserver :: Observer o -> Api (ObserverRegistry o) 'Asynchronous
-- | Carries the 'Observer' to be removed from the registry.
ForgetObserver :: Observer o -> Api (ObserverRegistry o) 'Asynchronous
-- | A 'MessageCallback' for the 'ObserverRegistry' API, built with
-- 'handleCasts'.
--
-- 'RegisterObserver' inserts the carried observer into the set kept in the
-- 'ObserverState'; 'ForgetObserver' deletes it from that set. Either way the
-- callback answers with 'AwaitNext'.
handleObserverRegistration
  :: forall o q r
   . ( HasCallStack
     , Typeable o
     , SetMember Process (Process q) r
     , Member (ObserverState o) r
     )
  => MessageCallback (ObserverRegistry o) r
handleObserverRegistration = handleCasts
  (\case
    RegisterObserver ob -> updateObservers (Set.insert ob)
    ForgetObserver   ob -> updateObservers (Set.delete ob)
  )
 where
  -- Read the current state, apply the change to the observer set, write the
  -- result back, and then ask for the next message.
  updateObservers
    :: (Set (Observer o) -> Set (Observer o)) -> Eff r CallbackResult
  updateObservers change = do
    current <- get @(Observers o)
    put (over observers change current)
    pure AwaitNext
-- | Run an effect that needs the 'ObserverState', starting from an empty set
-- of observers. The state is discharged with 'evalState'.
manageObservers :: Eff (ObserverState o ': r) a -> Eff r a
manageObservers action = evalState noObservers action
 where
  noObservers = Observers Set.empty
-- | The set of observers kept for observations of type @o@; this is the state
-- type behind 'ObserverState'.
--
-- Declared as a @newtype@ because it wraps exactly one field. The wrapper has
-- no runtime representation of its own, so evaluating an 'Observers' value to
-- weak head normal form evaluates the 'Set' itself instead of stopping at a
-- constructor whose field may still be an unevaluated chain of updates.
-- NOTE(review): this presumably matters because the state is threaded through
-- "Control.Eff.State.Strict" -- confirm how strictly that module forces the
-- state.
newtype Observers o =
  Observers { _observers :: Set (Observer o) }
-- | Alias for the 'State' effect whose state is the 'Observers' of
-- observation type @o@.
type ObserverState o = State (Observers o)
-- | Isomorphism between 'Observers' and the plain 'Set' of 'Observer's that
-- it wraps.
observers :: Iso' (Observers o) (Set (Observer o))
observers = iso unwrap Observers
 where
  unwrap (Observers registered) = registered
-- | Report an observation to every observer in the current 'ObserverState'.
--
-- For each registered 'Observer' the message filter is applied to the
-- observation; if it yields 'Just' a message, that message is 'cast' to the
-- observer's server, and if it yields 'Nothing' nothing is sent to that
-- observer.
observed
  :: forall o r q
   . ( SetMember Process (Process q) r
     , Member (ObserverState o) r
     , Member Interrupts r
     )
  => o
  -> Eff r ()
observed observation =
  get >>= traverse_ notify . view observers
 where
  -- Deliver the observation to one observer, provided its filter accepts it.
  notify (Observer select receiver) =
    for_ (select observation) (cast receiver)