| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Control.Eff.Concurrent.Process
Contents
- Process Effect
- Process Effect Aliases
- Process Effect Constraints
- Process Info
- Process Timer
- Message Data
- ProcessId Type
- Process State
- Yielding
- Waiting/Sleeping
- Sending Messages
- Utilities
- Receiving Messages
- Selecting Messages to Receive
- Process State Reflection
- Spawning
- Process Operation Execution
- Exits and Interrupts
- Control Flow
- Typed ProcessIds: Receiver
Description
The message passing effect.
This module describes an abstract message passing effect, and a process effect, mimicking Erlang's process and message semantics.
Two scheduler implementations for the Process effect are provided:
- A scheduler using forkIO, i.e. relying on the multi-threaded GHC runtime: Control.Eff.Concurrent.Process.ForkIOScheduler
- And a pure(rer) coroutine based scheduler in: Control.Eff.Concurrent.Process.SingleThreadedScheduler
Synopsis
- data Process (r :: [Type -> Type]) b where
- FlushMessages :: Process r (ResumeProcess [StrictDynamic])
- YieldProcess :: Process r (ResumeProcess ())
- Delay :: Timeout -> Process r (ResumeProcess ())
- SelfPid :: Process r (ResumeProcess ProcessId)
- Spawn :: ProcessTitle -> Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
- SpawnLink :: ProcessTitle -> Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
- Shutdown :: Interrupt NoRecovery -> Process r a
- SendShutdown :: ProcessId -> Interrupt NoRecovery -> Process r (ResumeProcess ())
- SendInterrupt :: ProcessId -> Interrupt Recoverable -> Process r (ResumeProcess ())
- SendMessage :: ProcessId -> StrictDynamic -> Process r (ResumeProcess ())
- ReceiveSelectedMessage :: forall r a. MessageSelector a -> Process r (ResumeProcess a)
- MakeReference :: Process r (ResumeProcess Int)
- Monitor :: ProcessId -> Process r (ResumeProcess MonitorReference)
- Demonitor :: MonitorReference -> Process r (ResumeProcess ())
- Link :: ProcessId -> Process r (ResumeProcess ())
- Unlink :: ProcessId -> Process r (ResumeProcess ())
- UpdateProcessDetails :: ProcessDetails -> Process r (ResumeProcess ())
- GetProcessState :: ProcessId -> Process r (ResumeProcess (Maybe (ProcessTitle, ProcessDetails, ProcessState)))
- type SafeProcesses r = Process r ': r
- type Processes e = Interrupts ': SafeProcesses e
- type HasProcesses e inner = (HasSafeProcesses e inner, Member Interrupts e)
- type HasSafeProcesses e inner = SetMember Process (Process inner) e
- newtype ProcessTitle = MkProcessTitle {}
- fromProcessTitle :: Lens' ProcessTitle Text
- newtype ProcessDetails = MkProcessDetails {}
- fromProcessDetails :: Lens' ProcessDetails Text
- newtype Timeout = TimeoutMicros {}
- data StrictDynamic
- toStrictDynamic :: (Typeable a, NFData a) => a -> StrictDynamic
- fromStrictDynamic :: Typeable a => StrictDynamic -> Maybe a
- unwrapStrictDynamic :: StrictDynamic -> Dynamic
- newtype Serializer message = MkSerializer {
- runSerializer :: message -> StrictDynamic
- newtype ProcessId = ProcessId {}
- fromProcessId :: Iso' ProcessId Int
- data ProcessState
- = ProcessBooting
- | ProcessIdle
- | ProcessBusy
- | ProcessBusySleeping
- | ProcessBusyUpdatingDetails
- | ProcessBusySending
- | ProcessBusySendingShutdown
- | ProcessBusySendingInterrupt
- | ProcessBusyReceiving
- | ProcessBusyLinking
- | ProcessBusyUnlinking
- | ProcessBusyMonitoring
- | ProcessBusyDemonitoring
- | ProcessInterrupted
- | ProcessShuttingDown
- yieldProcess :: forall r q. (HasProcesses r q, HasCallStack) => Eff r ()
- delay :: forall r q. (HasProcesses r q, HasCallStack) => Timeout -> Eff r ()
- sendMessage :: forall o r q. (HasProcesses r q, HasCallStack, Typeable o, NFData o) => ProcessId -> o -> Eff r ()
- sendAnyMessage :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> StrictDynamic -> Eff r ()
- makeReference :: (HasCallStack, HasProcesses r q) => Eff r Int
- receiveMessage :: forall a r q. (HasCallStack, Typeable a, NFData a, Show a, HasProcesses r q) => Eff r a
- receiveSelectedMessage :: forall r q a. (HasCallStack, Show a, HasProcesses r q) => MessageSelector a -> Eff r a
- flushMessages :: forall r q. (HasCallStack, HasProcesses r q) => Eff r [StrictDynamic]
- receiveAnyMessage :: forall r q. (HasCallStack, HasProcesses r q) => Eff r StrictDynamic
- receiveLoop :: forall r q a endOfLoopResult. (HasSafeProcesses r q, HasCallStack, NFData a, Typeable a) => (Either (Interrupt Recoverable) a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult
- receiveSelectedLoop :: forall r q a endOfLoopResult. (HasSafeProcesses r q, HasCallStack) => MessageSelector a -> (Either (Interrupt Recoverable) a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult
- receiveAnyLoop :: forall r q endOfLoopResult. (HasSafeProcesses r q, HasCallStack) => (Either (Interrupt Recoverable) StrictDynamic -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult
- data MessageSelector a
- selectMessage :: Typeable t => MessageSelector t
- filterMessage :: Typeable a => (a -> Bool) -> MessageSelector a
- selectMessageWith :: Typeable a => (a -> Maybe b) -> MessageSelector b
- selectDynamicMessage :: (StrictDynamic -> Maybe a) -> MessageSelector a
- selectAnyMessage :: MessageSelector StrictDynamic
- self :: (HasCallStack, HasSafeProcesses r q) => Eff r ProcessId
- isProcessAlive :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r Bool
- getProcessState :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r (Maybe (ProcessTitle, ProcessDetails, ProcessState))
- updateProcessDetails :: forall r q. (HasCallStack, HasProcesses r q) => ProcessDetails -> Eff r ()
- spawn :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (Processes q) () -> Eff r ProcessId
- spawn_ :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (Processes q) () -> Eff r ()
- spawnLink :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (Processes q) () -> Eff r ProcessId
- spawnRaw :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (SafeProcesses q) () -> Eff r ProcessId
- spawnRaw_ :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (SafeProcesses q) () -> Eff r ()
- data ResumeProcess v where
- Interrupted :: Interrupt Recoverable -> ResumeProcess v
- ResumeWith :: a -> ResumeProcess a
- executeAndResume :: forall q r v. (HasSafeProcesses r q, HasCallStack) => Process q (ResumeProcess v) -> Eff r (Either (Interrupt Recoverable) v)
- executeAndResumeOrExit :: forall r q v. (HasSafeProcesses r q, HasCallStack) => Process q (ResumeProcess v) -> Eff r v
- executeAndResumeOrThrow :: forall q r v. (HasProcesses r q, HasCallStack) => Process q (ResumeProcess v) -> Eff r v
- interrupt :: (HasCallStack, Member Interrupts r) => Interrupt Recoverable -> Eff r a
- sendInterrupt :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Interrupt Recoverable -> Eff r ()
- exitBecause :: forall r q a. (HasCallStack, HasSafeProcesses r q) => Interrupt NoRecovery -> Eff r a
- exitNormally :: forall r q a. (HasCallStack, HasSafeProcesses r q) => Eff r a
- exitWithError :: forall r q a. (HasCallStack, HasSafeProcesses r q) => String -> Eff r a
- sendShutdown :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Interrupt NoRecovery -> Eff r ()
- linkProcess :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r ()
- unlinkProcess :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r ()
- monitor :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r MonitorReference
- demonitor :: forall r q. (HasCallStack, HasProcesses r q) => MonitorReference -> Eff r ()
- data ProcessDown = ProcessDown {}
- selectProcessDown :: MonitorReference -> MessageSelector ProcessDown
- selectProcessDownByProcessId :: ProcessId -> MessageSelector ProcessDown
- becauseProcessIsDown :: ProcessDown -> Interrupt Recoverable
- data MonitorReference = MonitorReference {}
- withMonitor :: (HasCallStack, HasProcesses r q) => ProcessId -> (MonitorReference -> Eff r a) -> Eff r a
- receiveWithMonitor :: (HasCallStack, HasProcesses r q, Typeable a, Show a) => ProcessId -> MessageSelector a -> Eff r (Either ProcessDown a)
- data Interrupt (t :: ExitRecovery) where
- NormalExitRequested :: Interrupt Recoverable
- NormalExitRequestedWith :: forall a. (Typeable a, Show a, NFData a) => a -> Interrupt Recoverable
- OtherProcessNotRunning :: ProcessId -> Interrupt Recoverable
- TimeoutInterrupt :: String -> Interrupt Recoverable
- LinkedProcessCrashed :: ProcessId -> Interrupt Recoverable
- ErrorInterrupt :: String -> Interrupt Recoverable
- InterruptedBy :: forall a. (Typeable a, Show a, NFData a) => a -> Interrupt Recoverable
- ExitNormally :: Interrupt NoRecovery
- ExitNormallyWith :: forall a. (Typeable a, Show a, NFData a) => a -> Interrupt NoRecovery
- ExitUnhandledError :: Text -> Interrupt NoRecovery
- ExitProcessCancelled :: Maybe ProcessId -> Interrupt NoRecovery
- ExitOtherProcessNotRunning :: ProcessId -> Interrupt NoRecovery
- type Interrupts = Exc (Interrupt Recoverable)
- interruptToExit :: Interrupt Recoverable -> Interrupt NoRecovery
- data ExitRecovery
- type RecoverableInterrupt = Interrupt Recoverable
- data ExitSeverity
- = NormalExit
- | Crash
- data SomeExitReason where
- SomeExitReason :: Interrupt x -> SomeExitReason
- toExitRecovery :: Interrupt r -> ExitRecovery
- isRecoverable :: Interrupt x -> Bool
- toExitSeverity :: Interrupt e -> ExitSeverity
- isProcessDownInterrupt :: Maybe ProcessId -> Interrupt r -> Bool
- isCrash :: Interrupt x -> Bool
- toCrashReason :: Interrupt x -> Maybe Text
- fromSomeExitReason :: SomeExitReason -> Either (Interrupt NoRecovery) (Interrupt Recoverable)
- logProcessExit :: forall e x. (Member Logs e, HasCallStack) => Interrupt x -> Eff e ()
- provideInterruptsShutdown :: forall e a. Eff (Processes e) a -> Eff (SafeProcesses e) a
- handleInterrupts :: (HasCallStack, Member Interrupts r) => (Interrupt Recoverable -> Eff r a) -> Eff r a -> Eff r a
- tryUninterrupted :: (HasCallStack, Member Interrupts r) => Eff r a -> Eff r (Either (Interrupt Recoverable) a)
- exitOnInterrupt :: (HasCallStack, HasProcesses r q) => Eff r a -> Eff r a
- logInterrupts :: forall r. (Member Logs r, HasCallStack, Member Interrupts r) => Eff r () -> Eff r ()
- provideInterrupts :: HasCallStack => Eff (Interrupts ': r) a -> Eff r (Either (Interrupt Recoverable) a)
- mergeEitherInterruptAndExitReason :: Either (Interrupt Recoverable) (Interrupt NoRecovery) -> Interrupt NoRecovery
- sendToReceiver :: (NFData o, HasProcesses r q) => Receiver o -> o -> Eff r ()
- data Receiver a = (NFData out, Typeable out, Show out) => Receiver {
- _receiverPid :: ProcessId
- _receiverSerializer :: a -> out
- receiverPid :: forall a. Lens' (Receiver a) ProcessId
Process Effect
data Process (r :: [Type -> Type]) b where Source #
The process effect is the basis for message passing concurrency. This effect describes an interface for concurrent, communicating isolated processes identified uniquely by a process-id.
Processes can raise exceptions that can be caught, exit gracefully or with an error, or be killed by other processes, with the option of ignoring the shutdown request.
Process Scheduling is implemented in different modules. All scheduler implementations should follow some basic rules:
- fair scheduling
- sending a message does not block
- receiving a message does block
- spawning a child blocks only for a very short moment
- a newly spawned process shall be scheduled before the parent process after the spawnRaw
- when the first process exits, all processes should be killed immediately
Constructors
| FlushMessages :: Process r (ResumeProcess [StrictDynamic]) | Remove all messages from the process' message queue |
| YieldProcess :: Process r (ResumeProcess ()) | In cooperative schedulers, this will give processing time to the scheduler. Every other operation implicitly serves the same purpose. Since: 0.12.0 |
| Delay :: Timeout -> Process r (ResumeProcess ()) | Simply wait until the time in the given Timeout has passed. Since: 0.30.0 |
| SelfPid :: Process r (ResumeProcess ProcessId) | Return the current ProcessId. |
| Spawn :: ProcessTitle -> Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId) | Start a new process, the new process will execute an effect, the function
will return immediately with a ProcessId. |
| SpawnLink :: ProcessTitle -> Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId) | Start a new process, and Since: 0.12.0 |
| Shutdown :: Interrupt NoRecovery -> Process r a | Shutdown the process; regardless of the exit reason, this function never returns. |
| SendShutdown :: ProcessId -> Interrupt NoRecovery -> Process r (ResumeProcess ()) | Shutdown another process immediately, the other process has no way of handling this! |
| SendInterrupt :: ProcessId -> Interrupt Recoverable -> Process r (ResumeProcess ()) | Request that another process be interrupted. The targeted process is interrupted
and gets an |
| SendMessage :: ProcessId -> StrictDynamic -> Process r (ResumeProcess ()) | Send a message to a process addressed by the ProcessId. |
| ReceiveSelectedMessage :: forall r a. MessageSelector a -> Process r (ResumeProcess a) | Receive a message that matches a criterion.
This should block until a message was received. The message is returned
as a |
| MakeReference :: Process r (ResumeProcess Int) | Generate a unique Int for the current process. |
| Monitor :: ProcessId -> Process r (ResumeProcess MonitorReference) | Monitor another process. When the monitored process exits a
ProcessDown is sent to the calling process. Since: 0.12.0 |
| Demonitor :: MonitorReference -> Process r (ResumeProcess ()) | Remove a monitor. Since: 0.12.0 |
| Link :: ProcessId -> Process r (ResumeProcess ()) | Connect the calling process to another process, such that
if one of the processes crashes (i.e. isCrash returns True), the other is shutdown with the Interrupt LinkedProcessCrashed. You might wonder: Why not tear down the linked process when exiting
normally?
I thought about this. If a process exits normally, it should have the
opportunity to shutdown stuff explicitly.
And if you want to make sure that there are no dangling child processes
after e.g. a broker crash, you can always use Since: 0.12.0 |
| Unlink :: ProcessId -> Process r (ResumeProcess ()) | Unlink the calling process from the other process. See Link for a discussion on linking. Since: 0.12.0 |
| UpdateProcessDetails :: ProcessDetails -> Process r (ResumeProcess ()) | Update the ProcessDetails of the process. |
| GetProcessState :: ProcessId -> Process r (ResumeProcess (Maybe (ProcessTitle, ProcessDetails, ProcessState))) | Get the ProcessTitle, ProcessDetails and ProcessState for the given process, if the process is alive. |
Instances
Process Effect Aliases
type SafeProcesses r = Process r ': r Source #
Cons Process onto a list of effects. This is called SafeProcesses because
the actions in it cannot be interrupted.
type Processes e = Interrupts ': SafeProcesses e Source #
This adds a layer of the Interrupts effect on top of SafeProcesses.
Process Effect Constraints
type HasProcesses e inner = (HasSafeProcesses e inner, Member Interrupts e) Source #
A constraint for an effect set that requires the presence of Processes.
This constrains the effect list to look like this:
[e1 ... eN, Interrupts, Process [e(N+1) .. e(N+k)], e(N+1) .. e(N+k)]
It constrains e beyond HasSafeProcesses to encompass Interrupts.
Since: 0.27.1
type HasSafeProcesses e inner = SetMember Process (Process inner) e Source #
A constraint for an effect set that requires the presence of SafeProcesses.
This constrains the effect list to look like this:
[e1 ... eN, Process [e(N+1) .. e(N+k)], e(N+1) .. e(N+k)]
It constrains e to support the (only) Process effect.
This is more relaxed than HasProcesses since it does not require Interrupts.
Since: 0.27.1
Process Info
newtype ProcessTitle Source #
A short title for a Process for logging purposes.
Since: 0.24.1
Constructors
| MkProcessTitle | |
Fields | |
Instances
fromProcessTitle :: Lens' ProcessTitle Text Source #
An isomorphism lens for the ProcessTitle
Since: 0.24.1
newtype ProcessDetails Source #
A multi-line text describing the current state of a process for debugging purposes.
Since: 0.24.1
Constructors
| MkProcessDetails | |
Fields | |
Instances
fromProcessDetails :: Lens' ProcessDetails Text Source #
An isomorphism lens for the ProcessDetails
Since: 0.24.1
Process Timer
A number of microseconds.
Since: 0.12.0
Constructors
| TimeoutMicros | |
Fields | |
Instances
| Enum Timeout Source # | |
| Eq Timeout Source # | |
| Integral Timeout Source # | |
Defined in Control.Eff.Concurrent.Process | |
| Num Timeout Source # | |
| Ord Timeout Source # | |
Defined in Control.Eff.Concurrent.Process | |
| Real Timeout Source # | |
Defined in Control.Eff.Concurrent.Process Methods toRational :: Timeout -> Rational # | |
| Show Timeout Source # | |
| NFData Timeout Source # | |
Defined in Control.Eff.Concurrent.Process | |
Message Data
data StrictDynamic Source #
Data flows between Processes via these messages.
This is just a newtype wrapper around Dynamic.
The reason this type exists is to force construction through the code in this
module, which always evaluates a message to normal form before
sending it to another process.
Since: 0.22.0
Instances
| Show StrictDynamic Source # | |
Defined in Control.Eff.Concurrent.Process Methods showsPrec :: Int -> StrictDynamic -> ShowS # show :: StrictDynamic -> String # showList :: [StrictDynamic] -> ShowS # | |
| NFData StrictDynamic Source # | |
Defined in Control.Eff.Concurrent.Process Methods rnf :: StrictDynamic -> () # | |
toStrictDynamic :: (Typeable a, NFData a) => a -> StrictDynamic Source #
Deeply evaluate the given value and wrap it into a StrictDynamic.
Since: 0.22.0
fromStrictDynamic :: Typeable a => StrictDynamic -> Maybe a Source #
Convert a StrictDynamic back to a value.
Since: 0.22.0
unwrapStrictDynamic :: StrictDynamic -> Dynamic Source #
Convert a StrictDynamic back to an unwrapped Dynamic.
Since: 0.22.0
newtype Serializer message Source #
Serialize a message into a StrictDynamic value to be sent via sendAnyMessage.
This indirection allows, among other things, the composition of
Servers.
Since: 0.24.1
Constructors
| MkSerializer | |
Fields
| |
Instances
| Contravariant Serializer Source # | |
Defined in Control.Eff.Concurrent.Process Methods contramap :: (a -> b) -> Serializer b -> Serializer a # (>$) :: b -> Serializer b -> Serializer a # | |
| Typeable message => Show (Serializer message) Source # | |
Defined in Control.Eff.Concurrent.Process Methods showsPrec :: Int -> Serializer message -> ShowS # show :: Serializer message -> String # showList :: [Serializer message] -> ShowS # | |
| NFData (Serializer message) Source # | |
Defined in Control.Eff.Concurrent.Process Methods rnf :: Serializer message -> () # | |
ProcessId Type
Each process is identified by a single process id, that stays constant throughout the life cycle of a process. Also, message sending relies on these values to address messages to processes.
Constructors
| ProcessId | |
Fields | |
Instances
Process State
data ProcessState Source #
The state that a Process is currently in.
Constructors
| ProcessBooting | The process has just been started but not scheduled yet. |
| ProcessIdle | The process yielded its time slice |
| ProcessBusy | The process is busy with a non-blocking operation |
| ProcessBusySleeping | |
| ProcessBusyUpdatingDetails | The process is busy with |
| ProcessBusySending | The process is busy with sending a message |
| ProcessBusySendingShutdown | The process is busy with killing |
| ProcessBusySendingInterrupt | The process is busy with killing |
| ProcessBusyReceiving | The process blocked by a |
| ProcessBusyLinking | The process blocked by a |
| ProcessBusyUnlinking | The process blocked by a |
| ProcessBusyMonitoring | The process blocked by a |
| ProcessBusyDemonitoring | The process blocked by a |
| ProcessInterrupted | The process was interrupted |
| ProcessShuttingDown | The process was shutdown or crashed |
Instances
Yielding
yieldProcess :: forall r q. (HasProcesses r q, HasCallStack) => Eff r () Source #
Use executeAndResumeOrExit to execute YieldProcess. Refer to YieldProcess
for more information.
Waiting/Sleeping
delay :: forall r q. (HasProcesses r q, HasCallStack) => Timeout -> Eff r () Source #
Simply block until the time in the Timeout has passed.
Since: 0.30.0
Sending Messages
sendMessage :: forall o r q. (HasProcesses r q, HasCallStack, Typeable o, NFData o) => ProcessId -> o -> Eff r () Source #
Send a message to a process addressed by the ProcessId.
See SendMessage.
The message will be reduced to normal form (rnf) by/in the caller process.
sendAnyMessage :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> StrictDynamic -> Eff r () Source #
Send a StrictDynamic value to a process addressed by the ProcessId.
See SendMessage.
Utilities
makeReference :: (HasCallStack, HasProcesses r q) => Eff r Int Source #
Generate a unique Int for the current process.
Receiving Messages
receiveMessage :: forall a r q. (HasCallStack, Typeable a, NFData a, Show a, HasProcesses r q) => Eff r a Source #
Receive and cast the message to some Typeable instance.
See ReceiveSelectedMessage for more documentation.
This will wait for a message of the return type using receiveSelectedMessage
receiveSelectedMessage :: forall r q a. (HasCallStack, Show a, HasProcesses r q) => MessageSelector a -> Eff r a Source #
Block until a message was received, that is not Nothing after applying
a callback to it.
See ReceiveSelectedMessage for more documentation.
flushMessages :: forall r q. (HasCallStack, HasProcesses r q) => Eff r [StrictDynamic] Source #
Remove and return all messages currently enqueued in the process message queue.
Since: 0.12.0
receiveAnyMessage :: forall r q. (HasCallStack, HasProcesses r q) => Eff r StrictDynamic Source #
Block until a message was received.
See ReceiveSelectedMessage for more documentation.
receiveLoop :: forall r q a endOfLoopResult. (HasSafeProcesses r q, HasCallStack, NFData a, Typeable a) => (Either (Interrupt Recoverable) a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult Source #
Like receiveSelectedLoop but refined to casting to a specific Typeable
using selectMessage.
receiveSelectedLoop :: forall r q a endOfLoopResult. (HasSafeProcesses r q, HasCallStack) => MessageSelector a -> (Either (Interrupt Recoverable) a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult Source #
Enter a loop to receive messages and pass them to a callback, until the
function returns Just a result.
Only the messages of the given type will be received.
If the process is interrupted by an exception or by a SendShutdown from
another process, with an exit reason that satisfies isRecoverable, then
the callback will be invoked with Left Interrupt, otherwise the
process will be exited with the same reason using exitBecause.
See also ReceiveSelectedMessage for more documentation.
receiveAnyLoop :: forall r q endOfLoopResult. (HasSafeProcesses r q, HasCallStack) => (Either (Interrupt Recoverable) StrictDynamic -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult Source #
Like receiveSelectedLoop but not selective.
See also selectAnyMessage, receiveSelectedLoop.
Selecting Messages to Receive
data MessageSelector a Source #
A function that decides if the next message will be received by
ReceiveSelectedMessage. It conveniently is an instance of
Alternative so the message selector can be combined:
>
> selectInt :: MessageSelector Int
> selectInt = selectMessage
>
> selectString :: MessageSelector String
> selectString = selectMessage
>
> selectIntOrString :: MessageSelector (Either Int String)
> selectIntOrString =
>   Left <$> selectInt <|> Right <$> selectString
Instances
selectMessage :: Typeable t => MessageSelector t Source #
Create a message selector for a value that can be obtained by fromStrictDynamic.
Since: 0.9.1
filterMessage :: Typeable a => (a -> Bool) -> MessageSelector a Source #
Create a message selector from a predicate.
Since: 0.9.1
selectMessageWith :: Typeable a => (a -> Maybe b) -> MessageSelector b Source #
Select a message of type a and apply the given function to it.
If the function returns Just, the ReceiveSelectedMessage function will
return the result (sans Maybe).
Since: 0.9.1
selectDynamicMessage :: (StrictDynamic -> Maybe a) -> MessageSelector a Source #
Create a message selector.
Since: 0.9.1
selectAnyMessage :: MessageSelector StrictDynamic Source #
Create a message selector that will match every message. This is lazy
because the result is not forced.
Since: 0.9.1
Process State Reflection
self :: (HasCallStack, HasSafeProcesses r q) => Eff r ProcessId Source #
Returns the ProcessId of the current process.
isProcessAlive :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r Bool Source #
Return True if the process is alive.
Since: 0.12.0
getProcessState :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r (Maybe (ProcessTitle, ProcessDetails, ProcessState)) Source #
Return the ProcessTitle, ProcessDetails and ProcessState,
for the given process, if the process is alive.
Since: 0.24.1
updateProcessDetails :: forall r q. (HasCallStack, HasProcesses r q) => ProcessDetails -> Eff r () Source #
Replace the ProcessDetails of the process.
Since: 0.24.1
Spawning
spawn :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (Processes q) () -> Eff r ProcessId Source #
Start a new process, the new process will execute an effect, the function
will return immediately with a ProcessId. If the new process is
interrupted, the process will Shutdown with the Interrupt
wrapped in interruptToExit. For specific use cases it might be better to use
spawnRaw.
spawn_ :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (Processes q) () -> Eff r () Source #
Like spawn but return ().
spawnLink :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (Processes q) () -> Eff r ProcessId Source #
spawnRaw :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (SafeProcesses q) () -> Eff r ProcessId Source #
Start a new process, the new process will execute an effect, the function
will return immediately with a ProcessId. The spawned process has only the
raw SafeProcesses effects. For non-library code spawn might be better
suited.
spawnRaw_ :: forall r q. (HasCallStack, HasProcesses r q) => ProcessTitle -> Eff (SafeProcesses q) () -> Eff r () Source #
Like spawnRaw but return ().
Process Operation Execution
data ResumeProcess v where Source #
Every Process action returns its actual result wrapped in this type. It
will allow to signal errors as well as pass on normal results such as
incoming messages.
Constructors
| Interrupted :: Interrupt Recoverable -> ResumeProcess v | The current operation of the process was interrupted with a
|
| ResumeWith :: a -> ResumeProcess a | The process may resume to do work, using the given result. |
Instances
executeAndResume :: forall q r v. (HasSafeProcesses r q, HasCallStack) => Process q (ResumeProcess v) -> Eff r (Either (Interrupt Recoverable) v) Source #
Execute a Process action and return the result;
if the process is interrupted by an error or exception, or an explicit
shutdown from another process, or through a crash of a linked process, i.e.
whenever the exit reason satisfies isRecoverable, return the exit reason.
executeAndResumeOrExit :: forall r q v. (HasSafeProcesses r q, HasCallStack) => Process q (ResumeProcess v) -> Eff r v Source #
Execute a Process action and resume the process, exit
the process when an Interrupt was raised. Use executeAndResume to catch
interrupts.
executeAndResumeOrThrow :: forall q r v. (HasProcesses r q, HasCallStack) => Process q (ResumeProcess v) -> Eff r v Source #
Execute a Process action and resume the process, or throw
the Interrupt (via the Interrupts effect) when the process was interrupted. Use executeAndResume to catch
interrupts.
Exits and Interrupts
Interrupting Processes
interrupt :: (HasCallStack, Member Interrupts r) => Interrupt Recoverable -> Eff r a Source #
Throw an Interrupt, can be handled by handleInterrupts or
exitOnInterrupt or provideInterrupts.
sendInterrupt :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Interrupt Recoverable -> Eff r () Source #
Interrupts a process addressed by the ProcessId. The process might exit,
or it may continue.
| Like sendInterrupt, but also return True iff the process to exit exists.
Exiting Processes
exitBecause :: forall r q a. (HasCallStack, HasSafeProcesses r q) => Interrupt NoRecovery -> Eff r a Source #
Exit the process with an Interrupt.
exitNormally :: forall r q a. (HasCallStack, HasSafeProcesses r q) => Eff r a Source #
Exit the process.
exitWithError :: forall r q a. (HasCallStack, HasSafeProcesses r q) => String -> Eff r a Source #
Exit the process with an error.
sendShutdown :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Interrupt NoRecovery -> Eff r () Source #
Exit a process addressed by the ProcessId. The process will exit,
it might do some cleanup, but is ultimately unrecoverable.
See SendShutdown.
Linking Processes
linkProcess :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r () Source #
Connect the calling process to another process, such that
if one of the processes crashes (i.e. isCrash returns True), the other
is shutdown with the Interrupt LinkedProcessCrashed.
See Link for a discussion on linking.
Since: 0.12.0
unlinkProcess :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r () Source #
Unlink the calling process from the other process.
See Link for a discussion on linking.
Since: 0.12.0
Monitor Processes
monitor :: forall r q. (HasCallStack, HasProcesses r q) => ProcessId -> Eff r MonitorReference Source #
Monitor another process. When the monitored process exits a
ProcessDown is sent to the calling process.
The return value is a unique identifier for that monitor.
There can be multiple monitors on the same process,
and a message for each will be sent.
If the process is already dead, the ProcessDown message
will be sent immediately, without exit reason
Since: 0.12.0
demonitor :: forall r q. (HasCallStack, HasProcesses r q) => MonitorReference -> Eff r () Source #
Remove a monitor created with monitor.
Since: 0.12.0
data ProcessDown Source #
A monitored process exited. This message is sent to a process by the scheduler, when a process that was monitored died.
Since: 0.12.0
Constructors
| ProcessDown | |
Fields
| |
Instances
selectProcessDown :: MonitorReference -> MessageSelector ProcessDown Source #
A MessageSelector for the ProcessDown message of a specific
process.
The parameter is the value obtained by monitor.
Since: 0.12.0
selectProcessDownByProcessId :: ProcessId -> MessageSelector ProcessDown Source #
A MessageSelector for the ProcessDown message of a specific
process.
In contrast to selectProcessDown this function matches the ProcessId.
Since: 0.28.0
becauseProcessIsDown :: ProcessDown -> Interrupt Recoverable Source #
Make an Interrupt for a ProcessDown message.
For example: doSomething >>= either (interrupt . becauseProcessIsDown) return
Since: 0.12.0
data MonitorReference Source #
A value that contains a unique reference of a process monitoring.
Since: 0.12.0
Constructors
| MonitorReference | |
Fields | |
Instances
withMonitor :: (HasCallStack, HasProcesses r q) => ProcessId -> (MonitorReference -> Eff r a) -> Eff r a Source #
receiveWithMonitor :: (HasCallStack, HasProcesses r q, Typeable a, Show a) => ProcessId -> MessageSelector a -> Eff r (Either ProcessDown a) Source #
A MessageSelector for receiving either a monitor of the
given process or another message.
Since: 0.12.0
Exit and Interrupt Reasons
data Interrupt (t :: ExitRecovery) where Source #
A sum-type with reasons for why a process operation, such as receiving messages, is interrupted in the scheduling loop.
This includes errors, that can occur when scheduling messages.
Since: 0.23.0
Constructors
| NormalExitRequested :: Interrupt Recoverable | A process has finished a unit of work and might exit or work on
something else. This is primarily used for interrupting infinite
server loops, allowing for additional cleanup work before
exiting (e.g. with Since: 0.13.2 |
| NormalExitRequestedWith :: forall a. (Typeable a, Show a, NFData a) => a -> Interrupt Recoverable | Extension of Since: 0.30.0 |
| OtherProcessNotRunning :: ProcessId -> Interrupt Recoverable | A process that should be running was not running. |
| TimeoutInterrupt :: String -> Interrupt Recoverable | A |
| LinkedProcessCrashed :: ProcessId -> Interrupt Recoverable | A linked process is down, see |
| ErrorInterrupt :: String -> Interrupt Recoverable | An exit reason that has an error message and is |
| InterruptedBy :: forall a. (Typeable a, Show a, NFData a) => a -> Interrupt Recoverable | An interrupt with a custom message. Since: 0.30.0 |
| ExitNormally :: Interrupt NoRecovery | A process function returned or exited without any error. |
| ExitNormallyWith :: forall a. (Typeable a, Show a, NFData a) => a -> Interrupt NoRecovery | A process function returned or exited without any error, and with a custom message Since: 0.30.0 |
| ExitUnhandledError :: Text -> Interrupt NoRecovery | An error causes the process to exit immediately.
For example an unexpected runtime exception was thrown, i.e. an exception
derived from |
| ExitProcessCancelled :: Maybe ProcessId -> Interrupt NoRecovery | A process shall exit immediately, without any cleanup was cancelled (e.g. killed, in |
| ExitOtherProcessNotRunning :: ProcessId -> Interrupt NoRecovery | A process that is vital to the crashed process was not running |
Instances
type Interrupts = Exc (Interrupt Recoverable) Source #
Exceptions containing Interrupts.
See handleInterrupts, exitOnInterrupt or provideInterrupts
interruptToExit :: Interrupt Recoverable -> Interrupt NoRecovery Source #
Convert a Recoverable Interrupt into a NoRecovery Interrupt.
If the Interrupt is NormalExitRequested then return ExitNormally.
data ExitRecovery Source #
This kind is used to indicate whether an Interrupt can be treated like
a short interrupt which can be handled or ignored.
Constructors
| Recoverable | |
| NoRecovery |
Instances
| Eq ExitRecovery Source # | |
Defined in Control.Eff.Concurrent.Process | |
| Ord ExitRecovery Source # | |
Defined in Control.Eff.Concurrent.Process Methods compare :: ExitRecovery -> ExitRecovery -> Ordering # (<) :: ExitRecovery -> ExitRecovery -> Bool # (<=) :: ExitRecovery -> ExitRecovery -> Bool # (>) :: ExitRecovery -> ExitRecovery -> Bool # (>=) :: ExitRecovery -> ExitRecovery -> Bool # max :: ExitRecovery -> ExitRecovery -> ExitRecovery # min :: ExitRecovery -> ExitRecovery -> ExitRecovery # | |
| Show ExitRecovery Source # | |
Defined in Control.Eff.Concurrent.Process Methods showsPrec :: Int -> ExitRecovery -> ShowS # show :: ExitRecovery -> String # showList :: [ExitRecovery] -> ShowS # | |
| Generic ExitRecovery Source # | |
Defined in Control.Eff.Concurrent.Process Associated Types type Rep ExitRecovery :: Type -> Type # | |
| NFData ExitRecovery Source # | |
Defined in Control.Eff.Concurrent.Process Methods rnf :: ExitRecovery -> () # | |
| type Rep ExitRecovery Source # | |
Defined in Control.Eff.Concurrent.Process | |
type RecoverableInterrupt = Interrupt Recoverable Source #
Interrupts which are Recoverable.
data ExitSeverity Source #
This value indicates whether a process exited in a way consistent with the planned behaviour or not.
Constructors
| NormalExit | |
| Crash |
Instances
| Eq ExitSeverity Source # | |
Defined in Control.Eff.Concurrent.Process | |
| Ord ExitSeverity Source # | |
Defined in Control.Eff.Concurrent.Process Methods compare :: ExitSeverity -> ExitSeverity -> Ordering # (<) :: ExitSeverity -> ExitSeverity -> Bool # (<=) :: ExitSeverity -> ExitSeverity -> Bool # (>) :: ExitSeverity -> ExitSeverity -> Bool # (>=) :: ExitSeverity -> ExitSeverity -> Bool # max :: ExitSeverity -> ExitSeverity -> ExitSeverity # min :: ExitSeverity -> ExitSeverity -> ExitSeverity # | |
| Show ExitSeverity Source # | |
Defined in Control.Eff.Concurrent.Process Methods showsPrec :: Int -> ExitSeverity -> ShowS # show :: ExitSeverity -> String # showList :: [ExitSeverity] -> ShowS # | |
| Generic ExitSeverity Source # | |
Defined in Control.Eff.Concurrent.Process Associated Types type Rep ExitSeverity :: Type -> Type # | |
| NFData ExitSeverity Source # | |
Defined in Control.Eff.Concurrent.Process Methods rnf :: ExitSeverity -> () # | |
| type Rep ExitSeverity Source # | |
Defined in Control.Eff.Concurrent.Process | |
data SomeExitReason where Source #
An existential wrapper around Interrupt
Constructors
| SomeExitReason :: Interrupt x -> SomeExitReason |
Instances
| Eq SomeExitReason Source # | |
Defined in Control.Eff.Concurrent.Process Methods (==) :: SomeExitReason -> SomeExitReason -> Bool # (/=) :: SomeExitReason -> SomeExitReason -> Bool # | |
| Ord SomeExitReason Source # | |
Defined in Control.Eff.Concurrent.Process Methods compare :: SomeExitReason -> SomeExitReason -> Ordering # (<) :: SomeExitReason -> SomeExitReason -> Bool # (<=) :: SomeExitReason -> SomeExitReason -> Bool # (>) :: SomeExitReason -> SomeExitReason -> Bool # (>=) :: SomeExitReason -> SomeExitReason -> Bool # max :: SomeExitReason -> SomeExitReason -> SomeExitReason # min :: SomeExitReason -> SomeExitReason -> SomeExitReason # | |
| Show SomeExitReason Source # | |
Defined in Control.Eff.Concurrent.Process Methods showsPrec :: Int -> SomeExitReason -> ShowS # show :: SomeExitReason -> String # showList :: [SomeExitReason] -> ShowS # | |
| NFData SomeExitReason Source # | |
Defined in Control.Eff.Concurrent.Process Methods rnf :: SomeExitReason -> () # | |
toExitRecovery :: Interrupt r -> ExitRecovery Source #
Get the ExitRecovery of an Interrupt.
isRecoverable :: Interrupt x -> Bool Source #
A predicate for recoverable exit reasons. This predicate defines the
exit reasons which functions such as executeAndResume treat as recoverable.
toExitSeverity :: Interrupt e -> ExitSeverity Source #
Get the ExitSeverity of an Interrupt.
isProcessDownInterrupt :: Maybe ProcessId -> Interrupt r -> Bool Source #
A predicate for linked process crashes.
isCrash :: Interrupt x -> Bool Source #
A predicate for crashes. A crash happens when a process exits
with an Interrupt other than ExitNormally
toCrashReason :: Interrupt x -> Maybe Text Source #
Render an Interrupt to Just a formatted Text when isCrash
is True, and to Nothing otherwise.
This can be useful in combination with view patterns, e.g.:
logCrash :: Interrupt -> Eff e ()
logCrash (toCrashReason -> Just reason) = logError reason
logCrash _ = return ()
Though this can be improved to:
logCrash = traverse_ logError . toCrashReason
fromSomeExitReason :: SomeExitReason -> Either (Interrupt NoRecovery) (Interrupt Recoverable) Source #
Partition a SomeExitReason back into either a NoRecovery
or a Recoverable Interrupt
logProcessExit :: forall e x. (Member Logs e, HasCallStack) => Interrupt x -> Eff e () Source #
Log the Interrupts
Control Flow
Process Interrupt Recoverable Handling
provideInterruptsShutdown :: forall e a. Eff (Processes e) a -> Eff (SafeProcesses e) a Source #
Handle all Interrupts of a Processes effect by
converting them with interruptToExit and then performing a process Shutdown.
handleInterrupts :: (HasCallStack, Member Interrupts r) => (Interrupt Recoverable -> Eff r a) -> Eff r a -> Eff r a Source #
Handle Interrupts arising during process operations, e.g.
when a linked process crashes while we wait in a receiveSelectedMessage
via a call to interrupt.
tryUninterrupted :: (HasCallStack, Member Interrupts r) => Eff r a -> Eff r (Either (Interrupt Recoverable) a) Source #
Like handleInterrupts, but instead of passing the Interrupt
to a handler function, Either is returned.
Since: 0.13.2
exitOnInterrupt :: (HasCallStack, HasProcesses r q) => Eff r a -> Eff r a Source #
Handle Interrupts arising during process operations, e.g.
when a linked process crashes while we wait in a receiveSelectedMessage
via a call to interrupt.
logInterrupts :: forall r. (Member Logs r, HasCallStack, Member Interrupts r) => Eff r () -> Eff r () Source #
Handle interrupts by logging them with logProcessExit and otherwise
ignoring them.
provideInterrupts :: HasCallStack => Eff (Interrupts ': r) a -> Eff r (Either (Interrupt Recoverable) a) Source #
Handle Interrupts arising during process operations, e.g.
when a linked process crashes while we wait in a receiveSelectedMessage
via a call to interrupt.
mergeEitherInterruptAndExitReason :: Either (Interrupt Recoverable) (Interrupt NoRecovery) -> Interrupt NoRecovery Source #
Wrap all (left) Interrupts into interruptToExit and
return the (right) NoRecovery Interrupts as is.
Typed ProcessIds: Receiver
sendToReceiver :: (NFData o, HasProcesses r q) => Receiver o -> o -> Eff r () Source #
Constructors
| (NFData out, Typeable out, Show out) => Receiver | |
Fields
| |
Instances
| Contravariant Receiver Source # | |
| Eq (Receiver o) Source # | |
| Ord (Receiver o) Source # | |
Defined in Control.Eff.Concurrent.Process | |
| Typeable protocol => Show (Receiver protocol) Source # | |
| NFData (Receiver o) Source # | |
Defined in Control.Eff.Concurrent.Process | |