extensible-effects-concurrent-0.13.0: Message passing concurrency as extensible-effect

Safe Haskell: None
Language: Haskell2010

Control.Eff.Concurrent.Api.Observer.Queue

Description

Capture Observations and enqueue them into an STM TBQueue.

Documentation

data ObservationQueue a

Contains a TBQueue capturing observations received by enqueueObservationsRegistered or enqueueObservations.

readObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Observation o)

Read the next queued observation captured by observing a Server that implements an Observable Api using enqueueObservationsRegistered or enqueueObservations. This blocks until the next Observation is received. For a non-blocking variant use tryReadObservationQueue or flushObservationQueue.
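The blocking behaviour can be illustrated on a plain STM TBQueue, without the effect machinery. This is a minimal sketch that assumes only the stm package; the Obs type and blockingReadDemo are hypothetical names for illustration and are not part of this module.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- | Hypothetical stand-in for an observation; not a type from this library.
newtype Obs = Obs String deriving (Eq, Show)

-- | Blocks on an empty queue until a producer thread enqueues a value.
blockingReadDemo :: IO Obs
blockingReadDemo = do
  queue <- newTBQueueIO 4
  _ <- forkIO $ do
    threadDelay 10000 -- the reader below is already waiting by now
    atomically (writeTBQueue queue (Obs "started"))
  -- readTBQueue retries (blocks) while the queue is empty.
  atomically (readTBQueue queue)
```

Here readTBQueue plays the role that readObservationQueue plays for an ObservationQueue: the caller is suspended until an element is available.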

tryReadObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Maybe (Observation o))

Read the next queued observation captured by observing a Server that implements an Observable Api using enqueueObservationsRegistered or enqueueObservations. Returns the next Observation immediately, or Nothing if the queue is empty. Use readObservationQueue to block until an observation arrives.

flushObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r [Observation o]

Read all Observations currently queued by enqueueObservations. This returns immediately, without waiting for further observations. For a blocking variant use readObservationQueue.
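The two non-blocking reads can be sketched on a plain TBQueue as well. The example assumes only the stm package (flushTBQueue requires stm 2.4.5 or later); nonBlockingDemo is a hypothetical name, and the sketch shows the underlying queue operations rather than this module's implementation.

```haskell
import Control.Concurrent.STM

-- | Returns, in order: a try-read on an empty queue, a try-read after three
-- writes, a flush of the remaining elements, and a flush of the empty queue.
nonBlockingDemo :: IO (Maybe Int, Maybe Int, [Int], [Int])
nonBlockingDemo = do
  queue <- newTBQueueIO 8
  emptyTry <- atomically (tryReadTBQueue queue) -- never blocks
  atomically (mapM_ (writeTBQueue queue) [1, 2, 3])
  firstTry <- atomically (tryReadTBQueue queue)
  rest <- atomically (flushTBQueue queue) -- drains in FIFO order
  again <- atomically (flushTBQueue queue)
  pure (emptyTry, firstTry, rest, again)
-- nonBlockingDemo yields (Nothing, Just 1, [2, 3], [])
```

Neither call ever waits: an empty queue yields Nothing from the try-read and an empty list from the flush.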

enqueueObservationsRegistered :: forall o r q a. (ServesApi o r q, SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO q, HasLogging IO r, Member Interrupts r, Lifted IO r, HasCallStack, MonadCatch (Eff r)) => SchedulerProxy q -> Int -> Eff (ObservationQueueReader o ': r) a -> Eff r a

Observe the registered Server that implements an Observable Api. Based on enqueueObservations.

enqueueObservations :: forall o r q a. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO r, HasLogging IO q, Member Interrupts r, Lifted IO q, HasCallStack, MonadCatch (Eff r)) => SchedulerProxy q -> Server o -> Int -> Eff (ObservationQueueReader o ': r) a -> Eff r a

Observe a Server that implements an Observable Api. The Observations can be obtained with readObservationQueue. Observations are captured up to the queue size limit and returned in FIFO order: the first message received is the first message returned by readObservationQueue.

This function captures runtime exceptions and cleans up accordingly.
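The general shape of a scoped, bounded capture with cleanup can be sketched in plain IO. This is an illustration under stated assumptions, not the implementation of enqueueObservations: withCapturedInts and captureDemo are hypothetical names, the producer thread stands in for the observed Server, and bracket stands in for whatever cleanup the library performs. In this sketch a full queue makes the producer wait; how the library itself treats observations beyond the size limit is not stated here.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.STM
import Control.Exception (bracket)
import Control.Monad (replicateM)

-- | Hypothetical helper: runs an action with a bounded queue filled by a
-- background producer, and stops the producer even if the action throws.
withCapturedInts :: (TBQueue Int -> IO a) -> IO a
withCapturedInts action = do
  queue <- newTBQueueIO 2 -- queue size limit
  -- writeTBQueue blocks the producer whenever the queue is full.
  let producer = mapM_ (atomically . writeTBQueue queue) [1 ..]
  bracket (forkIO producer) killThread (const (action queue))

-- | Reads five values; they arrive in the order they were enqueued.
captureDemo :: IO [Int]
captureDemo = withCapturedInts $ \queue ->
  replicateM 5 (atomically (readTBQueue queue))
-- captureDemo yields [1, 2, 3, 4, 5]
```

The queue is only reachable inside the scoped action, which mirrors how the ObservationQueueReader effect is only available in the computation passed to enqueueObservations.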