Safe Haskell | None |
---|---|
Language | Haskell2010 |
The message passing effect.
This module describes an abstract message passing effect, and a process effect, mimicking Erlang's process and message semantics.
Two scheduler implementations for the Process
effect are provided:
- A scheduler using
forkIO
, i.e. relying on the multi threaded GHC runtime: Control.Eff.Concurrent.Process.ForkIOScheduler - And a pure(rer) coroutine based scheduler in: Control.Eff.Concurrent.Process.SingleThreadedScheduler
Synopsis
- newtype ProcessId = ProcessId {}
- fromProcessId :: Iso' ProcessId Int
- data Process (r :: [Type -> Type]) b where
- YieldProcess :: Process r (ResumeProcess ())
- SelfPid :: Process r (ResumeProcess ProcessId)
- Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
- Shutdown :: Process r a
- ExitWithError :: String -> Process r b
- RaiseError :: String -> Process r b
- SendShutdown :: ProcessId -> Process r (ResumeProcess Bool)
- SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool)
- ReceiveMessage :: Process r (ResumeProcess Dynamic)
- type ConsProcess r = Process r ': r
- data ResumeProcess v where
- ShutdownRequested :: ResumeProcess v
- OnError :: String -> ResumeProcess v
- ResumeWith :: a -> ResumeProcess a
- RetryLastAction :: ResumeProcess v
- data SchedulerProxy :: [Type -> Type] -> Type where
- SchedulerProxy :: SchedulerProxy q
- SP :: SchedulerProxy q
- thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r)
- executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v)
- executeAndResume :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v
- yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ()
- sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r ()
- sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r ()
- sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool
- spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ProcessId
- spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ()
- receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic
- receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
- receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r ()
- self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId
- sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r ()
- sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool
- exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a
- exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
- raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b
- catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w
- ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a)
ProcessId Type
Each process is identified by a single process id, that stays constant throughout the life cycle of a process. Also, message sending relies on these values to address messages to processes.
Instances
Bounded ProcessId Source # | |
Enum ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process succ :: ProcessId -> ProcessId # pred :: ProcessId -> ProcessId # fromEnum :: ProcessId -> Int # enumFrom :: ProcessId -> [ProcessId] # enumFromThen :: ProcessId -> ProcessId -> [ProcessId] # enumFromTo :: ProcessId -> ProcessId -> [ProcessId] # enumFromThenTo :: ProcessId -> ProcessId -> ProcessId -> [ProcessId] # | |
Eq ProcessId Source # | |
Integral ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process | |
Num ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process | |
Ord ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process | |
Read ProcessId Source # | |
Real ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process toRational :: ProcessId -> Rational # | |
Show ProcessId Source # | |
Process Effects
data Process (r :: [Type -> Type]) b where Source #
The process effect is the basis for message passing concurrency. This effect describes an interface for concurrent, communicating isolated processes identified uniquely by a process-id.
Processes can raise exceptions that can be caught, exit gracefully or with an error, or be killed by other processes, with the option of ignoring the shutdown request.
Process Scheduling is implemented in different modules. All scheduler implementations should follow some basic rules:
- fair scheduling
- sending a message does not block
- receiving a message does block
- spawning a child blocks only a very moment
- a newly spawned process shall be scheduled before the parent process after
- the spawn
- when the first process exists, all process should be killed immediately
YieldProcess :: Process r (ResumeProcess ()) | In cooperative schedulers, this will give processing time to the scheduler. Every other operation implicitly serves the same purpose. |
SelfPid :: Process r (ResumeProcess ProcessId) | Return the current |
Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId) | Start a new process, the new process will execute an effect, the function
will return immediately with a |
Shutdown :: Process r a | Process exit, this is the same as if the function that was applied to a spawn function returned. |
ExitWithError :: String -> Process r b | Exit the process due to an error, this cannot be caught. |
RaiseError :: String -> Process r b | Raise an error, that can be handled. |
SendShutdown :: ProcessId -> Process r (ResumeProcess Bool) | Request that another a process exits. The targeted process is interrupted
and gets a |
SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool) | Send a message to a process addressed by the |
ReceiveMessage :: Process r (ResumeProcess Dynamic) | Receive a message. This should block until an a message was received. The
pure function may convert the incoming message into something, and the
result is returned as |
type ConsProcess r = Process r ': r Source #
Cons Process
onto a list of effects.
data ResumeProcess v where Source #
Every Process
action returns it's actual result wrapped in this type. It
will allow to signal errors as well as pass on normal results such as
incoming messages.
ShutdownRequested :: ResumeProcess v | The process is required to exit. |
OnError :: String -> ResumeProcess v | The process is required to exit from an error condition, that cannot be recovered from. |
ResumeWith :: a -> ResumeProcess a | The process may resume to do work, using the given result. |
RetryLastAction :: ResumeProcess v | This indicates that the action did not complete, and maybe retried |
Instances
data SchedulerProxy :: [Type -> Type] -> Type where Source #
Every function for Process
things needs such a proxy value
for the low-level effect list, i.e. the effects identified by
r
in
, this might be dependent on the
scheduler implementation.Process
r : r
SchedulerProxy :: SchedulerProxy q | Tell the typechecker what effects we have below |
SP :: SchedulerProxy q | Like |
thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r) Source #
Return a SchedulerProxy
for a Process
effect.
executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v) Source #
Execute a and action and resume the process, retry the action, shutdown the process or return an error.
executeAndResume :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v Source #
Execute a Process
action and resume the process, retry the action or exit
the process when a shutdown was requested.
yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r () Source #
Use executeAndResume
to execute YieldProcess
. Refer to YieldProcess
for more information.
sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () Source #
Send a message to a process addressed by the ProcessId
.
See SendMessage
.
sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () Source #
Send a message to a process addressed by the ProcessId
.
See SendMessage
.
sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool Source #
Send a message to a process addressed by the ProcessId
.
See SendMessage
. Return True
if the process existed.
I you don't care, just sendMessage
instead.
spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r ProcessId Source #
Start a new process, the new process will execute an effect, the function
will return immediately with a ProcessId
.
spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q ': q) () -> Eff r () Source #
Like spawn
but return ()
.
receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic Source #
Block until a message was received.
receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #
Receive and cast the message to some Typeable
instance.
receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r () Source #
Enter a loop to receive messages and pass them to a callback, until the
function returns False
.
self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId Source #
Returns the ProcessId
of the current process.
sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r () Source #
Exit a process addressed by the ProcessId
.
See SendShutdown
.
sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool Source #
Like sendShutdown
, but also return True
iff the process to exit exists.
exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a Source #
Exit the process with an error.
exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #
Exit the process.
raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b Source #
Thrown an error, can be caught by catchRaisedError
.
catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w Source #
Catch and handle an error raised by raiseError
. Works independent of the
handler implementation.
ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a) Source #
Like catchRaisedError
it catches raiseError
, but instead of invoking a
user provided handler, the result is wrapped into an Either
.