extensible-effects-concurrent-0.32.0: Message passing concurrency as extensible-effect

Safe Haskell: None
Language: Haskell2010

Control.Eff.Concurrent.Protocol.Observer.Queue

Description

A small process to capture and _share_ observations by enqueueing them into an STM TBQueue.


Documentation

newtype ObservationQueue a

Contains a TBQueue capturing observations. See observe.

Constructors

ObservationQueue (TBQueue a) 
Instances
Default (Model (ObservationQueue event))
Instance details

Defined in Control.Eff.Concurrent.Protocol.Observer.Queue

Methods

def :: Model (ObservationQueue event)

(Typeable event, Lifted IO q, Member Logs q) => Server (ObservationQueue event) (Processes q)
Instance details

Defined in Control.Eff.Concurrent.Protocol.Observer.Queue

Associated Types

data StartArgument (ObservationQueue event) :: Type

type Protocol (ObservationQueue event) :: Type

data Model (ObservationQueue event) :: Type

type Settings (ObservationQueue event) :: Type

data StartArgument (ObservationQueue event)
Instance details

Defined in Control.Eff.Concurrent.Protocol.Observer.Queue

type Protocol (ObservationQueue event)
Instance details

Defined in Control.Eff.Concurrent.Protocol.Observer.Queue

type Protocol (ObservationQueue event) = Observer event

newtype Model (ObservationQueue event)
Instance details

Defined in Control.Eff.Concurrent.Protocol.Observer.Queue

type Settings (ObservationQueue event)
Instance details

Defined in Control.Eff.Concurrent.Protocol.Observer.Queue

type Settings (ObservationQueue event) = ()

observe :: forall event eventSource e q len b. (HasCallStack, HasProcesses e q, FilteredLogging e, FilteredLogging q, FilteredLogging (Processes q), Lifted IO e, Lifted IO q, IsObservable eventSource event, Integral len, Server (ObservationQueue event) (Processes q), Tangible (Pdu eventSource Asynchronous)) => len -> Endpoint eventSource -> Eff (Reader event ': e) b -> Eff e b

Listen to and capture observations in an ObservationQueue.

Fork an Observer process that runs only while the body expression is executed, and register it with the observable process passed to this function.

The captured observations can be obtained by await, tryRead and flush.

The queue size is limited to the given number (the len argument).
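
To make the size limit concrete, here is a minimal sketch of how a bounded STM TBQueue behaves on its own, using only the stm package. It does not use this module, and boundedDemo is a name made up for illustration. What observe does when the queue is full is not stated on this page, so the sketch only shows that a TBQueue of capacity 2 reports itself full after two writes.

```haskell
import Control.Concurrent.STM
  (atomically, isFullTBQueue, newTBQueueIO, writeTBQueue)

-- | Hypothetical helper (not part of this library): create a TBQueue
-- with capacity 2 and check whether it is full after one write and
-- again after two writes.
boundedDemo :: IO (Bool, Bool)
boundedDemo = do
  q <- newTBQueueIO 2                            -- capacity: two elements
  atomically (writeTBQueue q 'a')
  fullAfterOne <- atomically (isFullTBQueue q)   -- one free slot remains
  atomically (writeTBQueue q 'b')
  fullAfterTwo <- atomically (isFullTBQueue q)   -- no free slot remains
  pure (fullAfterOne, fullAfterTwo)
```

In GHCi, boundedDemo evaluates to (False, True). A third writeTBQueue on the same queue would block until an element is read.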

Example

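import qualified Control.Eff.Concurrent.Protocol.Observer.Queue as OQ

foo =
  do
    -- start the observable process and keep its endpoint
    observed <- startLink SomeObservable
    -- capture up to 100 observations while the body runs
    OQ.observe 100 observed $ do
      ...
      cast observed DoSomething
      -- block until the next TestEvent is captured
      evt <- OQ.await @TestEvent
      ...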

Since: 0.28.0

await :: forall event r. (Member (Reader event) r, HasCallStack, MonadIO (Eff r), Typeable event, Member Logs r) => Eff r event

Read the next observation that observe captured and enqueued in the shared ObservationQueue.

This blocks until an observation is captured, or until an interrupt or exception is thrown. For a non-blocking variant use tryRead or flush.

Since: 0.28.0

tryRead :: forall event r. (Member (Reader event) r, HasCallStack, MonadIO (Eff r), Typeable event, Member Logs r) => Eff r (Maybe event)

Read the oldest observation that observe captured and enqueued in the shared ObservationQueue, without blocking.

Return the oldest enqueued observation immediately, or Nothing if the queue is empty. Use await to block until an observation is captured.

Since: 0.28.0

flush :: forall event r. (Member (Reader event) r, HasCallStack, MonadIO (Eff r), Typeable event, Member Logs r) => Eff r [event]

Read, in one step, all observations that observe has captured and enqueued in the shared ObservationQueue so far.

This returns all currently enqueued observations immediately, without blocking. For a blocking variant use await.

Since: 0.28.0
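
The difference between the three read functions can be illustrated with the plain STM operations on a TBQueue. This is a sketch under the assumption that await, tryRead and flush behave like readTBQueue, tryReadTBQueue and flushTBQueue on the wrapped queue, which this page does not confirm. It uses only the stm package, and readModesDemo is a name made up for illustration.

```haskell
import Control.Concurrent.STM
  ( atomically, flushTBQueue, newTBQueueIO, readTBQueue
  , tryReadTBQueue, writeTBQueue )

-- | Hypothetical helper (not part of this library): enqueue four
-- values, then read them back with the three read styles.
readModesDemo :: IO (Int, Maybe Int, [Int], Maybe Int)
readModesDemo = do
  q <- newTBQueueIO 10
  atomically (mapM_ (writeTBQueue q) [1, 2, 3, 4])
  -- Blocking read (compare await): would wait if the queue were empty.
  first <- atomically (readTBQueue q)
  -- Non-blocking read (compare tryRead): Just the oldest element.
  second <- atomically (tryReadTBQueue q)
  -- Drain everything that is left, oldest first (compare flush).
  rest <- atomically (flushTBQueue q)
  -- The queue is empty now, so a non-blocking read yields Nothing.
  afterFlush <- atomically (tryReadTBQueue q)
  pure (first, second, rest, afterFlush)
```

In GHCi, readModesDemo evaluates to (1, Just 2, [3,4], Nothing).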