-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | A library for process pools coupled with asynchronous message queues
--
-- Please see the README on GitHub at
-- https://github.com/sheyll/rio-process-pool#readme
@package rio-process-pool
@version 1.0.0
-- | A broker extracts a key value from incoming messages and
-- creates, keeps and destroys a resource for each key.
--
-- The demultiplexed messages and their resources are passed to a custom
-- MessageHandler.
--
-- The user provides a Demultiplexer, which is a pure function that
-- returns a key for the resource associated to the message and
-- potentially changes the message.
--
-- The demultiplexer may also return a value indicating that a new
-- resource must be created, or that a message shall be ignored.
--
-- The broker is run in a separate process using async. The usual
-- way to stop a broker is to cancel it.
--
-- When cancelling a broker, the resource cleanup actions for all
-- resources will be called with async exceptions masked.
--
-- In order to prevent the resource map filling up with dead
-- resources, the user of this module has to ensure that whenever a
-- resource is not required anymore, a message will be sent to the
-- broker, that will cause the MessageHandler to be executed for
-- the resource, which will in turn return RemoveResource.
module RIO.ProcessPool.Broker
-- | Spawn a broker with a new MessageBox, and return its message
-- Input channel as well as the Async handle of the spawned
-- process, needed to stop the broker process.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w' is the type of incoming messages.
-- - w is the type of the demultiplexed messages.
-- - a specifies the resource type.
-- - m is the base monad
--
spawnBroker :: forall brokerBoxArg k w' w a m. (HasLogFunc m, Ord k, Display k, IsMessageBoxArg brokerBoxArg) => brokerBoxArg -> BrokerConfig k w' w a m -> RIO m (Either SomeException (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-- | The broker configuration, used by spawnBroker.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w' is the type of incoming messages.
-- - w is the type of the demultiplexed messages.
-- - a specifies the resource type.
-- - m is the base monad
--
data BrokerConfig k w' w a m
MkBrokerConfig :: !Demultiplexer w' k w -> !MessageHandler k w a m -> !ResourceCreator k w a m -> !ResourceCleaner k a m -> BrokerConfig k w' w a m
[demultiplexer] :: BrokerConfig k w' w a m -> !Demultiplexer w' k w
[messageDispatcher] :: BrokerConfig k w' w a m -> !MessageHandler k w a m
[resourceCreator] :: BrokerConfig k w' w a m -> !ResourceCreator k w a m
[resourceCleaner] :: BrokerConfig k w' w a m -> !ResourceCleaner k a m
-- | This is just what the Async returned from spawnBroker
-- returns, its current purpose is to make code easier to read.
--
-- Instead of some Async () that could be anything, there is
-- Async BrokerResult.
data BrokerResult
MkBrokerResult :: BrokerResult
-- | User supplied callback to create and initialize a resource. (Sync-)
-- Exceptions thrown from this function are caught, and the broker
-- continues.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w is the type of the demultiplexed messages.
-- - a specifies the resource type.
-- - m is the monad of the returned action.
--
type ResourceCreator k w a m = k -> Maybe w -> RIO m a
-- | User supplied callback to extract the key and the Multiplexed
-- from a message. (Sync-) Exceptions thrown from this function are
-- caught and lead to dropping of the incoming message, while the broker
-- continues.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w' is the type of incoming messages.
-- - w is the type of the demultiplexed messages.
--
type Demultiplexer w' k w = w' -> Multiplexed k w
-- | User supplied callback called _with exceptions masked_ when the
-- MessageHandler returns RemoveResource. (Sync-) Exceptions
-- thrown from this function are caught, and do not prevent the removal
-- of the resource, also the broker continues.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - a specifies the resource type.
-- - m is the monad of the returned action.
--
type ResourceCleaner k a m = k -> a -> RIO m ()
-- | User supplied callback to use the Multiplexed message and the
-- associated resource. (Sync-) Exceptions thrown from this function are
-- caught and lead to immediate cleanup of the resource but the broker
-- continues.
--
--
-- - Type k is the key for the resource associated to
-- an incoming message
-- - Type w is the type of incoming, demultiplexed,
-- messages.
-- - Type a specifies the resource type.
-- - Type m is the base monad
--
type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a)
-- | The action that the broker has to take for an incoming message.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w is the type of the demultiplexed messages.
--
data Multiplexed k w
-- | The message is an initialization message, that requires the creation
-- of a new resource for the given key. When the resource is created, then
-- maybe additionally a message will also be dispatched.
Initialize :: k -> !Maybe w -> Multiplexed k w
-- | Dispatch a message using an existing resource. Silently ignore if no
-- resource for the key exists.
Dispatch :: k -> w -> Multiplexed k w
-- | This value indicates what state a worker is in after the
-- MessageHandler action was executed.
data ResourceUpdate a
-- | The resource is still required.
KeepResource :: ResourceUpdate a
-- | The resource is still required but must be updated.
UpdateResource :: a -> ResourceUpdate a
-- | The resource is obsolete and can be removed from the broker. The
-- broker will call ResourceCleaner either on the current, or an
-- updated resource value.
RemoveResource :: !Maybe a -> ResourceUpdate a
instance GHC.Classes.Eq RIO.ProcessPool.Broker.BrokerResult
instance GHC.Show.Show RIO.ProcessPool.Broker.BrokerResult
-- | Launch- and Dispatch messages to processes.
--
-- A pool has an Input for Multiplexed messages, and
-- dispatches incoming messages to concurrent processes using user defined
-- MessageBoxes.
--
-- The pool starts and stops the processes and creates the message boxes.
--
-- The user supplied PoolWorkerCallback usually runs a loop that
-- receives messages from the MessageBox created
-- by the pool for that worker.
--
-- When a worker process dies, e.g. because the PoolWorkerCallback
-- returns, the pool process will also cancel the process (just to
-- make sure...) and cleanup the internal Broker.
module RIO.ProcessPool.Pool
-- | A record containing the message box Input of the
-- Broker and the Async value required to cancel
-- the pool's broker process.
data Pool poolBox k w
MkPool :: !Input (MessageBox poolBox) (Multiplexed k (Maybe w)) -> !Async BrokerResult -> Pool poolBox k w
-- | Messages sent to this input are dispatched to workers. If the message
-- is an Initialize message, a new async process will be
-- started. If the message value is Nothing, the process is
-- killed.
[poolInput] :: Pool poolBox k w -> !Input (MessageBox poolBox) (Multiplexed k (Maybe w))
-- | The async of the internal Broker.
[poolAsync] :: Pool poolBox k w -> !Async BrokerResult
-- | Start a Pool.
--
-- Start a process that receives messages sent to the poolInput
-- and dispatches them to the Input of pool member
-- processes. If necessary the pool worker processes are started.
--
-- Each pool worker process is started using async and executes
-- the PoolWorkerCallback.
--
-- When the callback returns, the process will exit.
--
-- Internally the pool uses the async function to wrap the
-- callback.
--
-- When a Multiplexed Dispatch message is received with a
-- Nothing then the worker is cancelled and the
-- worker is removed from the map.
--
-- Such a message is automatically sent after the
-- PoolWorkerCallback has returned, even when an exception was
-- thrown. See finally.
spawnPool :: forall k w poolBox workerBox m. (IsMessageBoxArg poolBox, IsMessageBoxArg workerBox, Ord k, Display k, HasLogFunc m) => poolBox -> workerBox -> PoolWorkerCallback workerBox w k m -> RIO m (Either SomeException (Pool poolBox k w))
-- | The function that processes a MessageBox of a worker for a
-- specific key.
newtype PoolWorkerCallback workerBox w k m
MkPoolWorkerCallback :: (k -> MessageBox workerBox w -> RIO m ()) -> PoolWorkerCallback workerBox w k m
[runPoolWorkerCallback] :: PoolWorkerCallback workerBox w k m -> k -> MessageBox workerBox w -> RIO m ()
-- | This message will cancel the worker with the given key. If the
-- PoolWorkerCallback wants to do cleanup it should use
-- finally or onException.
removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)
-- | Launch- and Dispatch messages to processes.
--
-- A pool has an Input for Multiplexed messages, and
-- dispatches incoming messages to concurrent processes using user defined
-- MessageBoxes.
--
-- The pool starts and stops the processes and creates the message boxes.
--
-- The user supplied PoolWorkerCallback usually runs a loop that
-- receives messages from the MessageBox created
-- by the pool for that worker.
--
-- When a worker process dies, e.g. because the PoolWorkerCallback
-- returns, the pool process will also cancel the process (just
-- to make sure...) and cleanup the internal Broker.
module RIO.ProcessPool
-- | User supplied callback called _with exceptions masked_ when the
-- MessageHandler returns RemoveResource. (Sync-) Exceptions
-- thrown from this function are caught, and do not prevent the removal
-- of the resource, also the broker continues.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - a specifies the resource type.
-- - m is the monad of the returned action.
--
type ResourceCleaner k a m = k -> a -> RIO m ()
-- | User supplied callback to create and initialize a resource. (Sync-)
-- Exceptions thrown from this function are caught, and the broker
-- continues.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w is the type of the demultiplexed messages.
-- - a specifies the resource type.
-- - m is the monad of the returned action.
--
type ResourceCreator k w a m = k -> Maybe w -> RIO m a
-- | The action that the broker has to take for an incoming message.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w is the type of the demultiplexed messages.
--
data Multiplexed k w
-- | The message is an initialization message, that requires the creation
-- of a new resource for the given key. When the resource is created, then
-- maybe additionally a message will also be dispatched.
Initialize :: k -> !Maybe w -> Multiplexed k w
-- | Dispatch a message using an existing resource. Silently ignore if no
-- resource for the key exists.
Dispatch :: k -> w -> Multiplexed k w
-- | This value indicates what state a worker is in after the
-- MessageHandler action was executed.
data ResourceUpdate a
-- | The resource is still required.
KeepResource :: ResourceUpdate a
-- | The resource is still required but must be updated.
UpdateResource :: a -> ResourceUpdate a
-- | The resource is obsolete and can be removed from the broker. The
-- broker will call ResourceCleaner either on the current, or an
-- updated resource value.
RemoveResource :: !Maybe a -> ResourceUpdate a
-- | User supplied callback to use the Multiplexed message and the
-- associated resource. (Sync-) Exceptions thrown from this function are
-- caught and lead to immediate cleanup of the resource but the broker
-- continues.
--
--
-- - Type k is the key for the resource associated to
-- an incoming message
-- - Type w is the type of incoming, demultiplexed,
-- messages.
-- - Type a specifies the resource type.
-- - Type m is the base monad
--
type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a)
-- | User supplied callback to extract the key and the Multiplexed
-- from a message. (Sync-) Exceptions thrown from this function are
-- caught and lead to dropping of the incoming message, while the broker
-- continues.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w' is the type of incoming messages.
-- - w is the type of the demultiplexed messages.
--
type Demultiplexer w' k w = w' -> Multiplexed k w
-- | The broker configuration, used by spawnBroker.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w' is the type of incoming messages.
-- - w is the type of the demultiplexed messages.
-- - a specifies the resource type.
-- - m is the base monad
--
data BrokerConfig k w' w a m
MkBrokerConfig :: !Demultiplexer w' k w -> !MessageHandler k w a m -> !ResourceCreator k w a m -> !ResourceCleaner k a m -> BrokerConfig k w' w a m
[demultiplexer] :: BrokerConfig k w' w a m -> !Demultiplexer w' k w
[messageDispatcher] :: BrokerConfig k w' w a m -> !MessageHandler k w a m
[resourceCreator] :: BrokerConfig k w' w a m -> !ResourceCreator k w a m
[resourceCleaner] :: BrokerConfig k w' w a m -> !ResourceCleaner k a m
-- | This is just what the Async returned from spawnBroker
-- returns, its current purpose is to make code easier to read.
--
-- Instead of some Async () that could be anything, there is
-- Async BrokerResult.
data BrokerResult
MkBrokerResult :: BrokerResult
-- | Spawn a broker with a new MessageBox, and return its message
-- Input channel as well as the Async handle of the spawned
-- process, needed to stop the broker process.
--
--
-- - k is the key for the resource associated to an
-- incoming message
-- - w' is the type of incoming messages.
-- - w is the type of the demultiplexed messages.
-- - a specifies the resource type.
-- - m is the base monad
--
spawnBroker :: forall brokerBoxArg k w' w a m. (HasLogFunc m, Ord k, Display k, IsMessageBoxArg brokerBoxArg) => brokerBoxArg -> BrokerConfig k w' w a m -> RIO m (Either SomeException (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-- | A record containing the message box Input of the
-- Broker and the Async value required to cancel
-- the pool's broker process.
data Pool poolBox k w
MkPool :: !Input (MessageBox poolBox) (Multiplexed k (Maybe w)) -> !Async BrokerResult -> Pool poolBox k w
-- | Messages sent to this input are dispatched to workers. If the message
-- is an Initialize message, a new async process will be
-- started. If the message value is Nothing, the process is
-- killed.
[poolInput] :: Pool poolBox k w -> !Input (MessageBox poolBox) (Multiplexed k (Maybe w))
-- | The async of the internal Broker.
[poolAsync] :: Pool poolBox k w -> !Async BrokerResult
-- | The function that processes a MessageBox of a worker for a
-- specific key.
newtype PoolWorkerCallback workerBox w k m
MkPoolWorkerCallback :: (k -> MessageBox workerBox w -> RIO m ()) -> PoolWorkerCallback workerBox w k m
[runPoolWorkerCallback] :: PoolWorkerCallback workerBox w k m -> k -> MessageBox workerBox w -> RIO m ()
-- | Start a Pool.
--
-- Start a process that receives messages sent to the poolInput
-- and dispatches them to the Input of pool member
-- processes. If necessary the pool worker processes are started.
--
-- Each pool worker process is started using async and executes
-- the PoolWorkerCallback.
--
-- When the callback returns, the process will exit.
--
-- Internally the pool uses the async function to wrap the
-- callback.
--
-- When a Multiplexed Dispatch message is received with a
-- Nothing then the worker is cancelled and the
-- worker is removed from the map.
--
-- Such a message is automatically sent after the
-- PoolWorkerCallback has returned, even when an exception was
-- thrown. See finally.
spawnPool :: forall k w poolBox workerBox m. (IsMessageBoxArg poolBox, IsMessageBoxArg workerBox, Ord k, Display k, HasLogFunc m) => poolBox -> workerBox -> PoolWorkerCallback workerBox w k m -> RIO m (Either SomeException (Pool poolBox k w))
-- | This message will cancel the worker with the given key. If the
-- PoolWorkerCallback wants to do cleanup it should use
-- finally or onException.
removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)