| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Control.Eff.Concurrent
Contents
Description
Erlang style processes with message passing concurrency based on
(more) extensible-effects.
Synopsis
- newtype ProcessId = ProcessId {}
- data SchedulerProxy :: [Type -> Type] -> Type where
- SchedulerProxy :: SchedulerProxy q
- SP :: SchedulerProxy q
- type ConsProcess r = Process r ': r
- data ResumeProcess v where
- ShutdownRequested :: ResumeProcess v
- OnError :: String -> ResumeProcess v
- ResumeWith :: a -> ResumeProcess a
- RetryLastAction :: ResumeProcess v
- data Process (r :: [Type -> Type]) b where
- YieldProcess :: Process r (ResumeProcess ())
- SelfPid :: Process r (ResumeProcess ProcessId)
- Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
- Shutdown :: Process r a
- ExitWithError :: String -> Process r b
- RaiseError :: String -> Process r b
- SendShutdown :: ProcessId -> Process r (ResumeProcess Bool)
- SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool)
- ReceiveMessage :: Process r (ResumeProcess Dynamic)
- executeAndResume :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v
- executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v)
- thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r)
- yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ()
- sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r ()
- sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool
- sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r ()
- sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r ()
- sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool
- spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ProcessId
- spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ()
- receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic
- receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
- receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r ()
- self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId
- exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
- exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a
- raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b
- catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w
- ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a)
- fromProcessId :: Iso' ProcessId Int
- newtype Server api = Server {}
- data Synchronicity
- data family Api (api :: Type) (reply :: Synchronicity)
- fromServer :: forall api api. Iso (Server api) (Server api) ProcessId ProcessId
- proxyAsServer :: proxy api -> ProcessId -> Server api
- asServer :: forall api. ProcessId -> Server api
- type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r)
- castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r Bool
- cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r ()
- call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api (Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api (Synchronous result) -> Eff r result
- registerServer :: Server o -> Eff (Reader (Server o) ': r) a -> Eff r a
- callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous reply) -> Eff r reply
- callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous (f reply)) -> Eff r (f reply)
- castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o Asynchronous -> Eff r ()
- data UnhandledRequest
- data ApiHandler p r where
- ApiHandler :: {..} -> ApiHandler p r
- serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r ()
- unhandledCallError :: forall p x r q. (Show (Api p (Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p (Synchronous x) -> (x -> Eff r ()) -> Eff r ()
- unhandledCastError :: forall p r q. (Show (Api p Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p Asynchronous -> Eff r ()
- defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r ()
- serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r ()
- serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r ()
- tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest ': r) ()
- catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a
- ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest ': r) () -> Eff r ()
- requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a
- exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r ()
- data CallbackObserver o
- data Observers o
- data SomeObserver o where
- SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o
- class (Typeable o, Typeable (Observation o)) => Observable o where
- data Observation o
- class (Typeable p, Observable o) => Observer p o where
- notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r ()
- registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
- forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
- notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r ()
- manageObservers :: Eff (State (Observers o) ': r) a -> Eff r a
- addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r ()
- removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r ()
- notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r ()
- spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs String) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q ': q) Bool) -> Eff r (Server (CallbackObserver o))
- spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs String) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o))
- type SchedulerIO = '[Reader SchedulerVar, Logs String, Lift IO]
- type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs String) r, Member (Reader SchedulerVar) r)
- data SchedulerError
- forkIoScheduler :: SchedulerProxy SchedulerIO
- schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel String -> IO ()
- defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO ()
- defaultMainWithLogChannel :: LogChannel String -> Eff (ConsProcess SchedulerIO) () -> IO ()
- schedulePure :: Eff (ConsProcess '[]) a -> Either String a
- module Control.Eff.Log
- module Control.Eff.Loop
Concurrent Processes with Message Passing Concurrency
Each process is identified by a single process id, that stays constant throughout the life cycle of a process. Also, message sending relies on these values to address messages to processes.
Constructors
| ProcessId | |
Fields | |
Instances
data SchedulerProxy :: [Type -> Type] -> Type where Source #
Every function for Process things needs such a proxy value
for the low-level effect list, i.e. the effects identified by
r in Process r ': r; this might be dependent on the
scheduler implementation.
Constructors
| SchedulerProxy :: SchedulerProxy q | Tell the typechecker what effects we have below |
| SP :: SchedulerProxy q | Like the SchedulerProxy constructor, but shorter |
type ConsProcess r = Process r ': r Source #
Cons Process onto a list of effects.
data ResumeProcess v where Source #
Every Process action returns its actual result wrapped in this type. It
will allow to signal errors as well as pass on normal results such as
incoming messages.
Constructors
| ShutdownRequested :: ResumeProcess v | The process is required to exit. |
| OnError :: String -> ResumeProcess v | The process is required to exit from an error condition, that cannot be recovered from. |
| ResumeWith :: a -> ResumeProcess a | The process may resume to do work, using the given result. |
| RetryLastAction :: ResumeProcess v | This indicates that the action did not complete, and may be retried |
Instances
data Process (r :: [Type -> Type]) b where Source #
The process effect is the basis for message passing concurrency. This effect describes an interface for concurrent, communicating isolated processes identified uniquely by a process-id.
Processes can raise exceptions that can be caught, exit gracefully or with an error, or be killed by other processes, with the option of ignoring the shutdown request.
Process Scheduling is implemented in different modules. All scheduler implementations should follow some basic rules:
- fair scheduling
- sending a message does not block
- receiving a message does block
- spawning a child blocks only for a very short moment
- a newly spawned process shall be scheduled before the parent process after the spawn
- when the first process exits, all processes should be killed immediately
Constructors
| YieldProcess :: Process r (ResumeProcess ()) | In cooperative schedulers, this will give processing time to the scheduler. Every other operation implicitly serves the same purpose. |
| SelfPid :: Process r (ResumeProcess ProcessId) | Return the current ProcessId |
| Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId) | Start a new process, the new process will execute an effect, the function
will return immediately with a ProcessId. |
| Shutdown :: Process r a | Process exit, this is the same as if the function that was applied to a spawn function returned. |
| ExitWithError :: String -> Process r b | Exit the process due to an error, this cannot be caught. |
| RaiseError :: String -> Process r b | Raise an error, that can be handled. |
| SendShutdown :: ProcessId -> Process r (ResumeProcess Bool) | Request that another process exits. The targeted process is interrupted
and gets a ShutdownRequested. |
| SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool) | Send a message to a process addressed by the ProcessId. |
| ReceiveMessage :: Process r (ResumeProcess Dynamic) | Receive a message. This should block until a message was received. The
pure function may convert the incoming message into something, and the
result is returned as |
executeAndResume :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v Source #
Execute a Process action and resume the process, retry the action or exit
the process when a shutdown was requested.
executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v) Source #
Execute an action and resume the process, retry the action, shut down the process or return an error.
thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r) Source #
Return a SchedulerProxy for a Process effect.
yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r () Source #
Use executeAndResume to execute YieldProcess. Refer to YieldProcess
for more information.
sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () Source #
Send a message to a process addressed by the ProcessId.
See SendMessage.
sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool Source #
Send a message to a process addressed by the ProcessId.
See SendMessage. Return True if the process existed.
If you don't care, just use sendMessage instead.
sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () Source #
Send a message to a process addressed by the ProcessId.
See SendMessage.
sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r () Source #
Exit a process addressed by the ProcessId.
See SendShutdown.
sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool Source #
Like sendShutdown, but also return True iff the process to exit exists.
spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ProcessId Source #
Start a new process, the new process will execute an effect, the function
will return immediately with a ProcessId.
spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r () Source #
Like spawn but return ().
receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic Source #
Block until a message was received.
receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #
Receive and cast the message to some Typeable instance.
receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r () Source #
Enter a loop to receive messages and pass them to a callback, until the
function returns False.
self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId Source #
Returns the ProcessId of the current process.
exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #
Exit the process.
exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a Source #
Exit the process with an error.
raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b Source #
Throw an error that can be caught by catchRaisedError.
catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w Source #
Catch and handle an error raised by raiseError. Works independent of the
handler implementation.
ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a) Source #
Like catchRaisedError it catches raiseError, but instead of invoking a
user provided handler, the result is wrapped into an Either.
Data Types and Functions for APIs (aka Protocols)
Constructors
| Server | |
Fields | |
Instances
| Eq (Server api) Source # | |
| Ord (Server api) Source # | |
Defined in Control.Eff.Concurrent.Api | |
| Read (Server api) Source # | |
| Typeable api => Show (Server api) Source # | |
data Synchronicity Source #
The (promoted) constructors of this type specify (at the type level) the
reply behavior of a specific constructor of an Api instance.
Constructors
| Synchronous Type | Specify that handling a request is a blocking operation
with a specific return type, e.g. |
| Asynchronous | Non-blocking, asynchronous, request handling |
data family Api (api :: Type) (reply :: Synchronicity) Source #
This data family defines an API, a communication interface description between at least two processes. The processes act as servers or client(s) regarding a specific instance of this type.
The first parameter is usually a user defined phantom type that identifies
the Api instance.
The second parameter specifies if a specific constructor of an (GADT-like)
Api instance is Synchronous, i.e. returns a result and blocks the caller
or if it is Asynchronous
Example:
data BookShop deriving Typeable
data instance Api BookShop r where
RentBook :: BookId -> Api BookShop ('Synchronous (Either RentalError RentalId))
BringBack :: RentalId -> Api BookShop 'Asynchronous
type BookId = Int
type RentalId = Int
type RentalError = String
Instances
| Show (Observation o) => Show (Api (CallbackObserver o) r) # | |
Defined in Control.Eff.Concurrent.Api.Observer | |
| data Api (CallbackObserver o) r Source # | |
Defined in Control.Eff.Concurrent.Api.Observer | |
proxyAsServer :: proxy api -> ProcessId -> Server api Source #
Client Functions for Consuming APIs
type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r) Source #
castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r Bool Source #
Send an Api request that has no return value and return as fast as
possible. The type signature enforces that the corresponding Api clause is
Asynchronous. Return True if the message was sent to the process. Note
that this is totally not the same as that the request was successfully
handled. If that is important, use call instead.
cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r () Source #
Send an Api request that has no return value and return as fast as
possible. The type signature enforces that the corresponding Api clause is
Asynchronous.
call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api (Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api (Synchronous result) -> Eff r result Source #
Send an Api request and wait for the server to return a result value.
The type signature enforces that the corresponding Api clause is
Synchronous.
registerServer :: Server o -> Eff (Reader (Server o) ': r) a -> Eff r a Source #
Run a reader effect that contains the one server handling a specific
Api instance.
callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous reply) -> Eff r reply Source #
Like call but take the Server from the reader provided by
registerServer.
callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous (f reply)) -> Eff r (f reply) Source #
Like callRegistered but also catch errors raised if e.g. the server
crashed. By allowing Alternative instances to contain the reply,
application level errors can be combined with errors rising from inter
process communication.
castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o Asynchronous -> Eff r () Source #
Like cast but take the Server from the reader provided by
registerServer.
Server Functions for Providing APIs
data UnhandledRequest Source #
An exception that is used by the mechanism that chains together multiple
different ApiHandler allowing a single process to implement multiple
Apis. This exception is thrown by requestFromDynamic. This
exception can be caught with catchUnhandled, this way, several
distinct ApiHandler can be tried until one fits or until the
exitUnhandled is invoked.
data ApiHandler p r where Source #
A record of callbacks, handling requests sent to a server Process, all
belonging to a specific Api family instance.
Constructors
| ApiHandler :: {..} -> ApiHandler p r | |
Fields
| |
serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r () Source #
Receive and process incoming requests until the process exits, using an ApiHandler.
unhandledCallError :: forall p x r q. (Show (Api p (Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p (Synchronous x) -> (x -> Eff r ()) -> Eff r () Source #
A default handler to use in _handleCall in ApiHandler. It will call
raiseError with a nice error message.
unhandledCastError :: forall p r q. (Show (Api p Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p Asynchronous -> Eff r () Source #
A default handler to use in _handleCast in ApiHandler. It will call
raiseError with a nice error message.
defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r () Source #
Exit the process either normally, if the error message is Nothing,
or with exitWithError otherwise.
serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r () Source #
serve two ApiHandlers at once. The first handler is used for
termination handling.
serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r () Source #
serve three ApiHandlers at once. The first handler is used for
termination handling.
tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest ': r) () Source #
The basic building block of the combination of ApiHandlers is this
function, which can not only be passed to receiveLoop, but also throws an
UnhandledRequest exception if requestFromDynamic failed, such that multiple
invocations of this function for different ApiHandlers can be tried, by using catchUnhandled.
tryApiHandler px handlers1 message `catchUnhandled` tryApiHandler px handlers2 `catchUnhandled` tryApiHandler px handlers3
catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a Source #
If requestFromDynamic fails to cast the message to a Request for a
certain ApiHandler it throws an UnhandledRequest exception. That
exception is caught by this function and the raw message is given to the
handler function. This is the basis for chaining ApiHandlers.
ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest ': r) () -> Eff r () Source #
Catch UnhandledRequests and terminate the process with an error, if necessary.
requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a Source #
Cast a Dynamic value, and if that fails, throw an UnhandledRequest
error.
exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r () Source #
If an incoming message could not be cast to a Request corresponding to
an ApiHandler (e.g. with requestFromDynamic) one should use this function to
exit the process with a corresponding error.
Observer Functions for Events and Event Listener
data CallbackObserver o Source #
An Observer that schedules the observations to an effectful callback.
Instances
| Show (Observation o) => Show (Api (CallbackObserver o) r) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer | |
| Observable o => Observer (CallbackObserver o) o Source # | |
Defined in Control.Eff.Concurrent.Api.Observer Methods observationMessage :: Server o -> Observation o -> Api (CallbackObserver o) Asynchronous Source # | |
| data Api (CallbackObserver o) r Source # | |
Defined in Control.Eff.Concurrent.Api.Observer | |
data SomeObserver o where Source #
An existential wrapper around a Server of an Observer.
Needed to support different types of observers to observe the
same Observable in a general fashion.
Constructors
| SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o |
Instances
| Eq (SomeObserver o) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer Methods (==) :: SomeObserver o -> SomeObserver o -> Bool # (/=) :: SomeObserver o -> SomeObserver o -> Bool # | |
| Ord (SomeObserver o) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer Methods compare :: SomeObserver o -> SomeObserver o -> Ordering # (<) :: SomeObserver o -> SomeObserver o -> Bool # (<=) :: SomeObserver o -> SomeObserver o -> Bool # (>) :: SomeObserver o -> SomeObserver o -> Bool # (>=) :: SomeObserver o -> SomeObserver o -> Bool # max :: SomeObserver o -> SomeObserver o -> SomeObserver o # min :: SomeObserver o -> SomeObserver o -> SomeObserver o # | |
| Show (SomeObserver o) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer Methods showsPrec :: Int -> SomeObserver o -> ShowS # show :: SomeObserver o -> String # showList :: [SomeObserver o] -> ShowS # | |
class (Typeable o, Typeable (Observation o)) => Observable o where Source #
Minimal complete definition
Methods
registerObserverMessage :: SomeObserver o -> Api o Asynchronous Source #
Return the Api value for the cast_ that registers an observer
forgetObserverMessage :: SomeObserver o -> Api o Asynchronous Source #
Return the Api value for the cast_ that de-registers an observer
class (Typeable p, Observable o) => Observer p o where Source #
An Api index that supports observation of
another Api that is Observable.
Minimal complete definition
Methods
observationMessage :: Server o -> Observation o -> Api p Asynchronous Source #
Wrap the Observation and the ProcessId (i.e. the Server)
that caused the observation into an Api value that the
Observer understands.
Instances
| Observable o => Observer (CallbackObserver o) o Source # | |
Defined in Control.Eff.Concurrent.Api.Observer Methods observationMessage :: Server o -> Observation o -> Api (CallbackObserver o) Asynchronous Source # | |
notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () Source #
Send an Observation to an Observer
registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r () Source #
Send the registerObserverMessage
forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r () Source #
Send the forgetObserverMessage
notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () Source #
Send an Observation to SomeObserver.
manageObservers :: Eff (State (Observers o) ': r) a -> Eff r a Source #
Keep track of registered Observers. Observers can be added and removed,
and an Observation can be sent to all registered observers at once.
addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () Source #
Add an Observer to the Observers managed by manageObservers.
removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () Source #
Delete an Observer from the Observers managed by manageObservers.
notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r () Source #
Send an Observation to all SomeObservers in the Observers state.
spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs String) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q ': q) Bool) -> Eff r (Server (CallbackObserver o)) Source #
Start a new process for an Observer that schedules
all observations to an effectful callback.
spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs String) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) Source #
Use spawnCallbackObserver to create a universal logging observer,
using the Show instance of the Observation.
| Start a new process for an Observer that schedules
all observations to an effectful callback.
Since: extensible-effects-concurrent-0.3.0.0
Scheduler Process Effect Handler
Concurrent Scheduler
type SchedulerIO = '[Reader SchedulerVar, Logs String, Lift IO] Source #
The concrete list of Effects for this scheduler implementation.
See HasSchedulerIO
type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs String) r, Member (Reader SchedulerVar) r) Source #
data SchedulerError Source #
A sum-type with errors that can occur when scheduling messages.
Constructors
| ProcessNotFound ProcessId | No process info was found for a |
| ProcessRaisedError String | A process called |
| ProcessExitError String | A process called |
| ProcessShuttingDown | A process exits. |
| SchedulerShuttingDown | An action was not performed while the scheduler was exiting. |
Instances
| Show SchedulerError Source # | |
Defined in Control.Eff.Concurrent.Process.ForkIOScheduler Methods showsPrec :: Int -> SchedulerError -> ShowS # show :: SchedulerError -> String # showList :: [SchedulerError] -> ShowS # | |
| Exception SchedulerError Source # | |
Defined in Control.Eff.Concurrent.Process.ForkIOScheduler Methods toException :: SchedulerError -> SomeException # | |
schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel String -> IO () Source #
This is the main entry point to running a message passing concurrency
application. This function takes a Process on top of the SchedulerIO
effect and a LogChannel for concurrent logging.
defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO () Source #
Start the message passing concurrency system then execute a Process on
top of SchedulerIO effect. All logging is sent to standard output.
defaultMainWithLogChannel :: LogChannel String -> Eff (ConsProcess SchedulerIO) () -> IO () Source #
Start the message passing concurrency system then execute a Process on
top of SchedulerIO effect. All logging is sent to standard output.
Single Threaded Scheduler
schedulePure :: Eff (ConsProcess '[]) a -> Either String a Source #
Like schedule but pure. The yield effect is just return ().
schedulePure == runIdentity . scheduleM (Identity . run) (return ())
Since: extensible-effects-concurrent-0.3.0.2
Utilities
Logging Effect
module Control.Eff.Log
Preventing Space Leaks
module Control.Eff.Loop