| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
RIO.ProcessPool
Description
Launch processes and dispatch messages to them.
A pool has an Input for Multiplexed messages,
and dispatches incoming messages to concurrent
processes using user-defined MessageBoxes.
The pool starts and stops the processes and creates the message boxes.
The user-supplied PoolWorkerCallback
usually runs a loop that receives messages
from the MessageBox created by the pool for that worker.
When a worker process dies, e.g. because the
PoolWorkerCallback returns, the pool
process will also cancel the worker process (just to make sure...)
and clean up the internal Broker.
Synopsis
- type ResourceCleaner k a m = k -> a -> RIO m ()
- type ResourceCreator k w a m = k -> Maybe w -> RIO m a
- data Multiplexed k w
- = Initialize k !(Maybe w)
- | Dispatch k w
- data ResourceUpdate a
- = KeepResource
- | UpdateResource a
- | RemoveResource !(Maybe a)
- type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a)
- type Demultiplexer w' k w = w' -> Multiplexed k w
- data BrokerConfig k w' w a m = MkBrokerConfig {
- demultiplexer :: !(Demultiplexer w' k w)
- messageDispatcher :: !(MessageHandler k w a m)
- resourceCreator :: !(ResourceCreator k w a m)
- resourceCleaner :: !(ResourceCleaner k a m)
- data BrokerResult = MkBrokerResult
- spawnBroker :: forall brokerBoxArg k w' w a m. (HasLogFunc m, Ord k, Display k, IsMessageBoxArg brokerBoxArg) => brokerBoxArg -> BrokerConfig k w' w a m -> RIO m (Either SomeException (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
- data Pool poolBox k w = MkPool {
- poolInput :: !(Input (MessageBox poolBox) (Multiplexed k (Maybe w)))
- poolAsync :: !(Async BrokerResult)
- newtype PoolWorkerCallback workerBox w k m = MkPoolWorkerCallback {
- runPoolWorkerCallback :: k -> MessageBox workerBox w -> RIO m ()
- spawnPool :: forall k w poolBox workerBox m. (IsMessageBoxArg poolBox, IsMessageBoxArg workerBox, Ord k, Display k, HasLogFunc m) => poolBox -> workerBox -> PoolWorkerCallback workerBox w k m -> RIO m (Either SomeException (Pool poolBox k w))
- removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)
- module UnliftIO.MessageBox
Documentation
A process that receives messages and dispatches them to a callback. Each message must contain a key that identifies a resource. That resource is created and cleaned by user supplied callback functions.
type ResourceCleaner k a m = k -> a -> RIO m () Source #
User-supplied callback called _with exceptions masked_
when the MessageHandler returns RemoveResource.
(Sync-) Exceptions thrown from this function are caught
and do not prevent the removal of the resource; the
broker continues.
- k is the key for the resource associated to an incoming message.
- a specifies the resource type.
- m is the monad of the returned action.
type ResourceCreator k w a m = k -> Maybe w -> RIO m a Source #
User supplied callback to create and initialize a resource. (Sync-) Exceptions thrown from this function are caught, and the broker continues.
- k is the key for the resource associated to an incoming message.
- w is the type of the demultiplexed messages.
- a specifies the resource type.
- m is the monad of the returned action.
data Multiplexed k w Source #
The action that the broker has to take for an incoming message.
- k is the key for the resource associated to an incoming message.
- w is the type of the demultiplexed messages.
Constructors
| Initialize k !(Maybe w) | The message is an initialization message that requires the creation of a new resource for the given key. Once the resource is created, the optional message, if present, is dispatched as well. |
| Dispatch k w | Dispatch a message using an existing resource. Silently ignore if no resource for the key exists. |
data ResourceUpdate a Source #
This value indicates what state a worker is in after the
MessageHandler action was executed.
Constructors
| KeepResource | The resource is still required. |
| UpdateResource a | The resource is still required but must be updated. |
| RemoveResource !(Maybe a) | The resource is obsolete and can be removed from the broker. The broker will call the ResourceCleaner. |
type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a) Source #
User supplied callback to use the Multiplexed message and
the associated resource.
(Sync-) Exceptions thrown from this function are caught and lead
to immediate cleanup of the resource but the broker continues.
- Type k is the key for the resource associated to an incoming message.
- Type w is the type of incoming, demultiplexed, messages.
- Type a specifies the resource type.
- Type m is the base monad.
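The following is a minimal sketch of a MessageHandler and of one plausible way its ResourceUpdate result could be applied to a map of resources. It is not the library's implementation: the types are local copies so the example compiles on its own, IO stands in for RIO m, the containers package is assumed to be available, and Command, counterHandler, applyUpdate and dispatchOne are hypothetical names. The ResourceCleaner call and the exception handling described above are left out.

```haskell
import qualified Data.Map.Strict as Map

-- Local copy of the documented type, so the sketch needs no library import.
data ResourceUpdate a
  = KeepResource
  | UpdateResource a
  | RemoveResource !(Maybe a)

-- Simplification (assumption): IO stands in for RIO m.
type MessageHandler k w a = k -> w -> a -> IO (ResourceUpdate a)

-- Hypothetical message type; the resource is a running total.
data Command = Add Int | Peek | Close

counterHandler :: MessageHandler String Command Int
counterHandler _key (Add n) total = pure (UpdateResource (total + n))
counterHandler _key Peek    _     = pure KeepResource
counterHandler _key Close   total = pure (RemoveResource (Just total))

-- One plausible way to apply the handler's verdict to a resource map.
applyUpdate :: Ord k => k -> ResourceUpdate a -> Map.Map k a -> Map.Map k a
applyUpdate _ KeepResource       m = m
applyUpdate k (UpdateResource a) m = Map.insert k a m
applyUpdate k (RemoveResource _) m = Map.delete k m

-- Dispatch one message; a key without a resource is silently ignored,
-- mirroring the documented behaviour of the Dispatch constructor.
dispatchOne
  :: Ord k
  => MessageHandler k w a -> k -> w -> Map.Map k a -> IO (Map.Map k a)
dispatchOne handler k w m =
  case Map.lookup k m of
    Nothing -> pure m
    Just a  -> do
      update <- handler k w a
      pure (applyUpdate k update m)
```

Loaded into GHCi, `dispatchOne counterHandler "a" (Add 4) (Map.fromList [("a", 1)])` yields a map in which the total for "a" has been updated, while a message for an unknown key leaves the map unchanged.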
type Demultiplexer w' k w = w' -> Multiplexed k w Source #
User supplied callback to extract the key and the Multiplexed
from a message.
(Sync-) Exceptions thrown from this function are caught and lead
to dropping of the incoming message, while the broker continues.
- k is the key for the resource associated to an incoming message.
- w' is the type of incoming messages.
- w is the type of the demultiplexed messages.
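As an illustration of the Demultiplexer shape, here is a small sketch that maps a wire-level message to a Multiplexed value. The Multiplexed and Demultiplexer definitions are local copies of the documented ones so that the example compiles without the library; ChatEvent and demuxChat are hypothetical and exist only for this example.

```haskell
-- Local copies of the documented types.
data Multiplexed k w
  = Initialize k !(Maybe w)
  | Dispatch k w
  deriving (Eq, Show)

type Demultiplexer w' k w = w' -> Multiplexed k w

-- Hypothetical incoming message type (w'): events tagged with a room id.
data ChatEvent
  = Join Int String -- room id, greeting
  | Say Int String  -- room id, text
  deriving (Eq, Show)

-- The key (k) is the room id, the demultiplexed message (w) is the text.
-- Join asks for a new resource and carries a first message;
-- Say targets a resource that is expected to exist already.
demuxChat :: Demultiplexer ChatEvent Int String
demuxChat (Join room greeting) = Initialize room (Just greeting)
demuxChat (Say room text)      = Dispatch room text
```

A function of this shape is what would go into the demultiplexer field of a BrokerConfig.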
data BrokerConfig k w' w a m Source #
The broker configuration, used by spawnBroker.
- k is the key for the resource associated to an incoming message.
- w' is the type of incoming messages.
- w is the type of the demultiplexed messages.
- a specifies the resource type.
- m is the base monad.
Constructors
| MkBrokerConfig | |
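Fields
- demultiplexer :: !(Demultiplexer w' k w)
- messageDispatcher :: !(MessageHandler k w a m)
- resourceCreator :: !(ResourceCreator k w a m)
- resourceCleaner :: !(ResourceCleaner k a m)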
data BrokerResult Source #
This is just what the Async returned from
spawnBroker returns; its current purpose is to
make code easier to read.
Instead of some Async () that could be anything,
there is Async BrokerResult.
Constructors
| MkBrokerResult |
Instances
| Eq BrokerResult Source # | |
Defined in RIO.ProcessPool.Broker | |
| Show BrokerResult Source # | |
Defined in RIO.ProcessPool.Broker Methods showsPrec :: Int -> BrokerResult -> ShowS # show :: BrokerResult -> String # showList :: [BrokerResult] -> ShowS # | |
spawnBroker :: forall brokerBoxArg k w' w a m. (HasLogFunc m, Ord k, Display k, IsMessageBoxArg brokerBoxArg) => brokerBoxArg -> BrokerConfig k w' w a m -> RIO m (Either SomeException (Input (MessageBox brokerBoxArg) w', Async BrokerResult)) Source #
Spawn a broker with a new MessageBox,
and return its message Input channel as well as
the Async handle of the spawned process, needed to
stop the broker process.
- k is the key for the resource associated to an incoming message.
- w' is the type of incoming messages.
- w is the type of the demultiplexed messages.
- a specifies the resource type.
- m is the base monad.
A process that receives messages and dispatches them to
other processes.
Building directly on RIO.ProcessPool.Broker, it provides
a central message box Input, from which messages
are delivered to the corresponding message box Inputs.
data Pool poolBox k w Source #
A record containing the message box Input of the
Broker and the Async value required to cancel
the pool's broker process.
Constructors
| MkPool | |
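Fields
- poolInput :: !(Input (MessageBox poolBox) (Multiplexed k (Maybe w)))
- poolAsync :: !(Async BrokerResult)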
newtype PoolWorkerCallback workerBox w k m Source #
The function that processes a
MessageBox of a worker for a specific key.
Constructors
| MkPoolWorkerCallback | |
Fields
- runPoolWorkerCallback :: k -> MessageBox workerBox w -> RIO m ()
spawnPool :: forall k w poolBox workerBox m. (IsMessageBoxArg poolBox, IsMessageBoxArg workerBox, Ord k, Display k, HasLogFunc m) => poolBox -> workerBox -> PoolWorkerCallback workerBox w k m -> RIO m (Either SomeException (Pool poolBox k w)) Source #
Start a Pool.
Start a process that receives messages sent to the
poolInput and dispatches them to the Input of
pool member processes. If necessary the
pool worker processes are started.
Each pool worker process is started using async and
executes the PoolWorkerCallback.
When the callback returns, the process will exit.
Internally the pool uses the async function to wrap
the callback.
When a Multiplexed Dispatch message is received with
a Nothing, the worker is cancelled and
removed from the map.
Such a message is automatically sent after the PoolWorkerCallback
has returned, even when an exception was thrown. See
finally.
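The pool's input carries Multiplexed k (Maybe w) values, so the Nothing convention described above can be sketched with plain data. In this sketch Multiplexed is a local copy of the documented type, and toWorker, stopWorker and isStopRequest are hypothetical helpers. That a Just payload is forwarded to the worker, and that removePoolWorkerMessage builds the same value as stopWorker, are assumptions rather than documented facts.

```haskell
-- Local copy of the documented type.
data Multiplexed k w
  = Initialize k !(Maybe w)
  | Dispatch k w
  deriving (Eq, Show)

-- Assumption: a Just payload is handed on to the worker with key k.
toWorker :: k -> w -> Multiplexed k (Maybe w)
toWorker k w = Dispatch k (Just w)

-- Documented: a Dispatch carrying Nothing cancels and removes the worker.
stopWorker :: k -> Multiplexed k (Maybe w)
stopWorker k = Dispatch k Nothing

-- Recognise the stop request among pool messages.
isStopRequest :: Multiplexed k (Maybe w) -> Bool
isStopRequest (Dispatch _ Nothing) = True
isStopRequest _                    = False
```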
removePoolWorkerMessage :: k -> Multiplexed k (Maybe w) Source #
This message will cancel the worker
with the given key.
If the PoolWorkerCallback wants to do cleanup
it should use finally or onException.
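A minimal sketch of the cleanup advice, using finally from Control.Exception in base. The worker body here runs in plain IO and records its steps in an IORef; in the library it would be the function wrapped in MkPoolWorkerCallback and would run in RIO m. The names workerBody and runWorker are hypothetical.

```haskell
import Control.Exception (ErrorCall (..), finally, throwIO, try)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical worker body: do some work, then always run the cleanup,
-- whether the work returns normally or throws.
workerBody :: IORef [String] -> Bool -> IO ()
workerBody logRef shouldFail = work `finally` cleanup
  where
    work = do
      modifyIORef' logRef ("working" :)
      if shouldFail then throwIO (ErrorCall "boom") else pure ()
    cleanup = modifyIORef' logRef ("cleaned up" :)

-- Run the worker, swallow the test exception, and return the recorded
-- steps in chronological order.
runWorker :: Bool -> IO [String]
runWorker shouldFail = do
  logRef <- newIORef []
  _ <- try (workerBody logRef shouldFail) :: IO (Either ErrorCall ())
  reverse <$> readIORef logRef
```

Because finally also runs its second action when the thread receives an asynchronous exception, the same pattern covers the case where the pool cancels the worker. onException would instead run the cleanup only on failure or cancellation, not on a normal return.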
Re-export.
module UnliftIO.MessageBox