Safe Haskell | None |
---|---|
Language | Haskell2010 |
Launch processes and dispatch messages to them.
A pool has an Input for Multiplexed messages, and dispatches incoming messages to concurrent processes using user defined MessageBoxes.
The pool starts and stops the processes and creates the message boxes.
The user supplied PoolWorkerCallback usually runs a loop that receives messages from the MessageBox created by the pool for that worker.
When a worker process dies, e.g. because the PoolWorkerCallback returns, the pool process will also cancel the process (just to make sure...) and clean up the internal Broker.
Synopsis
- type ResourceCleaner k a m = k -> a -> RIO m ()
- type ResourceCreator k w a m = k -> Maybe w -> RIO m a
- data Multiplexed k w
    - = Initialize k !(Maybe w)
    - | Dispatch k w
- data ResourceUpdate a
    - = KeepResource
    - | UpdateResource a
    - | RemoveResource !(Maybe a)
- type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a)
- type Demultiplexer w' k w = w' -> Multiplexed k w
- data BrokerConfig k w' w a m = MkBrokerConfig {
    - demultiplexer :: !(Demultiplexer w' k w)
    - messageDispatcher :: !(MessageHandler k w a m)
    - resourceCreator :: !(ResourceCreator k w a m)
    - resourceCleaner :: !(ResourceCleaner k a m)
    - }
- data BrokerResult = MkBrokerResult
- spawnBroker :: forall brokerBoxArg k w' w a m. (HasLogFunc m, Ord k, Display k, IsMessageBoxArg brokerBoxArg) => brokerBoxArg -> BrokerConfig k w' w a m -> RIO m (Either SomeException (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
- data Pool poolBox k w = MkPool {
    - poolInput :: !(Input (MessageBox poolBox) (Multiplexed k (Maybe w)))
    - poolAsync :: !(Async BrokerResult)
    - }
- newtype PoolWorkerCallback workerBox w k m = MkPoolWorkerCallback {
    - runPoolWorkerCallback :: k -> MessageBox workerBox w -> RIO m ()
    - }
- spawnPool :: forall k w poolBox workerBox m. (IsMessageBoxArg poolBox, IsMessageBoxArg workerBox, Ord k, Display k, HasLogFunc m) => poolBox -> workerBox -> PoolWorkerCallback workerBox w k m -> RIO m (Either SomeException (Pool poolBox k w))
- removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)
- module UnliftIO.MessageBox
Documentation
A process that receives messages and dispatches them to a callback. Each message must contain a key that identifies a resource. That resource is created and cleaned by user supplied callback functions.
type ResourceCleaner k a m = k -> a -> RIO m ()
User supplied callback called _with exceptions masked_ when the MessageHandler returns RemoveResource.
(Sync-) Exceptions thrown from this function are caught and do not prevent the removal of the resource; the broker continues.
- k is the key for the resource associated to an incoming message.
- a specifies the resource type.
- m is the monad of the returned action.
type ResourceCreator k w a m = k -> Maybe w -> RIO m a
User supplied callback to create and initialize a resource. (Sync-) Exceptions thrown from this function are caught, and the broker continues.
- k is the key for the resource associated to an incoming message.
- w is the type of the demultiplexed messages.
- a specifies the resource type.
- m is the monad of the returned action.
data Multiplexed k w
The action that the broker has to take for an incoming message.
- k is the key for the resource associated to an incoming message.
- w is the type of the demultiplexed messages.
Initialize k !(Maybe w) | The message is an initialization message that requires the creation of a new resource for the given key. Once the resource is created, a message may additionally be dispatched. |
Dispatch k w | Dispatch a message using an existing resource. Silently ignore the message if no resource for the key exists. |
data ResourceUpdate a
This value indicates the state a worker is in after the MessageHandler action was executed.
KeepResource | The resource is still required. |
UpdateResource a | The resource is still required but must be updated. |
RemoveResource !(Maybe a) | The resource is obsolete and can be removed from the broker. The broker will call the ResourceCleaner. |
type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a)
User supplied callback to use the Multiplexed message and the associated resource.
(Sync-) Exceptions thrown from this function are caught and lead to immediate cleanup of the resource, but the broker continues.
- Type k is the key for the resource associated to an incoming message.
- Type w is the type of incoming, demultiplexed, messages.
- Type a specifies the resource type.
- Type m is the base monad.
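A MessageHandler decides, per message, whether the resource is kept, replaced or removed. The following is a minimal sketch of such a handler for a per-key counter. It is not taken from the library: ResourceUpdate is copied locally from the synopsis so the example compiles on its own, IO stands in for RIO m, and CounterMsg and counterHandler are hypothetical names.

```haskell
-- Local copy of the type from the synopsis, so the sketch builds without the library.
data ResourceUpdate a
  = KeepResource
  | UpdateResource a
  | RemoveResource !(Maybe a)
  deriving (Eq, Show)

-- Stand-in for MessageHandler, with IO in place of RIO m (assumption).
type MessageHandler k w a = k -> w -> a -> IO (ResourceUpdate a)

-- Hypothetical message type: the resource is a running total per key.
data CounterMsg
  = Add Int
  | Report
  | Close
  deriving (Eq, Show)

counterHandler :: MessageHandler String CounterMsg Int
-- The resource changes, so hand the new value back to the broker.
counterHandler _key (Add n) total = pure (UpdateResource (total + n))
-- The resource is only read, so it is kept as it is.
counterHandler key Report total = do
  putStrLn (key <> ": " <> show total)
  pure KeepResource
-- The resource is obsolete; pass the final value along with the removal.
counterHandler _key Close total = pure (RemoveResource (Just total))

main :: IO ()
main = do
  counterHandler "a" (Add 2) 40 >>= print
  counterHandler "a" Report 42 >>= print
  counterHandler "a" Close 42 >>= print
```

With the real library the same function body would run in RIO m and could use the logging functions that HasLogFunc provides.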
type Demultiplexer w' k w = w' -> Multiplexed k w
User supplied callback to extract the key and the Multiplexed from a message.
(Sync-) Exceptions thrown from this function are caught and lead to dropping of the incoming message, while the broker continues.
- k is the key for the resource associated to an incoming message.
- w' is the type of incoming messages.
- w is the type of the demultiplexed messages.
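A Demultiplexer is a pure function from the wire-level message type w' to a Multiplexed value. The sketch below shows one possible shape, assuming a hypothetical ChatEvent protocol keyed by room name. Multiplexed is copied locally from the synopsis so the example is self-contained; ChatEvent and chatDemux are illustrative names, not part of the library.

```haskell
-- Local copy of the type from the synopsis.
data Multiplexed k w
  = Initialize k !(Maybe w)
  | Dispatch k w
  deriving (Eq, Show)

type Demultiplexer w' k w = w' -> Multiplexed k w

-- Hypothetical incoming message type (w'); the room name is the key (k).
data ChatEvent
  = Open String -- open a room, no initial message
  | OpenWith String String -- open a room and post a first message
  | Say String String -- post a message to an existing room
  deriving (Eq, Show)

chatDemux :: Demultiplexer ChatEvent String String
chatDemux (Open room) = Initialize room Nothing
chatDemux (OpenWith room greeting) = Initialize room (Just greeting)
chatDemux (Say room text) = Dispatch room text

main :: IO ()
main =
  mapM_
    (print . chatDemux)
    [Open "lobby", OpenWith "dev" "hello", Say "lobby" "hi"]
```

Keeping the demultiplexer total, as here, avoids relying on the broker's exception handling to drop malformed messages.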
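data BrokerConfig k w' w a m
The broker configuration, used by spawnBroker.
- k is the key for the resource associated to an incoming message.
- w' is the type of incoming messages.
- w is the type of the demultiplexed messages.
- a specifies the resource type.
- m is the base monad.
MkBrokerConfig | |
- demultiplexer :: !(Demultiplexer w' k w)
- messageDispatcher :: !(MessageHandler k w a m)
- resourceCreator :: !(ResourceCreator k w a m)
- resourceCleaner :: !(ResourceCleaner k a m)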
data BrokerResult
This is just what the Async returned from spawnBroker returns; its current purpose is to make code easier to read.
Instead of some Async () that could be anything, there is Async BrokerResult.
Instances
Eq BrokerResult | Defined in RIO.ProcessPool.Broker. Methods: (==) :: BrokerResult -> BrokerResult -> Bool; (/=) :: BrokerResult -> BrokerResult -> Bool |
Show BrokerResult | Defined in RIO.ProcessPool.Broker. Methods: showsPrec :: Int -> BrokerResult -> ShowS; show :: BrokerResult -> String; showList :: [BrokerResult] -> ShowS |
spawnBroker :: forall brokerBoxArg k w' w a m. (HasLogFunc m, Ord k, Display k, IsMessageBoxArg brokerBoxArg) => brokerBoxArg -> BrokerConfig k w' w a m -> RIO m (Either SomeException (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
Spawn a broker with a new MessageBox, and return its message Input channel as well as the Async handle of the spawned process, needed to stop the broker process.
- k is the key for the resource associated to an incoming message.
- w' is the type of incoming messages.
- w is the type of the demultiplexed messages.
- a specifies the resource type.
- m is the base monad.
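To make the interplay of Initialize, Dispatch and ResourceUpdate concrete, here is a simplified, pure model of how a broker could update its resource map for one message. It is an illustration of the behaviour described in this section, not the library's implementation: the callbacks are pure stand-ins (the real ones run in RIO m and may throw), resource cleanup is left out, and PureConfig, brokerStep and applyUpdate are hypothetical names. Two points are assumptions of this sketch: an Initialize for a key that already exists is ignored, and an Initialize carrying a message passes that message to the handler after creation.

```haskell
import qualified Data.Map.Strict as Map

-- Local copies of the types from the synopsis.
data Multiplexed k w
  = Initialize k !(Maybe w)
  | Dispatch k w

data ResourceUpdate a
  = KeepResource
  | UpdateResource a
  | RemoveResource !(Maybe a)

-- Hypothetical pure stand-in for the creator and handler in BrokerConfig.
data PureConfig k w a = PureConfig
  { onCreate :: k -> Maybe w -> a,
    onMessage :: k -> w -> a -> ResourceUpdate a
  }

-- One step of the model: fold a single message into the resource map.
brokerStep :: Ord k => PureConfig k w a -> Map.Map k a -> Multiplexed k w -> Map.Map k a
brokerStep cfg resources msg = case msg of
  Initialize k mw
    | Map.member k resources -> resources -- assumption: duplicates are ignored
    | otherwise ->
        let r = onCreate cfg k mw
            created = Map.insert k r resources
         in case mw of
              Nothing -> created
              Just w -> applyUpdate k (onMessage cfg k w r) created
  Dispatch k w ->
    case Map.lookup k resources of
      Nothing -> resources -- no resource for the key: silently ignore
      Just r -> applyUpdate k (onMessage cfg k w r) resources

applyUpdate :: Ord k => k -> ResourceUpdate a -> Map.Map k a -> Map.Map k a
applyUpdate _ KeepResource = id
applyUpdate k (UpdateResource r) = Map.insert k r
applyUpdate k (RemoveResource _) = Map.delete k

-- Example: per-key totals; a negative number removes the resource.
counterConfig :: PureConfig String Int Int
counterConfig =
  PureConfig
    { onCreate = \_ _ -> 0,
      onMessage = \_ n total ->
        if n < 0 then RemoveResource Nothing else UpdateResource (total + n)
    }

messages :: [Multiplexed String Int]
messages =
  [ Initialize "a" (Just 1),
    Dispatch "a" 2,
    Dispatch "b" 5, -- ignored, "b" does not exist yet
    Initialize "b" Nothing,
    Dispatch "b" 7,
    Dispatch "a" (-1) -- removes "a"
  ]

main :: IO ()
main = print (Map.toList (foldl (brokerStep counterConfig) Map.empty messages))
```

The Ord k constraint on spawnBroker is consistent with a map keyed by k, which is why the model uses Data.Map.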
A process that receives messages and dispatches them to other processes.
Building directly on RIO.ProcessPool.Broker, it provides a central message box Input, from which messages are delivered to the corresponding message box Inputs.
data Pool poolBox k w
A record containing the message box Input of the Broker and the Async value required to cancel the pool's broker process.
MkPool | |
- poolInput :: !(Input (MessageBox poolBox) (Multiplexed k (Maybe w)))
- poolAsync :: !(Async BrokerResult)
newtype PoolWorkerCallback workerBox w k m
The function that processes a MessageBox of a worker for a specific key.
MkPoolWorkerCallback | |
- runPoolWorkerCallback :: k -> MessageBox workerBox w -> RIO m ()
spawnPool :: forall k w poolBox workerBox m. (IsMessageBoxArg poolBox, IsMessageBoxArg workerBox, Ord k, Display k, HasLogFunc m) => poolBox -> workerBox -> PoolWorkerCallback workerBox w k m -> RIO m (Either SomeException (Pool poolBox k w))
Start a Pool.
Start a process that receives messages sent to the poolInput and dispatches them to the Input of pool member processes. If necessary, the pool worker processes are started.
Each pool worker process is started using async and executes the PoolWorkerCallback. When the callback returns, the process will exit.
Internally the pool uses the async function to wrap the callback.
When a Multiplexed Dispatch message is received with a Nothing, the worker is cancelled and removed from the map.
Such a message is automatically sent after the PoolWorkerCallback has returned, even when an exception was thrown. See finally.
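Because poolInput accepts Multiplexed k (Maybe w), every payload sent to a pool is wrapped in Maybe, and Nothing is reserved for stopping a worker. The sketch below spells out that encoding with a local copy of Multiplexed. The helper names (PoolMessage, startWorker, startWorkerWith, toWorker, stopWorker) are hypothetical, and stopWorker is only assumed to build the same value as removePoolWorkerMessage, based on the description above.

```haskell
-- Local copy of the type from the synopsis.
data Multiplexed k w
  = Initialize k !(Maybe w)
  | Dispatch k w
  deriving (Eq, Show)

-- The message type accepted by poolInput, for key type k and worker message type w.
type PoolMessage k w = Multiplexed k (Maybe w)

-- Ask the pool to start a worker for the key (hypothetical helper).
startWorker :: k -> PoolMessage k w
startWorker k = Initialize k Nothing

-- Start a worker and hand it a first message (hypothetical helper; that the
-- message reaches the new worker is an assumption based on the Initialize docs).
startWorkerWith :: k -> w -> PoolMessage k w
startWorkerWith k w = Initialize k (Just (Just w))

-- Send a message to the worker registered for the key (hypothetical helper).
toWorker :: k -> w -> PoolMessage k w
toWorker k w = Dispatch k (Just w)

-- Dispatch with Nothing: the worker is cancelled and removed from the map.
stopWorker :: k -> PoolMessage k w
stopWorker k = Dispatch k Nothing

main :: IO ()
main =
  mapM_
    print
    ( [ startWorker "w1",
        startWorkerWith "w2" "hello",
        toWorker "w1" "job",
        stopWorker "w1"
      ] ::
        [PoolMessage String String]
    )
```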
removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)
This message will cancel the worker with the given key.
If the PoolWorkerCallback wants to do cleanup, it should use finally or onException.
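The cleanup advice can be illustrated without the library. The following sketch uses finally from Control.Exception in base as a stand-in for the UnliftIO variant a real PoolWorkerCallback would use in RIO m. The worker function, its message list and the log are hypothetical; a real callback would read from its MessageBox in a loop instead of traversing a list, and only a synchronous exception is demonstrated here.

```haskell
import Control.Exception (IOException, finally, throwIO, try)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical worker body: handles its messages, then always runs the cleanup.
worker :: IORef [String] -> String -> [Int] -> IO ()
worker logRef key msgs =
  mapM_ process msgs `finally` modifyIORef' logRef (("cleanup " <> key) :)
  where
    process n
      | n < 0 = throwIO (userError "negative input")
      | otherwise = modifyIORef' logRef ((key <> " got " <> show n) :)

-- Run one worker that returns normally and one that throws, and collect the log.
runDemo :: IO ([String], Either IOException ())
runDemo = do
  logRef <- newIORef []
  worker logRef "a" [1, 2]
  result <- try (worker logRef "b" [1, -1, 3])
  entries <- reverse <$> readIORef logRef
  pure (entries, result)

main :: IO ()
main = do
  (entries, result) <- runDemo
  mapM_ putStrLn entries
  print result
```

onException would be the choice when the cleanup should run only on failure, not on a normal return.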
Re-export.
module UnliftIO.MessageBox