extensible-effects-concurrent-0.6.2: Message passing concurrency as extensible-effect

Safe Haskell: None
Language: Haskell2010

Control.Eff.Concurrent

Description

Erlang style processes with message passing concurrency based on (more) extensible-effects.

Concurrent Processes with Message Passing Concurrency

newtype ProcessId Source #

Each process is identified by a single process id that stays constant throughout the life cycle of the process. Message sending also relies on these values to address messages to processes.

Constructors

ProcessId 

Fields

Instances
Bounded ProcessId Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Enum ProcessId Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Eq ProcessId Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Integral ProcessId Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Num ProcessId Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Ord ProcessId Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Read ProcessId Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Real ProcessId Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Show ProcessId Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

data SchedulerProxy :: [Type -> Type] -> Type where Source #

Every function that works with Process needs such a proxy value for the low-level effect list, i.e. the effects identified by r in Process r ': r. This list may depend on the scheduler implementation.

Constructors

SchedulerProxy :: SchedulerProxy q

Tell the typechecker what effects we have below Process

SP :: SchedulerProxy q

Like SchedulerProxy but shorter

type ConsProcess r = Process r ': r Source #

Cons Process onto a list of effects.

data ResumeProcess v where Source #

Every Process action returns its actual result wrapped in this type. This allows signaling errors as well as passing on normal results such as incoming messages.

Constructors

ShutdownRequested :: ResumeProcess v

The process is required to exit.

OnError :: String -> ResumeProcess v

The process is required to exit because of an error condition that cannot be recovered from.

ResumeWith :: a -> ResumeProcess a

The process may resume to do work, using the given result.

RetryLastAction :: ResumeProcess v

This indicates that the action did not complete and may be retried.
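The four constructors can be explored without the library. The following is a minimal sketch, assuming a local stand-in type with the same constructor names and derived instances; `describe` is a hypothetical helper and not part of the package.

```haskell
{-# LANGUAGE DeriveFunctor, DeriveFoldable, DeriveTraversable #-}
import Data.Foldable (toList)

-- Local stand-in (assumption) mirroring the four documented constructors.
data ResumeProcess v
  = ShutdownRequested
  | OnError String
  | ResumeWith v
  | RetryLastAction
  deriving (Eq, Show, Functor, Foldable, Traversable)

-- Hypothetical helper: explain what a scheduler reply asks the process to do.
describe :: Show v => ResumeProcess v -> String
describe ShutdownRequested = "shutdown requested"
describe (OnError e)       = "unrecoverable error: " ++ e
describe (ResumeWith v)    = "resume with " ++ show v
describe RetryLastAction   = "retry last action"

main :: IO ()
main = do
  -- fmap only touches the payload of ResumeWith.
  putStrLn (describe (fmap (+ 1) (ResumeWith (41 :: Int))))
  -- Constructors without a payload fold to an empty list.
  print (toList (OnError "boom" :: ResumeProcess Int))
  print (length (ResumeWith 'x'))
```

Only ResumeWith carries a value, so the Functor, Foldable and Traversable instances treat the other three constructors as empty containers.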

Instances
Functor ResumeProcess Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Methods

fmap :: (a -> b) -> ResumeProcess a -> ResumeProcess b #

(<$) :: a -> ResumeProcess b -> ResumeProcess a #

Foldable ResumeProcess Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Methods

fold :: Monoid m => ResumeProcess m -> m #

foldMap :: Monoid m => (a -> m) -> ResumeProcess a -> m #

foldr :: (a -> b -> b) -> b -> ResumeProcess a -> b #

foldr' :: (a -> b -> b) -> b -> ResumeProcess a -> b #

foldl :: (b -> a -> b) -> b -> ResumeProcess a -> b #

foldl' :: (b -> a -> b) -> b -> ResumeProcess a -> b #

foldr1 :: (a -> a -> a) -> ResumeProcess a -> a #

foldl1 :: (a -> a -> a) -> ResumeProcess a -> a #

toList :: ResumeProcess a -> [a] #

null :: ResumeProcess a -> Bool #

length :: ResumeProcess a -> Int #

elem :: Eq a => a -> ResumeProcess a -> Bool #

maximum :: Ord a => ResumeProcess a -> a #

minimum :: Ord a => ResumeProcess a -> a #

sum :: Num a => ResumeProcess a -> a #

product :: Num a => ResumeProcess a -> a #

Traversable ResumeProcess Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Methods

traverse :: Applicative f => (a -> f b) -> ResumeProcess a -> f (ResumeProcess b) #

sequenceA :: Applicative f => ResumeProcess (f a) -> f (ResumeProcess a) #

mapM :: Monad m => (a -> m b) -> ResumeProcess a -> m (ResumeProcess b) #

sequence :: Monad m => ResumeProcess (m a) -> m (ResumeProcess a) #

NFData1 ResumeProcess Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Methods

liftRnf :: (a -> ()) -> ResumeProcess a -> () #

Eq v => Eq (ResumeProcess v) Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Ord v => Ord (ResumeProcess v) Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Show v => Show (ResumeProcess v) Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Generic (ResumeProcess v) Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Associated Types

type Rep (ResumeProcess v) :: * -> * #

NFData a => NFData (ResumeProcess a) Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Methods

rnf :: ResumeProcess a -> () #

Generic1 ResumeProcess Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

Associated Types

type Rep1 ResumeProcess :: k -> * #

type Rep (ResumeProcess v) Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

type Rep (ResumeProcess v) = D1 (MetaData "ResumeProcess" "Control.Eff.Concurrent.Process" "extensible-effects-concurrent-0.6.2-Dn0edfhM7NrCsGRg2jlKcC" False) ((C1 (MetaCons "ShutdownRequested" PrefixI False) (U1 :: * -> *) :+: C1 (MetaCons "OnError" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 String))) :+: (C1 (MetaCons "ResumeWith" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 v)) :+: C1 (MetaCons "RetryLastAction" PrefixI False) (U1 :: * -> *)))
type Rep1 ResumeProcess Source # 
Instance details

Defined in Control.Eff.Concurrent.Process

type Rep1 ResumeProcess = D1 (MetaData "ResumeProcess" "Control.Eff.Concurrent.Process" "extensible-effects-concurrent-0.6.2-Dn0edfhM7NrCsGRg2jlKcC" False) ((C1 (MetaCons "ShutdownRequested" PrefixI False) (U1 :: * -> *) :+: C1 (MetaCons "OnError" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 String))) :+: (C1 (MetaCons "ResumeWith" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) Par1) :+: C1 (MetaCons "RetryLastAction" PrefixI False) (U1 :: * -> *)))

data Process (r :: [Type -> Type]) b where Source #

The process effect is the basis for message passing concurrency. This effect describes an interface for concurrent, communicating isolated processes identified uniquely by a process-id.

Processes can raise exceptions that can be caught, exit gracefully or with an error, or be killed by other processes, with the option of ignoring the shutdown request.

Process Scheduling is implemented in different modules. All scheduler implementations should follow some basic rules:

  • fair scheduling
  • sending a message does not block
  • receiving a message does block
  • spawning a child blocks only for a very short moment
  • a newly spawned process shall be scheduled before the parent process after the spawn
  • when the first process exits, all processes should be killed immediately

Constructors

YieldProcess :: Process r (ResumeProcess ())

In cooperative schedulers, this will give processing time to the scheduler. Every other operation implicitly serves the same purpose.

SelfPid :: Process r (ResumeProcess ProcessId)

Return the current ProcessId

Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)

Start a new process. The new process will execute an effect; the function returns immediately with a ProcessId.

Shutdown :: Process r a

Exit the process. This is the same as if the function that was passed to a spawn function returned.

ExitWithError :: String -> Process r b

Exit the process due to an error. This cannot be caught.

RaiseError :: String -> Process r b

Raise an error that can be handled.

SendShutdown :: ProcessId -> Process r (ResumeProcess Bool)

Request that another process exits. The targeted process is interrupted and gets a ShutdownRequested; it may decide to ignore the shutdown request.

SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool)

Send a message to a process addressed by the ProcessId. Sending a message should **always succeed** and return **immediately**, even if the destination process does not exist, or does not accept messages of the given type.

ReceiveMessage :: Process r (ResumeProcess Dynamic)

Receive a message. This should block until a message was received. The pure function may convert the incoming message into something, and the result is returned as a ProcessMessage value. Another reason for this function to return is that a process control message was sent to the process. This can only occur from inside the runtime system, i.e. the effect handler implementation. (Currently there is one in ForkIOScheduler.)

executeAndResume :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v Source #

Execute a Process action and resume the process, retry the action or exit the process when a shutdown was requested.

executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v) Source #

Execute an action and resume the process, retry the action, shut down the process or return an error.

yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r () Source #

Use executeAndResume to execute YieldProcess. Refer to YieldProcess for more information.

sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () Source #

Send a message to a process addressed by the ProcessId. See SendMessage.

sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool Source #

Send a message to a process addressed by the ProcessId. See SendMessage. Return True if the process existed. If you don't care, just use sendMessage instead.

sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () Source #

Send a message to a process addressed by the ProcessId. See SendMessage.

sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r () Source #

Exit a process addressed by the ProcessId. See SendShutdown.

sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool Source #

Like sendShutdown, but also return True iff the process to exit exists.

spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ProcessId Source #

Start a new process. The new process will execute an effect; the function returns immediately with a ProcessId.

spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r () Source #

Like spawn but return ().

receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic Source #

Block until a message was received.

receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #

Receive and cast the message to some Typeable instance.
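Messages travel as Dynamic values, so a receiver has to cast them back to a concrete Typeable type. The sketch below uses only Data.Dynamic from base to show that cast in isolation; `asText` and the `inbox` list are hypothetical and stand in for whatever receiveMessageAs does internally, which this page does not specify.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)

-- Hypothetical helper: try to read a raw message as a String.
asText :: Dynamic -> Maybe String
asText = fromDynamic

main :: IO ()
main = do
  -- Two messages of different types share one mailbox type.
  let inbox = [toDyn "hello", toDyn (42 :: Int)]
  -- The cast succeeds only when the stored type matches the requested one.
  print (map asText inbox)
```

A failed cast yields Nothing here; how the library reacts to a message of an unexpected type is not covered by this sketch.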

receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r () Source #

Enter a loop to receive messages and pass them to a callback, until the function returns False.

self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId Source #

Returns the ProcessId of the current process.

exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #

Exit the process.

exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a Source #

Exit the process with an error.

raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b Source #

Throw an error that can be caught by catchRaisedError.

catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w Source #

Catch and handle an error raised by raiseError. Works independently of the handler implementation.

ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a) Source #

Like catchRaisedError, it catches errors raised by raiseError, but instead of invoking a user-provided handler, the result is wrapped into an Either.
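The difference between the two error-catching styles can be illustrated with a pure model. This is only a sketch under a strong assumption: `Either String` stands in for a process computation, and all names ending in `P` are hypothetical analogues, not the library functions.

```haskell
-- Assumption: Either String models a computation that may raise an error message.
type Proc a = Either String a

-- Analogue of raising a catchable error.
raiseErrorP :: String -> Proc a
raiseErrorP = Left

-- Analogue of the handler style: the handler first, then the guarded action.
catchRaisedErrorP :: (String -> Proc a) -> Proc a -> Proc a
catchRaisedErrorP handler action = either handler Right action

-- Analogue of the Either style: the error becomes an ordinary result.
ignoreProcessErrorP :: Proc a -> Proc (Either String a)
ignoreProcessErrorP action = Right action

-- Hypothetical action that raises on division by zero.
safeDiv :: Int -> Int -> Proc Int
safeDiv _ 0 = raiseErrorP "division by zero"
safeDiv x y = Right (x `div` y)

main :: IO ()
main = do
  print (catchRaisedErrorP (\_ -> Right 0) (safeDiv 1 0))
  print (ignoreProcessErrorP (safeDiv 1 0))
  print (ignoreProcessErrorP (safeDiv 6 3))
```

In the handler style the caller decides on a replacement value; in the Either style the caller receives the error message as data and can inspect it later.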

Data Types and Functions for APIs (aka Protocols)

newtype Server api Source #

This is a tag-type that wraps around a ProcessId and holds an Api index type.

Constructors

Server 
Instances
Eq (Server api) Source # 
Instance details

Defined in Control.Eff.Concurrent.Api

Methods

(==) :: Server api -> Server api -> Bool #

(/=) :: Server api -> Server api -> Bool #

Ord (Server api) Source # 
Instance details

Defined in Control.Eff.Concurrent.Api

Methods

compare :: Server api -> Server api -> Ordering #

(<) :: Server api -> Server api -> Bool #

(<=) :: Server api -> Server api -> Bool #

(>) :: Server api -> Server api -> Bool #

(>=) :: Server api -> Server api -> Bool #

max :: Server api -> Server api -> Server api #

min :: Server api -> Server api -> Server api #

Read (Server api) Source # 
Instance details

Defined in Control.Eff.Concurrent.Api

Typeable api => Show (Server api) Source # 
Instance details

Defined in Control.Eff.Concurrent.Api

Methods

showsPrec :: Int -> Server api -> ShowS #

show :: Server api -> String #

showList :: [Server api] -> ShowS #

data Synchronicity Source #

The (promoted) constructors of this type specify (at the type level) the reply behavior of a specific constructor of an Api instance.

Constructors

Synchronous Type

Specify that handling a request is a blocking operation with a specific return type, e.g. ('Synchronous (Either RentalError RentalId))

Asynchronous

Non-blocking, asynchronous, request handling

data family Api (api :: Type) (reply :: Synchronicity) Source #

This data family defines an API, a communication interface description between at least two processes. The processes act as servers or client(s) regarding a specific instance of this type.

The first parameter is usually a user defined phantom type that identifies the Api instance.

The second parameter specifies whether a specific constructor of a (GADT-like) Api instance is Synchronous, i.e. returns a result and blocks the caller, or whether it is Asynchronous.

Example:

data BookShop deriving Typeable

data instance Api BookShop r where
  RentBook  :: BookId   -> Api BookShop ('Synchronous (Either RentalError RentalId))
  BringBack :: RentalId -> Api BookShop 'Asynchronous

type BookId = Int
type RentalId = Int
type RentalError = String
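To see how the reply index restricts which requests a function accepts, the example can be compiled on its own. The sketch below redeclares Synchronicity and Api locally as stand-ins (an assumption, so that it builds without the package); `handleCall` and `handleCast` are hypothetical pure handlers, not the library's ApiHandler callbacks.

```haskell
{-# LANGUAGE DataKinds, GADTs, KindSignatures, TypeFamilies #-}
import Data.Kind (Type)

-- Local stand-ins (assumption) for the library's declarations.
data Synchronicity = Synchronous Type | Asynchronous

data family Api (api :: Type) (reply :: Synchronicity)

data BookShop

type BookId = Int
type RentalId = Int
type RentalError = String

data instance Api BookShop r where
  RentBook  :: BookId   -> Api BookShop ('Synchronous (Either RentalError RentalId))
  BringBack :: RentalId -> Api BookShop 'Asynchronous

-- Hypothetical handler: accepts only synchronous requests and must
-- produce exactly the reply type named in the constructor's index.
handleCall :: Api BookShop ('Synchronous reply) -> reply
handleCall (RentBook bookId)
  | bookId > 0 = Right (bookId * 100)
  | otherwise  = Left "no such book"

-- Hypothetical handler: accepts only asynchronous requests.
handleCast :: Api BookShop 'Asynchronous -> String
handleCast (BringBack rentalId) = "returned rental " ++ show rentalId

main :: IO ()
main = do
  print (handleCall (RentBook 7))
  print (handleCall (RentBook 0))
  putStrLn (handleCast (BringBack 700))
```

Passing `BringBack 700` to `handleCall` is rejected by the type checker, which is the same mechanism that lets cast and call enforce the Asynchronous and Synchronous indices.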

fromServer :: forall api1 api2. Iso (Server api1) (Server api2) ProcessId ProcessId Source #

proxyAsServer :: proxy api -> ProcessId -> Server api Source #

Tag a ProcessId with an Api type index to mark it as a Server process handling that API.

asServer :: forall api. ProcessId -> Server api Source #

Tag a ProcessId with an Api type index to mark it as a Server process handling that API.

Client Functions for Consuming APIs

type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r) Source #

Instead of passing a Server value around and handing it to functions like cast or call, a Server can be provided by a Reader effect if there is only a single server for a given Api instance. This type alias is a convenience to express that an effect has Process and a reader for a Server.

castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r Bool Source #

Send an Api request that has no return value and return as fast as possible. The type signature enforces that the corresponding Api clause is Asynchronous. Return True if the message was sent to the process. Note that this does not mean that the request was successfully handled. If that is important, use call instead.

cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r () Source #

Send an Api request that has no return value and return as fast as possible. The type signature enforces that the corresponding Api clause is Asynchronous.

call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api (Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api (Synchronous result) -> Eff r result Source #

Send an Api request and wait for the server to return a result value.

The type signature enforces that the corresponding Api clause is Synchronous.

registerServer :: Server o -> Eff (Reader (Server o) ': r) a -> Eff r a Source #

Run a reader effect that contains the one server handling a specific Api instance.

callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous reply) -> Eff r reply Source #

Like call but take the Server from the reader provided by registerServer.

callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o (Synchronous (f reply)) -> Eff r (f reply) Source #

Like callRegistered but also catch errors raised if e.g. the server crashed. By allowing Alternative instances to contain the reply, application-level errors can be combined with errors arising from inter-process communication.

castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o Asynchronous -> Eff r () Source #

Like cast but take the Server from the reader provided by registerServer.

Server Functions for Providing APIs

data UnhandledRequest Source #

An exception that is used by the mechanism that chains together multiple different ApiHandlers, allowing a single process to implement multiple Apis. This exception is thrown by requestFromDynamic. It can be caught with catchUnhandled, so that several distinct ApiHandlers can be tried until one fits or until exitUnhandled is invoked.

data ApiHandler p r where Source #

A record of callbacks, handling requests sent to a server Process, all belonging to a specific Api family instance.

Constructors

ApiHandler :: {..} -> ApiHandler p r 

Fields

serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r () Source #

Receive and process incoming requests until the process exits, using an ApiHandler.

unhandledCallError :: forall p x r q. (Show (Api p (Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p (Synchronous x) -> (x -> Eff r ()) -> Eff r () Source #

A default handler to use in _handleCall in ApiHandler. It will call raiseError with a nice error message.

unhandledCastError :: forall p r q. (Show (Api p Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p Asynchronous -> Eff r () Source #

A default handler to use in _handleCast in ApiHandler. It will call raiseError with a nice error message.

defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r () Source #

Exit the process normally if the error message is Nothing, or with exitWithError otherwise.

serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r () Source #

Serve two ApiHandlers at once. The first handler is used for termination handling.

serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r () Source #

Serve three ApiHandlers at once. The first handler is used for termination handling.

tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest ': r) () Source #

This function is the basic building block for combining ApiHandlers. It can be passed to receiveLoop, and it throws an UnhandledRequest exception if requestFromDynamic failed, so that multiple invocations of this function for different ApiHandlers can be tried by using catchUnhandled.

tryApiHandler px handlers1 message
  `catchUnhandled`
tryApiHandler px handlers2
  `catchUnhandled`
tryApiHandler px handlers3

catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a Source #

If requestFromDynamic fails to cast the message to a Request for a certain ApiHandler, it throws an UnhandledRequest exception. That exception is caught by this function and the raw message is given to the handler function. This is the basis for chaining ApiHandlers.
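The chaining idea can be modelled in plain Haskell. In this sketch, `Left` carrying the raw Dynamic plays the role of the UnhandledRequest exception; `requestFrom`, `catchUnhandledE`, the two handlers and `dispatch` are hypothetical names chosen for illustration and are not the library's implementation.

```haskell
import Data.Dynamic (Dynamic, Typeable, fromDynamic, toDyn)

-- Assumption: Left carries the unhandled raw message.
type Handler = Dynamic -> Either Dynamic String

-- Cast a raw message, or report it as unhandled.
requestFrom :: Typeable a => Dynamic -> Either Dynamic a
requestFrom d = maybe (Left d) Right (fromDynamic d)

-- A handler that only understands Int requests.
intHandler :: Handler
intHandler d = do
  n <- requestFrom d
  pure ("int request: " ++ show (n :: Int))

-- A handler that only understands String requests.
textHandler :: Handler
textHandler d = do
  s <- requestFrom d
  pure ("text request: " ++ s)

-- If the first attempt was unhandled, give the raw message to the next handler.
catchUnhandledE :: Either Dynamic a -> (Dynamic -> Either Dynamic a) -> Either Dynamic a
catchUnhandledE (Left raw) next = next raw
catchUnhandledE handled _       = handled

-- Try the handlers in order.
dispatch :: Dynamic -> Either Dynamic String
dispatch message = intHandler message `catchUnhandledE` textHandler

render :: Either Dynamic String -> String
render = either (const "unhandled") id

main :: IO ()
main = mapM_ (putStrLn . render . dispatch)
  [toDyn (3 :: Int), toDyn "hi", toDyn True]
```

The last message matches neither handler and stays unhandled, which corresponds to the point where exitUnhandled or ensureAllHandled would take over in the library.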

ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest ': r) () -> Eff r () Source #

Catch UnhandledRequests and terminate the process with an error, if necessary.

requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a Source #

Cast a Dynamic value, and if that fails, throw an UnhandledRequest error.

exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r () Source #

If an incoming message could not be cast to a Request corresponding to an ApiHandler (e.g. with requestFromDynamic), one should use this function to exit the process with a corresponding error.

Observer Functions for Events and Event Listener

data CallbackObserver o Source #

An Observer that schedules the observations to an effectful callback.

data Observers o Source #

Internal state for manageObservers.

data SomeObserver o where Source #

An existential wrapper around a Server of an Observer. Needed to support different types of observers to observe the same Observable in a general fashion.

Constructors

SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o 

class (Typeable o, Typeable (Observation o)) => Observable o where Source #

An Api index that supports registration and de-registration of Observers.

Associated Types

data Observation o Source #

Type of observations visible on this observable

Methods

registerObserverMessage :: SomeObserver o -> Api o Asynchronous Source #

Return the Api value for the cast_ that registers an observer.

forgetObserverMessage :: SomeObserver o -> Api o Asynchronous Source #

Return the Api value for the cast_ that de-registers an observer.

class (Typeable p, Observable o) => Observer p o where Source #

An Api index that supports observation of another Api that is Observable.

Minimal complete definition

observationMessage

Methods

observationMessage :: Server o -> Observation o -> Api p Asynchronous Source #

Wrap the Observation and the ProcessId (i.e. the Server) that caused the observation into an Api value that the Observable understands.

manageObservers :: Eff (State (Observers o) ': r) a -> Eff r a Source #

Keep track of registered Observers. Observers can be added and removed, and an Observation can be sent to all registered observers at once.

notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r () Source #

Send an Observation to all SomeObservers in the Observers state.
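The register, forget and notify cycle described above can be pictured with a small pure model. This is a sketch under stated assumptions: plain Int ids stand in for observer Server values, a list stands in for the Observers state, and `register`, `forget` and `notify` are hypothetical names rather than the library's functions.

```haskell
import Data.List (delete, nub)

-- Assumption: Int ids stand in for observer Server values.
newtype Observers = Observers [Int]
  deriving (Eq, Show)

-- Add an observer; registering twice has no additional effect.
register :: Int -> Observers -> Observers
register o (Observers os) = Observers (nub (o : os))

-- Remove an observer.
forget :: Int -> Observers -> Observers
forget o (Observers os) = Observers (delete o os)

-- Produce one delivery per registered observer.
notify :: String -> Observers -> [(Int, String)]
notify observation (Observers os) = [(o, observation) | o <- os]

main :: IO ()
main = do
  let observers = forget 2 (register 3 (register 2 (register 1 (Observers []))))
  print (notify "book rented" observers)
```

In the library the deliveries are asynchronous casts to observer processes; the list of pairs here only makes visible who would be notified.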

spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q ': q) Bool) -> Eff r (Server (CallbackObserver o)) Source #

Start a new process for an Observer that schedules all observations to an effectful callback.

spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) Source #

Use spawnCallbackObserver to create a universal logging observer, using the Show instance of the Observation.

Since: extensible-effects-concurrent-0.3.0.0

Scheduler Process Effect Handler

Concurrent Scheduler

type SchedulerIO = '[Reader SchedulerVar, Logs LogMessage, Lift IO] Source #

The concrete list of Effects for this scheduler implementation. See HasSchedulerIO

type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs LogMessage) r, Member (Reader SchedulerVar) r) Source #

An alias for the constraints for the effects essential to this scheduler implementation, i.e. these effects allow spawning new Processes. See SchedulerIO

data SchedulerError Source #

A sum-type with errors that can occur when scheduling messages.

Constructors

ProcessNotFound ProcessId

No process info was found for a ProcessId during internal processing. NOTE: This is **ONLY** caused by internal errors, probably by an incorrect MessagePassing handler in this module. **Sending a message to a process ALWAYS succeeds!** Even if the process does not exist.

ProcessRaisedError String

A process called raiseError.

ProcessExitError String

A process called exitWithError.

ProcessShuttingDown

A process exits.

SchedulerShuttingDown

An action was not performed while the scheduler was exiting.

schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel LogMessage -> IO () Source #

This is the main entry point to running a message passing concurrency application. This function takes a Process on top of the SchedulerIO effect and a LogChannel for concurrent logging.

defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO () Source #

Start the message passing concurrency system then execute a Process on top of SchedulerIO effect. All logging is sent to standard output.

defaultMainWithLogChannel :: LogChannel LogMessage -> Eff (ConsProcess SchedulerIO) () -> IO () Source #

Start the message passing concurrency system then execute a Process on top of SchedulerIO effect. All logging is sent to the given LogChannel.

Single Threaded Scheduler

schedulePure :: Eff (ConsProcess '[]) a -> Either String a Source #

Like schedule but pure. The yield effect is just return (). schedulePure == runIdentity . scheduleM (Identity . run) (return ())

Since: extensible-effects-concurrent-0.3.0.2

Utilities

Logging Effect

Preventing Space Leaks