-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | A library for process pools coupled with asynchronous message queues -- -- Please see the README on GitHub at -- https://github.com/sheyll/rio-process-pool#readme @package rio-process-pool @version 1.0.1 -- | A broker extracts a key value from incoming messages and -- creates, keeps and destroys a resource for each key. -- -- The demultiplexed messages and their resources are passed to a custom -- MessageHandler. -- -- The user provides a Demultiplexer, a pure function that -- returns a key for the resource associated to the message and -- potentially changes the message. -- -- The demultiplexer may also return a value indicating that a new -- resource must be created, or that a message shall be ignored. -- -- The broker is run in a separate process using async. The usual -- way to stop a broker is to cancel it. -- -- When cancelling a broker, the resource cleanup actions for all -- resources will be called with async exceptions masked. -- -- In order to prevent the resource map filling up with dead -- resources, the user of this module has to ensure that whenever a -- resource is not required anymore, a message will be sent to the -- broker, that will cause the MessageHandler to be executed for -- the resource, which will in turn return RemoveResource. module RIO.ProcessPool.Broker -- | Spawn a broker with a new MessageBox, and return its message -- Input channel as well as the Async handle of the spawned -- process, needed to stop the broker process. -- -- spawnBroker :: forall brokerBoxArg k w' w a m. (HasLogFunc m, Ord k, Display k, IsMessageBoxArg brokerBoxArg) => brokerBoxArg -> BrokerConfig k w' w a m -> RIO m (Either SomeException (Input (MessageBox brokerBoxArg) w', Async BrokerResult)) -- | The broker configuration, used by spawnBroker. 
-- -- data BrokerConfig k w' w a m MkBrokerConfig :: !Demultiplexer w' k w -> !MessageHandler k w a m -> !ResourceCreator k w a m -> !ResourceCleaner k a m -> BrokerConfig k w' w a m [demultiplexer] :: BrokerConfig k w' w a m -> !Demultiplexer w' k w [messageDispatcher] :: BrokerConfig k w' w a m -> !MessageHandler k w a m [resourceCreator] :: BrokerConfig k w' w a m -> !ResourceCreator k w a m [resourceCleaner] :: BrokerConfig k w' w a m -> !ResourceCleaner k a m -- | This is just what the Async returned from spawnBroker -- returns; its current purpose is to make code easier to read. -- -- Instead of some Async () that could be anything, there is -- Async BrokerResult. data BrokerResult MkBrokerResult :: BrokerResult -- | User supplied callback to create and initialize a resource. (Sync-) -- Exceptions thrown from this function are caught, and the broker -- continues. -- -- type ResourceCreator k w a m = k -> Maybe w -> RIO m a -- | User supplied callback to extract the key and the Multiplexed -- from a message. (Sync-) Exceptions thrown from this function are -- caught and lead to dropping of the incoming message, while the broker -- continues. -- -- type Demultiplexer w' k w = w' -> Multiplexed k w -- | User supplied callback called _with exceptions masked_ when the -- MessageHandler returns RemoveResource. (Sync-) Exceptions -- thrown from this function are caught, and do not prevent the removal -- of the resource, also the broker continues. -- -- type ResourceCleaner k a m = k -> a -> RIO m () -- | User supplied callback to use the Multiplexed message and the -- associated resource. (Sync-) Exceptions thrown from this function are -- caught and lead to immediate cleanup of the resource but the broker -- continues. -- -- type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a) -- | The action that the broker has to take for an incoming message. 
-- -- data Multiplexed k w -- | The message is an initialization message, that requires the creation -- of a new resource for the given key. When the resource is created, then -- maybe additionally a message will also be dispatched. Initialize :: k -> !Maybe w -> Multiplexed k w -- | Dispatch a message using an existing resource. Silently ignore if no -- resource for the key exists. Dispatch :: k -> w -> Multiplexed k w -- | This value indicates what state a worker is in after the -- MessageHandler action was executed. data ResourceUpdate a -- | The resource is still required. KeepResource :: ResourceUpdate a -- | The resource is still required but must be updated. UpdateResource :: a -> ResourceUpdate a -- | The resource is obsolete and can be removed from the broker. The -- broker will call ResourceCleaner either on the current, or an -- updated resource value. RemoveResource :: !Maybe a -> ResourceUpdate a instance GHC.Classes.Eq RIO.ProcessPool.Broker.BrokerResult instance GHC.Show.Show RIO.ProcessPool.Broker.BrokerResult -- | Launch- and Dispatch messages to processes. -- -- A pool has an Input for Multiplexed messages, and -- dispatches incoming messages to concurrent processes using user defined -- MessageBoxes. -- -- The pool starts and stops the processes and creates the message boxes. -- -- The user supplied PoolWorkerCallback usually runs a loop that -- receives messages from the MessageBox created -- by the pool for that worker. -- -- When a worker process dies, e.g. because the PoolWorkerCallback -- returns, the pool process will also cancel the process (just to -- make sure...) and cleanup the internal Broker. module RIO.ProcessPool.Pool -- | A record containing the message box Input of the -- Broker and the Async value required to cancel -- the pool's broker process. 
data Pool poolBox k w MkPool :: !Input (MessageBox poolBox) (Multiplexed k (Maybe w)) -> !Async BrokerResult -> Pool poolBox k w -- | Messages sent to this input are dispatched to workers. If the message -- is an Initialize message, a new async process will be -- started. If the message value is Nothing, the process is -- killed. [poolInput] :: Pool poolBox k w -> !Input (MessageBox poolBox) (Multiplexed k (Maybe w)) -- | The async of the internal Broker. [poolAsync] :: Pool poolBox k w -> !Async BrokerResult -- | Start a Pool. -- -- Start a process that receives messages sent to the poolInput -- and dispatches them to the Input of pool member -- processes. If necessary the pool worker processes are started. -- -- Each pool worker process is started using async and executes -- the PoolWorkerCallback. -- -- When the callback returns, the process will exit. -- -- Internally the pool uses the async function to wrap the -- callback. -- -- When a Multiplexed Dispatch message is received with a -- Nothing then the worker is cancelled and the -- worker is removed from the map. -- -- Such a message is automatically sent after the -- PoolWorkerCallback has returned, even when an exception was -- thrown. See finally. spawnPool :: forall k w poolBox workerBox m. (IsMessageBoxArg poolBox, IsMessageBoxArg workerBox, Ord k, Display k, HasLogFunc m) => poolBox -> workerBox -> PoolWorkerCallback workerBox w k m -> RIO m (Either SomeException (Pool poolBox k w)) -- | The function that processes a MessageBox of a worker for a -- specific key. newtype PoolWorkerCallback workerBox w k m MkPoolWorkerCallback :: (k -> MessageBox workerBox w -> RIO m ()) -> PoolWorkerCallback workerBox w k m [runPoolWorkerCallback] :: PoolWorkerCallback workerBox w k m -> k -> MessageBox workerBox w -> RIO m () -- | This message will cancel the worker with the given key. If the -- PoolWorkerCallback wants to do cleanup it should use -- finally or onException. 
removePoolWorkerMessage :: k -> Multiplexed k (Maybe w) -- | Launch- and Dispatch messages to processes. -- -- A pool has an Input for Multiplexed messages, and -- dispatches incoming messages to concurrent processes using user defined -- MessageBoxes. -- -- The pool starts and stops the processes and creates the message boxes. -- -- The user supplied PoolWorkerCallback usually runs a loop that -- receives messages from the MessageBox created -- by the pool for that worker. -- -- When a worker process dies, e.g. because the PoolWorkerCallback -- returns, the pool process will also cancel the process (just -- to make sure...) and cleanup the internal Broker. module RIO.ProcessPool -- | User supplied callback called _with exceptions masked_ when the -- MessageHandler returns RemoveResource. (Sync-) Exceptions -- thrown from this function are caught, and do not prevent the removal -- of the resource, also the broker continues. -- -- type ResourceCleaner k a m = k -> a -> RIO m () -- | User supplied callback to create and initialize a resource. (Sync-) -- Exceptions thrown from this function are caught, and the broker -- continues. -- -- type ResourceCreator k w a m = k -> Maybe w -> RIO m a -- | The action that the broker has to take for an incoming message. -- -- data Multiplexed k w -- | The message is an initialization message, that requires the creation -- of a new resource for the given key. When the resource is created, then -- maybe additionally a message will also be dispatched. Initialize :: k -> !Maybe w -> Multiplexed k w -- | Dispatch a message using an existing resource. Silently ignore if no -- resource for the key exists. Dispatch :: k -> w -> Multiplexed k w -- | This value indicates what state a worker is in after the -- MessageHandler action was executed. data ResourceUpdate a -- | The resource is still required. KeepResource :: ResourceUpdate a -- | The resource is still required but must be updated. 
UpdateResource :: a -> ResourceUpdate a -- | The resource is obsolete and can be removed from the broker. The -- broker will call ResourceCleaner either on the current, or an -- updated resource value. RemoveResource :: !Maybe a -> ResourceUpdate a -- | User supplied callback to use the Multiplexed message and the -- associated resource. (Sync-) Exceptions thrown from this function are -- caught and lead to immediate cleanup of the resource but the broker -- continues. -- -- type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a) -- | User supplied callback to extract the key and the Multiplexed -- from a message. (Sync-) Exceptions thrown from this function are -- caught and lead to dropping of the incoming message, while the broker -- continues. -- -- type Demultiplexer w' k w = w' -> Multiplexed k w -- | The broker configuration, used by spawnBroker. -- -- data BrokerConfig k w' w a m MkBrokerConfig :: !Demultiplexer w' k w -> !MessageHandler k w a m -> !ResourceCreator k w a m -> !ResourceCleaner k a m -> BrokerConfig k w' w a m [demultiplexer] :: BrokerConfig k w' w a m -> !Demultiplexer w' k w [messageDispatcher] :: BrokerConfig k w' w a m -> !MessageHandler k w a m [resourceCreator] :: BrokerConfig k w' w a m -> !ResourceCreator k w a m [resourceCleaner] :: BrokerConfig k w' w a m -> !ResourceCleaner k a m -- | This is just what the Async returned from spawnBroker -- returns; its current purpose is to make code easier to read. -- -- Instead of some Async () that could be anything, there is -- Async BrokerResult. data BrokerResult MkBrokerResult :: BrokerResult -- | Spawn a broker with a new MessageBox, and return its message -- Input channel as well as the Async handle of the spawned -- process, needed to stop the broker process. -- -- spawnBroker :: forall brokerBoxArg k w' w a m. 
(HasLogFunc m, Ord k, Display k, IsMessageBoxArg brokerBoxArg) => brokerBoxArg -> BrokerConfig k w' w a m -> RIO m (Either SomeException (Input (MessageBox brokerBoxArg) w', Async BrokerResult)) -- | A record containing the message box Input of the -- Broker and the Async value required to cancel -- the pool's broker process. data Pool poolBox k w MkPool :: !Input (MessageBox poolBox) (Multiplexed k (Maybe w)) -> !Async BrokerResult -> Pool poolBox k w -- | Messages sent to this input are dispatched to workers. If the message -- is an Initialize message, a new async process will be -- started. If the message value is Nothing, the process is -- killed. [poolInput] :: Pool poolBox k w -> !Input (MessageBox poolBox) (Multiplexed k (Maybe w)) -- | The async of the internal Broker. [poolAsync] :: Pool poolBox k w -> !Async BrokerResult -- | The function that processes a MessageBox of a worker for a -- specific key. newtype PoolWorkerCallback workerBox w k m MkPoolWorkerCallback :: (k -> MessageBox workerBox w -> RIO m ()) -> PoolWorkerCallback workerBox w k m [runPoolWorkerCallback] :: PoolWorkerCallback workerBox w k m -> k -> MessageBox workerBox w -> RIO m () -- | Start a Pool. -- -- Start a process that receives messages sent to the poolInput -- and dispatches them to the Input of pool member -- processes. If necessary the pool worker processes are started. -- -- Each pool worker process is started using async and executes -- the PoolWorkerCallback. -- -- When the callback returns, the process will exit. -- -- Internally the pool uses the async function to wrap the -- callback. -- -- When a Multiplexed Dispatch message is received with a -- Nothing then the worker is cancelled and the -- worker is removed from the map. -- -- Such a message is automatically sent after the -- PoolWorkerCallback has returned, even when an exception was -- thrown. See finally. spawnPool :: forall k w poolBox workerBox m. 
(IsMessageBoxArg poolBox, IsMessageBoxArg workerBox, Ord k, Display k, HasLogFunc m) => poolBox -> workerBox -> PoolWorkerCallback workerBox w k m -> RIO m (Either SomeException (Pool poolBox k w)) -- | This message will cancel the worker with the given key. If the -- PoolWorkerCallback wants to do cleanup it should use -- finally or onException. removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)