extensible-effects-concurrent-0.3.0.2: Message passing concurrency as extensible-effect

Safe Haskell: None
Language: Haskell2010

Control.Eff.Concurrent.Process

Description

The message passing effect.

This module describes an abstract message passing effect, and a process effect, mimicking Erlang's process and message semantics.

Two scheduler implementations for the Process effect are provided:

Documentation

newtype ProcessId Source #

Each process is identified by a single process id that stays constant throughout the life cycle of the process. Message sending also relies on these values to address messages to processes.

Constructors

ProcessId 

Fields

Instances

Bounded ProcessId Source # 
Enum ProcessId Source # 
Eq ProcessId Source # 
Integral ProcessId Source # 
Num ProcessId Source # 
Ord ProcessId Source # 
Read ProcessId Source # 
Real ProcessId Source # 
Show ProcessId Source # 

data Process (r :: [Type -> Type]) b where Source #

The process effect is the basis for message passing concurrency. This effect describes an interface for concurrent, communicating isolated processes identified uniquely by a process-id.

Processes can raise exceptions that can be caught, exit gracefully or with an error, or be killed by other processes, with the option of ignoring the shutdown request.

Process Scheduling is implemented in different modules. All scheduler implementations should follow some basic rules:

  • fair scheduling
  • sending a message does not block
  • receiving a message does block
  • spawning a child blocks only for a very short moment
  • a newly spawned process shall be scheduled before the parent process after the spawn
  • when the first process exits, all processes should be killed immediately
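
Some of these rules can be illustrated with plain GHC threads. The following is a minimal sketch that uses only base and not this library: the name doubleViaChild and the use of Chan as a mailbox are assumptions made for illustration. Writing to an unbounded Chan does not block, reading from an empty Chan does, and forkIO returns to the parent right away.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (replicateM, replicateM_)

-- | Hypothetical demo: spawn a child thread that doubles every number
-- it receives, send it the inputs, and collect the replies in order.
doubleViaChild :: [Int] -> IO [Int]
doubleViaChild inputs = do
  mailbox <- newChan -- the child's mailbox; unbounded, so sends never block
  replies <- newChan -- the parent's mailbox
  let count = length inputs
  -- "Spawning" returns immediately; the child runs concurrently.
  _ <- forkIO $ replicateM_ count $ do
    n <- readChan mailbox -- receiving blocks until a message arrives
    writeChan replies (2 * n)
  mapM_ (writeChan mailbox) inputs -- sending does not block
  replicateM count (readChan replies)
```

In GHC, all remaining threads are stopped when the main thread returns, which loosely resembles the last rule in the list.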

Constructors

YieldProcess :: Process r (ResumeProcess ())

In cooperative schedulers, this will give processing time to the scheduler. Every other operation implicitly serves the same purpose.
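
As a rough illustration of cooperative scheduling, the sketch below models each process as a list of steps, with an implicit yield after every step. It is a pure, stand-alone model; the names Proc and roundRobin are hypothetical and do not reflect how the schedulers in this package are implemented.

```haskell
-- | A hypothetical process: a list of step labels. After each step the
-- process yields, i.e. hands control back to the scheduler.
type Proc = [String]

-- | Run all processes round-robin and return the steps in execution order.
roundRobin :: [Proc] -> [String]
roundRobin [] = []
roundRobin ([] : rest) = roundRobin rest -- a finished process is dropped
roundRobin ((step : more) : rest) =
  -- run one step, then re-queue the process behind all the others
  step : roundRobin (rest ++ [more])
```

For example, roundRobin [["a1", "a2"], ["b1"], ["c1", "c2", "c3"]] interleaves the steps as a1, b1, c1, a2, c2, c3, so no process runs twice before every other runnable process has run once.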

SelfPid :: Process r (ResumeProcess ProcessId)

Return the current ProcessId.

Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)

Start a new process. The new process executes the given effect, and the call returns immediately with the ProcessId of the new process.

Shutdown :: Process r a

Exit the process. This is the same as if the effect that was passed to a spawn function had returned.

ExitWithError :: String -> Process r b

Exit the process due to an error. This cannot be caught.

RaiseError :: String -> Process r b

Raise an error that can be handled.

SendShutdown :: ProcessId -> Process r (ResumeProcess Bool)

Request that another process exits. The targeted process is interrupted and gets a ShutdownRequested; it may decide to ignore the shutdown request.

SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool)

Send a message to a process addressed by the ProcessId. Sending a message should **always succeed** and return **immediately**, even if the destination process does not exist, or does not accept messages of the given type.
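
Because messages travel as Dynamic values, the receiver has to cast them back to a concrete type. The sketch below uses only Data.Dynamic from base; the names mailbox and intsIn are hypothetical and only illustrate why a message of an unexpected type can be sent successfully yet not be usable by the receiver.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Maybe (mapMaybe)

-- | Hypothetical mailbox contents: messages of several different types,
-- all wrapped into Dynamic by the senders.
mailbox :: [Dynamic]
mailbox = [toDyn (42 :: Int), toDyn "hello", toDyn True, toDyn (7 :: Int)]

-- | Keep only the messages that really are Ints. fromDynamic returns
-- Nothing when the wrapped value has a different type.
intsIn :: [Dynamic] -> [Int]
intsIn = mapMaybe fromDynamic
```

Here intsIn mailbox keeps 42 and 7 and silently skips the String and Bool messages.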

ReceiveMessage :: Process r (ResumeProcess Dynamic)

Receive a message. This should block until a message was received. The pure function may convert the incoming message into something, and the result is returned as a ProcessMessage value. This function also returns if a process control message was sent to the process; that can only occur from inside the runtime system, i.e. the effect handler implementation. (Currently there is one in ForkIOScheduler.)

type ConsProcess r = Process r ': r Source #

Cons Process onto a list of effects.

data ResumeProcess v where Source #

Every Process action returns its actual result wrapped in this type. This makes it possible to signal errors as well as to pass on normal results such as incoming messages.

Constructors

ShutdownRequested :: ResumeProcess v

The process is required to exit.

OnError :: String -> ResumeProcess v

The process is required to exit because of an error condition that cannot be recovered from.

ResumeWith :: a -> ResumeProcess a

The process may resume to do work, using the given result.

RetryLastAction :: ResumeProcess v

This indicates that the action did not complete and may be retried.

Instances

Functor ResumeProcess Source # 

Methods

fmap :: (a -> b) -> ResumeProcess a -> ResumeProcess b #

(<$) :: a -> ResumeProcess b -> ResumeProcess a #

Foldable ResumeProcess Source # 

Methods

fold :: Monoid m => ResumeProcess m -> m #

foldMap :: Monoid m => (a -> m) -> ResumeProcess a -> m #

foldr :: (a -> b -> b) -> b -> ResumeProcess a -> b #

foldr' :: (a -> b -> b) -> b -> ResumeProcess a -> b #

foldl :: (b -> a -> b) -> b -> ResumeProcess a -> b #

foldl' :: (b -> a -> b) -> b -> ResumeProcess a -> b #

foldr1 :: (a -> a -> a) -> ResumeProcess a -> a #

foldl1 :: (a -> a -> a) -> ResumeProcess a -> a #

toList :: ResumeProcess a -> [a] #

null :: ResumeProcess a -> Bool #

length :: ResumeProcess a -> Int #

elem :: Eq a => a -> ResumeProcess a -> Bool #

maximum :: Ord a => ResumeProcess a -> a #

minimum :: Ord a => ResumeProcess a -> a #

sum :: Num a => ResumeProcess a -> a #

product :: Num a => ResumeProcess a -> a #

Traversable ResumeProcess Source # 

Methods

traverse :: Applicative f => (a -> f b) -> ResumeProcess a -> f (ResumeProcess b) #

sequenceA :: Applicative f => ResumeProcess (f a) -> f (ResumeProcess a) #

mapM :: Monad m => (a -> m b) -> ResumeProcess a -> m (ResumeProcess b) #

sequence :: Monad m => ResumeProcess (m a) -> m (ResumeProcess a) #

Eq v => Eq (ResumeProcess v) Source # 
Ord v => Ord (ResumeProcess v) Source # 
Show v => Show (ResumeProcess v) Source # 
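
To see what the Functor and Foldable instances mean in practice, the following stand-alone sketch declares a look-alike type with derived instances. The type Resume and the helper resultOr are hypothetical; the derived instances are assumed to behave like the ones listed above, which this page does not spell out.

```haskell
{-# LANGUAGE DeriveFoldable #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DeriveTraversable #-}

import Data.Foldable (toList)

-- | Hypothetical look-alike of ResumeProcess, written as a plain data type.
data Resume v
  = ShutdownRequested
  | OnError String
  | ResumeWith v
  | RetryLastAction
  deriving (Eq, Show, Functor, Foldable, Traversable)

-- | Extract the wrapped result, or fall back to a default when the
-- value carries no result (shutdown, error, retry).
resultOr :: v -> Resume v -> v
resultOr = foldr const

-- | Collect the result as a list with zero or one element.
results :: Resume v -> [v]
results = toList
```

With these definitions, fmap only touches ResumeWith, and folding over any other constructor sees an empty container.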

data SchedulerProxy :: [Type -> Type] -> Type where Source #

Every function for Process things needs such a proxy value for the low-level effect list, i.e. the effects identified by r in Process r : r. This list may depend on the scheduler implementation.

Constructors

SchedulerProxy :: SchedulerProxy q

Tell the typechecker what effects we have below Process

SP :: SchedulerProxy q

Like SchedulerProxy but shorter
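
The proxy carries no data; it only fixes a type-level list that could not be inferred otherwise. The sketch below re-declares a proxy of the same shape and uses it to pick a type class instance. The class CountEffects is a hypothetical example and is not part of this package.

```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}

import Data.Kind (Type)

-- | Stand-alone copy of the proxy shape documented above.
data SchedulerProxy (q :: [Type -> Type]) where
  SchedulerProxy :: SchedulerProxy q
  SP :: SchedulerProxy q

-- | Hypothetical class: count the effects in a type-level list.
-- The proxy argument is the only way to tell the compiler which list is meant.
class CountEffects (q :: [Type -> Type]) where
  countEffects :: SchedulerProxy q -> Int

instance CountEffects '[] where
  countEffects _ = 0

instance CountEffects q => CountEffects (e ': q) where
  countEffects _ = 1 + countEffects (SP :: SchedulerProxy q)

-- | Example: a list of two effects, selected purely by the proxy's type.
twoEffects :: Int
twoEffects = countEffects (SP :: SchedulerProxy '[Maybe, IO])
```

The functions in this module take a proxy for the same reason: the list q appears only in the constraint and must be pinned down by an argument.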

executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v) Source #

Execute an action, then resume the process, retry the action, shut down the process, or return an error.
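
The four possible reactions can be sketched as a small interpreter loop. This is a stand-alone model under stated assumptions: ResumeModel, Outcome, runAction and flaky are hypothetical names, and the real function shuts the process down instead of returning a Stop value.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- | Hypothetical look-alike of ResumeProcess.
data ResumeModel v
  = ShutdownRequested
  | OnError String
  | ResumeWith v
  | RetryLastAction

-- | What the caller observes after the action has been interpreted.
data Outcome v = Stop | Failed String | Result v
  deriving (Eq, Show)

-- | Run an action and react to its wrapped result.
runAction :: Monad m => m (ResumeModel v) -> m (Outcome v)
runAction action = do
  r <- action
  case r of
    ResumeWith v -> pure (Result v) -- resume with the result
    OnError e -> pure (Failed e) -- report the error
    ShutdownRequested -> pure Stop -- the process has to exit
    RetryLastAction -> runAction action -- run the same action again

-- | Demo action: asks for a retry until the counter reaches zero.
flaky :: IORef Int -> IO (ResumeModel String)
flaky ref = do
  n <- readIORef ref
  if n > 0
    then writeIORef ref (n - 1) >> pure RetryLastAction
    else pure (ResumeWith "done")

-- | Retries twice, then yields Result "done".
demo :: IO (Outcome String)
demo = newIORef 2 >>= runAction . flaky
```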

yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r () Source #

Use executeAndResume to execute YieldProcess. Refer to YieldProcess for more information.

sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () Source #

Send a message to a process addressed by the ProcessId. See SendMessage.

sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () Source #

Send a message of any Typeable type to a process addressed by the ProcessId. See SendMessage.

sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool Source #

Send a message to a process addressed by the ProcessId. See SendMessage. Return True if the process existed. If you don't care, just use sendMessage instead.

spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ProcessId Source #

Start a new process. The new process executes the given effect, and the call returns immediately with the ProcessId of the new process.

spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r () Source #

Like spawn but return ().

receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic Source #

Block until a message was received.

receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #

Receive and cast the message to some Typeable instance.

receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r () Source #

Enter a loop to receive messages and pass them to a callback, until the function returns False.

self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId Source #

Returns the ProcessId of the current process.

sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r () Source #

Exit a process addressed by the ProcessId. See SendShutdown.

sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool Source #

Like sendShutdown, but also return True iff the process to exit exists.

exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a Source #

Exit the process with an error.

exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #

Exit the process.

raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b Source #

Throw an error that can be caught by catchRaisedError.

catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w Source #

Catch and handle an error raised by raiseError. Works independently of the handler implementation.

ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a) Source #

Like catchRaisedError, this catches errors raised by raiseError, but instead of invoking a user-provided handler, it wraps the result in an Either.
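
The relationship between raising, catching and wrapping errors can be modelled with the plain Either String monad. This is only an analogy, not the library's implementation; every name ending in Model, as well as safeDiv, is hypothetical.

```haskell
-- | Model of raiseError: abort the computation with a message.
raiseErrorModel :: String -> Either String a
raiseErrorModel = Left

-- | Model of catchRaisedError: run the handler if an error was raised.
catchRaisedErrorModel
  :: (String -> Either String w) -> Either String w -> Either String w
catchRaisedErrorModel handler = either handler Right

-- | Model of ignoreProcessError: never fail, return the error as a value.
ignoreProcessErrorModel :: Either String a -> Either String (Either String a)
ignoreProcessErrorModel = Right

-- | Hypothetical action that raises an error on division by zero.
safeDiv :: Int -> Int -> Either String Int
safeDiv _ 0 = raiseErrorModel "division by zero"
safeDiv x y = pure (x `div` y)

-- | Recover with a default value of 0.
recovered :: Either String Int
recovered = catchRaisedErrorModel (const (pure 0)) (safeDiv 1 0)

-- | Keep going and inspect the error later.
wrapped :: Either String (Either String Int)
wrapped = ignoreProcessErrorModel (safeDiv 1 0)
```

In this model, recovered is Right 0 and wrapped is Right (Left "division by zero"), which mirrors the difference between supplying a handler and receiving the error as an Either.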