Safe Haskell | None |
---|---|
Language | Haskell2010 |
Erlang style processes with message passing concurrency based on
(more) extensible-effects
.
Synopsis
- newtype ProcessId = ProcessId {}
- data SchedulerProxy :: [Type -> Type] -> Type where
- SchedulerProxy :: SchedulerProxy q
- SP :: SchedulerProxy q
- type ConsProcess r = Process r ': r
- data ResumeProcess v where
- ShutdownRequested :: ResumeProcess v
- OnError :: String -> ResumeProcess v
- ResumeWith :: a -> ResumeProcess a
- RetryLastAction :: ResumeProcess v
- data Process (r :: [Type -> Type]) b where
- YieldProcess :: Process r (ResumeProcess ())
- SelfPid :: Process r (ResumeProcess ProcessId)
- Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
- Shutdown :: Process r a
- ExitWithError :: String -> Process r b
- RaiseError :: String -> Process r b
- SendShutdown :: ProcessId -> Process r (ResumeProcess Bool)
- SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool)
- ReceiveMessage :: Process r (ResumeProcess Dynamic)
- executeAndResume :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v
- executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v)
- thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r)
- yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ()
- sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r ()
- sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool
- sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r ()
- sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r ()
- sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool
- spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ProcessId
- spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ()
- receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic
- receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
- receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r ()
- self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId
- exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
- exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a
- raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b
- catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w
- ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a)
- fromProcessId :: Iso' ProcessId Int
- newtype Server api = Server {}
- data Synchronicity
- data family Api (api :: Type) (reply :: Synchronicity)
- fromServer :: forall api api. Iso (Server api) (Server api) ProcessId ProcessId
- proxyAsServer :: proxy api -> ProcessId -> Server api
- asServer :: forall api. ProcessId -> Server api
- type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r)
- castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r Bool
- cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r ()
- call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api (Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api (Synchronous result) -> Eff r result
- registerServer :: Server o -> Eff (Reader (Server o) ': r) a -> Eff r a
- callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous reply) -> Eff r reply
- callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous (f reply)) -> Eff r (f reply)
- castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o Asynchronous -> Eff r ()
- data UnhandledRequest
- data ApiHandler p r where
- ApiHandler :: {..} -> ApiHandler p r
- serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r ()
- unhandledCallError :: forall p x r q. (Show (Api p (Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p (Synchronous x) -> (x -> Eff r ()) -> Eff r ()
- unhandledCastError :: forall p r q. (Show (Api p Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p Asynchronous -> Eff r ()
- defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r ()
- serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r ()
- serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r ()
- tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest ': r) ()
- catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a
- ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest ': r) () -> Eff r ()
- requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a
- exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r ()
- data CallbackObserver o
- data Observers o
- data SomeObserver o where
- SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o
- class (Typeable o, Typeable (Observation o)) => Observable o where
- data Observation o
- class (Typeable p, Observable o) => Observer p o where
- notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r ()
- registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
- forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
- notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r ()
- manageObservers :: Eff (State (Observers o) ': r) a -> Eff r a
- addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r ()
- removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r ()
- notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r ()
- spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q ': q) Bool) -> Eff r (Server (CallbackObserver o))
- spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o))
- type SchedulerIO = '[Reader SchedulerVar, Logs LogMessage, Lift IO]
- type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs LogMessage) r, Member (Reader SchedulerVar) r)
- data SchedulerError
- forkIoScheduler :: SchedulerProxy SchedulerIO
- schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel LogMessage -> IO ()
- defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO ()
- defaultMainWithLogChannel :: LogChannel LogMessage -> Eff (ConsProcess SchedulerIO) () -> IO ()
- schedulePure :: Eff (ConsProcess '[]) a -> Either String a
- module Control.Eff.Log
- module Control.Eff.Loop
Concurrent Processes with Message Passing Concurrency
Each process is identified by a single process id, that stays constant throughout the life cycle of a process. Also, message sending relies on these values to address messages to processes.
Instances
Bounded ProcessId Source # | |
Enum ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process succ :: ProcessId -> ProcessId # pred :: ProcessId -> ProcessId # fromEnum :: ProcessId -> Int # enumFrom :: ProcessId -> [ProcessId] # enumFromThen :: ProcessId -> ProcessId -> [ProcessId] # enumFromTo :: ProcessId -> ProcessId -> [ProcessId] # enumFromThenTo :: ProcessId -> ProcessId -> ProcessId -> [ProcessId] # | |
Eq ProcessId Source # | |
Integral ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process | |
Num ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process | |
Ord ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process | |
Read ProcessId Source # | |
Real ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process toRational :: ProcessId -> Rational # | |
Show ProcessId Source # | |
data SchedulerProxy :: [Type -> Type] -> Type where Source #
Every function for Process
things needs such a proxy value
for the low-level effect list, i.e. the effects identified by
r
in Process r : r
, this might be dependent on the
scheduler implementation.
SchedulerProxy :: SchedulerProxy q | Tell the typechecker what effects we have below |
SP :: SchedulerProxy q | Like |
type ConsProcess r = Process r ': r Source #
Cons Process
onto a list of effects.
data ResumeProcess v where Source #
Every Process
action returns its actual result wrapped in this type. It
will allow to signal errors as well as pass on normal results such as
incoming messages.
ShutdownRequested :: ResumeProcess v | The process is required to exit. |
OnError :: String -> ResumeProcess v | The process is required to exit from an error condition, that cannot be recovered from. |
ResumeWith :: a -> ResumeProcess a | The process may resume to do work, using the given result. |
RetryLastAction :: ResumeProcess v | This indicates that the action did not complete, and may be retried |
Instances
data Process (r :: [Type -> Type]) b where Source #
The process effect is the basis for message passing concurrency. This effect describes an interface for concurrent, communicating isolated processes identified uniquely by a process-id.
Processes can raise exceptions that can be caught, exit gracefully or with an error, or be killed by other processes, with the option of ignoring the shutdown request.
Process Scheduling is implemented in different modules. All scheduler implementations should follow some basic rules:
- fair scheduling
- sending a message does not block
- receiving a message does block
- spawning a child blocks only for a very short moment
- a newly spawned process shall be scheduled before the parent process after the spawn
- when the first process exits, all processes should be killed immediately
YieldProcess :: Process r (ResumeProcess ()) | In cooperative schedulers, this will give processing time to the scheduler. Every other operation implicitly serves the same purpose. |
SelfPid :: Process r (ResumeProcess ProcessId) | Return the current |
Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId) | Start a new process, the new process will execute an effect, the function
will return immediately with a |
Shutdown :: Process r a | Process exit, this is the same as if the function that was applied to a spawn function returned. |
ExitWithError :: String -> Process r b | Exit the process due to an error, this cannot be caught. |
RaiseError :: String -> Process r b | Raise an error, that can be handled. |
SendShutdown :: ProcessId -> Process r (ResumeProcess Bool) | Request that another process exits. The targeted process is interrupted
and gets a |
SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool) | Send a message to a process addressed by the |
ReceiveMessage :: Process r (ResumeProcess Dynamic) | Receive a message. This should block until a message was received. The
pure function may convert the incoming message into something, and the
result is returned as |
executeAndResume :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v Source #
Execute a Process
action and resume the process, retry the action or exit
the process when a shutdown was requested.
executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v) Source #
Execute an action and resume the process, retry the action, shut down the process or return an error.
thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r) Source #
Return a SchedulerProxy
for a Process
effect.
yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r () Source #
Use executeAndResume
to execute YieldProcess
. Refer to YieldProcess
for more information.
sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () Source #
Send a message to a process addressed by the ProcessId
.
See SendMessage
.
sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool Source #
Send a message to a process addressed by the ProcessId
.
See SendMessage
. Return True
if the process existed.
If you don't care, just use sendMessage
instead.
sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () Source #
Send a message to a process addressed by the ProcessId
.
See SendMessage
.
sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r () Source #
Exit a process addressed by the ProcessId
.
See SendShutdown
.
sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool Source #
Like sendShutdown
, but also return True
iff the process to exit exists.
spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ProcessId Source #
Start a new process, the new process will execute an effect, the function
will return immediately with a ProcessId
.
spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r () Source #
Like spawn
but return ()
.
receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic Source #
Block until a message was received.
receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #
Receive and cast the message to some Typeable
instance.
receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r () Source #
Enter a loop to receive messages and pass them to a callback, until the
function returns False
.
self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId Source #
Returns the ProcessId
of the current process.
exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #
Exit the process.
exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a Source #
Exit the process with an error.
raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b Source #
Throw an error that can be caught by catchRaisedError
.
catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w Source #
Catch and handle an error raised by raiseError
. Works independently of the
handler implementation.
ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a) Source #
Like catchRaisedError
it catches raiseError
, but instead of invoking a
user provided handler, the result is wrapped into an Either
.
Data Types and Functions for APIs (aka Protocols)
Instances
Eq (Server api) Source # | |
Ord (Server api) Source # | |
Defined in Control.Eff.Concurrent.Api | |
Read (Server api) Source # | |
Typeable api => Show (Server api) Source # | |
data Synchronicity Source #
The (promoted) constructors of this type specify (at the type level) the
reply behavior of a specific constructor of an Api
instance.
Synchronous Type | Specify that handling a request is a blocking operation
with a specific return type, e.g. |
Asynchronous | Non-blocking, asynchronous, request handling |
data family Api (api :: Type) (reply :: Synchronicity) Source #
This data family defines an API, a communication interface description between at least two processes. The processes act as servers or client(s) regarding a specific instance of this type.
The first parameter is usually a user defined phantom type that identifies
the Api
instance.
The second parameter specifies if a specific constructor of an (GADT-like)
Api
instance is Synchronous
, i.e. returns a result and blocks the caller
or if it is Asynchronous
Example:
data BookShop deriving Typeable data instance Api BookShop r where RentBook :: BookId -> Api BookShop ('Synchronous (Either RentalError RentalId)) BringBack :: RentalId -> Api BookShop 'Asynchronous type BookId = Int type RentalId = Int type RentalError = String
Instances
Show (Observation o) => Show (Api (CallbackObserver o) r) # | |
Defined in Control.Eff.Concurrent.Api.Observer | |
data Api (CallbackObserver o) r Source # | |
Defined in Control.Eff.Concurrent.Api.Observer |
proxyAsServer :: proxy api -> ProcessId -> Server api Source #
Client Functions for Consuming APIs
type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r) Source #
castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r Bool Source #
Send an Api
request that has no return value and return as fast as
possible. The type signature enforces that the corresponding Api
clause is
Asynchronous
. Return True
if the message was sent to the process. Note
that this is not at all the same as the request having been successfully
handled. If that is important, use call
instead.
cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r () Source #
Send an Api
request that has no return value and return as fast as
possible. The type signature enforces that the corresponding Api
clause is
Asynchronous
.
call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api (Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api (Synchronous result) -> Eff r result Source #
Send an Api
request and wait for the server to return a result value.
The type signature enforces that the corresponding Api
clause is
Synchronous
.
registerServer :: Server o -> Eff (Reader (Server o) ': r) a -> Eff r a Source #
Run a reader effect that contains the one server handling a specific
Api
instance.
callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous reply) -> Eff r reply Source #
Like call
but take the Server
from the reader provided by
registerServer
.
callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous (f reply)) -> Eff r (f reply) Source #
Like callRegistered
but also catch errors raised if e.g. the server
crashed. By allowing Alternative
instances to contain the reply,
application level errors can be combined with errors arising from
inter-process communication.
castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o Asynchronous -> Eff r () Source #
Like cast
but take the Server
from the reader provided by
registerServer
.
Server Functions for Providing APIs
data UnhandledRequest Source #
An exception that is used by the mechanism that chains together multiple
different ApiHandler
allowing a single process to implement multiple
Api
s. This exception is thrown by requestFromDynamic
. This
exception can be caught with catchUnhandled
, this way, several
distinct ApiHandler
can be tried until one fits or until the
exitUnhandled
is invoked.
data ApiHandler p r where Source #
A record of callbacks, handling requests sent to a server Process
, all
belonging to a specific Api
family instance.
ApiHandler :: {..} -> ApiHandler p r | |
|
serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r () Source #
Receive and process incoming requests until the process exits, using an ApiHandler
.
unhandledCallError :: forall p x r q. (Show (Api p (Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p (Synchronous x) -> (x -> Eff r ()) -> Eff r () Source #
A default handler to use in _handleCall
in ApiHandler
. It will call
raiseError
with a nice error message.
unhandledCastError :: forall p r q. (Show (Api p Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p Asynchronous -> Eff r () Source #
A default handler to use in _handleCast
in ApiHandler
. It will call
raiseError
with a nice error message.
defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r () Source #
Exit the process either normally if the error message is Nothing
or with exitWithError
otherwise.
serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r () Source #
serve
two ApiHandler
s at once. The first handler is used for
termination handling.
serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r () Source #
serve
three ApiHandler
s at once. The first handler is used for
termination handling.
tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest ': r) () Source #
The basic building block of the combination of ApiHandler
s is this
function, which can not only be passed to receiveLoop
, but also throws an
UnhandledRequest
exception if requestFromDynamic
failed, such that multiple
invocations of this function for different ApiHandler
s can be tried, by using catchUnhandled
.
tryApiHandler px handlers1 message `catchUnhandled` tryApiHandler px handlers2 `catchUnhandled` tryApiHandler px handlers3
catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a Source #
If requestFromDynamic
fails to cast the message to a Request
for a
certain ApiHandler
it throws an UnhandledRequest
exception. That
exception is caught by this function and the raw message is given to the
handler function. This is the basis for chaining ApiHandler
s.
ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest ': r) () -> Eff r () Source #
Catch UnhandledRequest
s and terminate the process with an error, if necessary.
requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a Source #
Cast a Dynamic
value, and if that fails, throw an UnhandledRequest
error.
exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r () Source #
If an incoming message could not be cast to a Request
corresponding to
an ApiHandler
(e.g. with requestFromDynamic
) one should use this function to
exit the process with a corresponding error.
Observer Functions for Events and Event Listener
data CallbackObserver o Source #
An Observer
that schedules the observations to an effectful callback.
Instances
Show (Observation o) => Show (Api (CallbackObserver o) r) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer | |
Observable o => Observer (CallbackObserver o) o Source # | |
Defined in Control.Eff.Concurrent.Api.Observer observationMessage :: Server o -> Observation o -> Api (CallbackObserver o) Asynchronous Source # | |
data Api (CallbackObserver o) r Source # | |
Defined in Control.Eff.Concurrent.Api.Observer |
data SomeObserver o where Source #
An existential wrapper around a Server
of an Observer
.
Needed to support different types of observers to observe the
same Observable
in a general fashion.
SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o |
Instances
Eq (SomeObserver o) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer (==) :: SomeObserver o -> SomeObserver o -> Bool # (/=) :: SomeObserver o -> SomeObserver o -> Bool # | |
Ord (SomeObserver o) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer compare :: SomeObserver o -> SomeObserver o -> Ordering # (<) :: SomeObserver o -> SomeObserver o -> Bool # (<=) :: SomeObserver o -> SomeObserver o -> Bool # (>) :: SomeObserver o -> SomeObserver o -> Bool # (>=) :: SomeObserver o -> SomeObserver o -> Bool # max :: SomeObserver o -> SomeObserver o -> SomeObserver o # min :: SomeObserver o -> SomeObserver o -> SomeObserver o # | |
Show (SomeObserver o) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer showsPrec :: Int -> SomeObserver o -> ShowS # show :: SomeObserver o -> String # showList :: [SomeObserver o] -> ShowS # |
class (Typeable o, Typeable (Observation o)) => Observable o where Source #
data Observation o Source #
Type of observations visible on this observable
registerObserverMessage :: SomeObserver o -> Api o Asynchronous Source #
Return the Api
value for the cast_
that registers an observer
forgetObserverMessage :: SomeObserver o -> Api o Asynchronous Source #
Return the Api
value for the cast_
that de-registers an observer
class (Typeable p, Observable o) => Observer p o where Source #
An Api
index that supports observation of
another Api
that is Observable
.
observationMessage :: Server o -> Observation o -> Api p Asynchronous Source #
Wrap the Observation
and the ProcessId
(i.e. the Server
)
that caused the observation into an Api
value that the
Observable
understands.
Instances
Observable o => Observer (CallbackObserver o) o Source # | |
Defined in Control.Eff.Concurrent.Api.Observer observationMessage :: Server o -> Observation o -> Api (CallbackObserver o) Asynchronous Source # |
notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () Source #
Send an Observation
to an Observer
registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r () Source #
Send the registerObserverMessage
forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r () Source #
Send the forgetObserverMessage
notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () Source #
Send an Observation
to SomeObserver
.
manageObservers :: Eff (State (Observers o) ': r) a -> Eff r a Source #
Keep track of registered Observer
s. Observers can be added and removed,
and an Observation
can be sent to all registered observers at once.
addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () Source #
Add an Observer
to the Observers
managed by manageObservers
.
removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () Source #
Delete an Observer
from the Observers
managed by manageObservers
.
notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r () Source #
Send an Observation
to all SomeObserver
s in the Observers
state.
spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q ': q) Bool) -> Eff r (Server (CallbackObserver o)) Source #
Start a new process for an Observer
that schedules
all observations to an effectful callback.
spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) Source #
Use spawnCallbackObserver
to create a universal logging observer,
using the Show
instance of the Observation
.
| Start a new process for an Observer
that schedules
all observations to an effectful callback.
Since: extensible-effects-concurrent-0.3.0.0
Scheduler Process Effect Handler
Concurrent Scheduler
type SchedulerIO = '[Reader SchedulerVar, Logs LogMessage, Lift IO] Source #
The concrete list of Eff
ects for this scheduler implementation.
See HasSchedulerIO
type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs LogMessage) r, Member (Reader SchedulerVar) r) Source #
data SchedulerError Source #
A sum-type with errors that can occur when scheduling messages.
ProcessNotFound ProcessId | No process info was found for a |
ProcessRaisedError String | A process called |
ProcessExitError String | A process called |
ProcessShuttingDown | A process exits. |
SchedulerShuttingDown | An action was not performed while the scheduler was exiting. |
Instances
Show SchedulerError Source # | |
Defined in Control.Eff.Concurrent.Process.ForkIOScheduler showsPrec :: Int -> SchedulerError -> ShowS # show :: SchedulerError -> String # showList :: [SchedulerError] -> ShowS # | |
Exception SchedulerError Source # | |
schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel LogMessage -> IO () Source #
This is the main entry point to running a message passing concurrency
application. This function takes a Process
on top of the SchedulerIO
effect and a LogChannel
for concurrent logging.
defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO () Source #
Start the message passing concurrency system then execute a Process
on
top of SchedulerIO
effect. All logging is sent to standard output.
defaultMainWithLogChannel :: LogChannel LogMessage -> Eff (ConsProcess SchedulerIO) () -> IO () Source #
Start the message passing concurrency system then execute a Process
on
top of SchedulerIO
effect. All logging is sent to standard output.
Single Threaded Scheduler
schedulePure :: Eff (ConsProcess '[]) a -> Either String a Source #
Like schedule
but pure. The yield
effect is just return ()
.
schedulePure == runIdentity .
scheduleM
(Identity . run) (return ())
Since: extensible-effects-concurrent-0.3.0.2
Utilities
Logging Effect
module Control.Eff.Log
Preventing Space Leaks
module Control.Eff.Loop