-- | Launch- and Dispatch messages to processes.
--
-- A pool has an 'Input' for 'Multiplexed' messages,
-- and dispatches incoming messages to concurrent
-- processes using user defined @'MessageBox'es@.
--
-- The pool starts and stops the processes and
-- creates the message boxes.
--
-- The user supplied 'PoolWorkerCallback' 
-- usually runs a loop that @'receive's@ messages
-- from the 'MessageBox' created by the pool for that worker.
--
-- When a worker process dies, e.g. because the
-- 'PoolWorkerCallback' returns, the pool
-- process will also 'cancel' the process (just to make sure...)
-- and clean up the internal 'Broker'.
module RIO.ProcessPool.Pool
  ( Pool (..),
    spawnPool,
    PoolWorkerCallback (..),
    removePoolWorkerMessage,
  )
where

import RIO
import RIO.ProcessPool.Broker
  ( BrokerConfig (MkBrokerConfig),
    BrokerResult,
    Multiplexed (Dispatch),
    ResourceUpdate (KeepResource, RemoveResource),
    spawnBroker,
  )
import UnliftIO.MessageBox.Class
  ( IsInput (deliver),
    IsMessageBox (Input, newInput),
    IsMessageBoxArg (MessageBox, newMessageBox),
  )

-- | Start a 'Pool'.
--
-- Start a process that receives messages sent to the
-- 'poolInput' and dispatches them to the 'Input' of
-- __pool member__ processes. If necessary the
-- pool worker processes are started.
--
-- Each pool worker process is started using 'async' and
-- executes the 'PoolWorkerCallback'.
--
-- When the callback returns, the process will exit.
--
-- Internally the pool uses the 'async' function to wrap
-- the callback.
--
-- When a 'Multiplexed' 'Dispatch' message is received with
-- a @Nothing@ then the worker is @'cancel'led@ and the
-- worker is removed from the map.
--
-- Such a message is automatically sent after the 'PoolWorkerCallback'
-- has returned, even when an exception was thrown. See
-- 'finally'.
--
-- The result is whatever 'spawnBroker' returns: a @Left@ is passed
-- through unchanged, a @Right@ is turned into a 'Pool'.
spawnPool ::
  forall k w poolBox workerBox m.
  ( IsMessageBoxArg poolBox,
    IsMessageBoxArg workerBox,
    Ord k,
    Display k,
    HasLogFunc m
  ) =>
  poolBox ->
  workerBox ->
  PoolWorkerCallback workerBox w k m ->
  RIO
    m
    ( Either
        SomeException
        (Pool poolBox k w)
    )
spawnPool poolBox workerBoxArg poolMemberImpl = do
  -- Workers enqueue their own removal message on the broker input, but
  -- that input only exists after 'spawnBroker' has returned. The 'MVar'
  -- starts out empty and is filled below once the input is known.
  brInRef <- newEmptyMVar
  let brCfg =
        MkBrokerConfig
          id
          dispatchToWorker
          (spawnWorker workerBoxArg brInRef poolMemberImpl)
          removeWorker
  spawnBroker poolBox brCfg
    >>= traverse
      ( \(brIn, brA) -> do
          putMVar brInRef brIn
          return MkPool {poolInput = brIn, poolAsync = brA}
      )

-- | This message will 'cancel' the worker
-- with the given key.
--
-- It is a 'Dispatch' for that key carrying @Nothing@ as payload.
-- If the 'PoolWorkerCallback' wants to do cleanup
-- it should use 'finally' or 'onException'.
removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)
removePoolWorkerMessage !k = Dispatch k Nothing

-- | The function that processes a
-- 'MessageBox' of a worker for a specific /key/.
--
-- It receives the key and the message box of the worker.
newtype PoolWorkerCallback workerBox w k m = MkPoolWorkerCallback
  { runPoolWorkerCallback :: k -> MessageBox workerBox w -> RIO m ()
  }

-- | A record containing the message box 'Input' of the
-- 'Broker' and the 'Async' value required to 'cancel'
-- the pool's broker process.
data Pool poolBox k w = MkPool
  { -- | Messages sent to this input are dispatched to workers.
    -- If the message is an 'Initialize' message, a new 'async'
    -- process will be started.
    -- If the message value is 'Nothing', the process is killed.
    poolInput :: !(Input (MessageBox poolBox) (Multiplexed k (Maybe w))),
    -- | The async of the internal 'Broker'.
    poolAsync :: !(Async BrokerResult)
  }

-- | Internal data structure containing a worker's
-- message 'Input' and 'Async' value for cancellation.
data PoolWorker workerBox w = MkPoolWorker
  { -- | The input of the worker's message box.
    poolWorkerIn :: !(Input (MessageBox workerBox) w),
    -- | The process running the worker.
    poolWorkerAsync :: !(Async ())
  }

-- | Hand one message to a pool worker.
--
-- * @Just msg@ is delivered to the worker's input. The result is
--   'KeepResource' when 'deliver' reports success; otherwise an error
--   mentioning the key is logged and @'RemoveResource' Nothing@ is
--   returned.
-- * @Nothing@ yields @'RemoveResource' Nothing@ without delivering
--   anything.
dispatchToWorker ::
  (HasLogFunc m, IsInput (Input (MessageBox b)), Display k) =>
  k ->
  Maybe w ->
  PoolWorker b w ->
  RIO m (ResourceUpdate (PoolWorker b w))
dispatchToWorker k pMsg pm =
  case pMsg of
    Just w -> helper w
    Nothing -> return (RemoveResource Nothing)
  where
    -- Deliver the payload and translate the outcome into a
    -- 'ResourceUpdate'.
    helper msg = do
      ok <- deliver (poolWorkerIn pm) msg
      if ok
        then return KeepResource
        else do
          logError ("failed to deliver message to pool worker: " <> display k)
          return (RemoveResource Nothing)

-- | Create the pool worker for the key @this@.
--
-- A new process is started with 'async'. It creates the worker's
-- message box and the matching input, passes the input back through a
-- private 'MVar' and then runs the 'PoolWorkerCallback' with the key
-- and the new message box.
--
-- When that process ends, normally or with an exception, it tries to
-- deliver 'removePoolWorkerMessage' for its own key to the pool input
-- stored in @brInRef@. If that 'MVar' is still empty, nothing is sent.
--
-- If creating the message box or its input throws, the error is
-- logged, the process is cancelled and a 'StringException' is thrown
-- to the caller.
--
-- The last argument is not used.
spawnWorker ::
  forall k w poolBoxIn workerBox m.
  ( IsMessageBoxArg workerBox,
    HasLogFunc m,
    IsInput poolBoxIn,
    Display k
  ) =>
  workerBox ->
  MVar (poolBoxIn (Multiplexed k (Maybe w))) ->
  PoolWorkerCallback workerBox w k m ->
  k ->
  Maybe (Maybe w) ->
  RIO m (PoolWorker workerBox w)
spawnWorker workerBox brInRef pmCb this _mw = do
  inputRef <- newEmptyMVar
  a <- async (go inputRef `finally` enqueueCleanup)
  -- Wait for the worker process to report whether its message box
  -- input could be created.
  boxInM <- takeMVar inputRef
  case boxInM of
    Nothing -> do
      cancel a
      throwIO (stringException "failed to spawnWorker")
    Just boxIn ->
      return MkPoolWorker {poolWorkerIn = boxIn, poolWorkerAsync = a}
  where
    -- Body of the worker process: set up the message box, publish its
    -- input, then run the user callback.
    go inputRef = do
      (b, boxIn) <-
        withException
          ( do
              b <- newMessageBox workerBox
              boxIn <- newInput b
              return (b, boxIn)
          )
          ( \(ex :: SomeException) -> do
              logError
                ( "failed to create the message box for the new pool worker: "
                    <> display this
                    <> " exception caught: "
                    <> display ex
                )
              -- Tell the waiting caller that no input is available.
              putMVar inputRef Nothing
          )
      putMVar inputRef (Just boxIn)
      runPoolWorkerCallback pmCb this b
    -- Ask the pool to remove this worker; skipped when the pool input
    -- has not been stored in 'brInRef' yet.
    enqueueCleanup =
      tryReadMVar brInRef
        >>= traverse_
          ( \brIn ->
              void (deliver brIn (removePoolWorkerMessage this))
          )

-- | Stop a pool worker by cancelling its 'Async'.
--
-- The key argument is ignored.
removeWorker ::
  k ->
  PoolWorker workerBox w ->
  RIO m ()
removeWorker _k =
  cancel . poolWorkerAsync