module RIO.ProcessPool.Pool
( Pool (..),
spawnPool,
PoolWorkerCallback (..),
removePoolWorkerMessage,
)
where
import RIO
import RIO.ProcessPool.Broker
( BrokerConfig (MkBrokerConfig),
BrokerResult,
Multiplexed (Dispatch),
ResourceUpdate (KeepResource, RemoveResource),
spawnBroker,
)
import UnliftIO.MessageBox.Class
( IsInput (deliver),
IsMessageBox (Input, newInput),
IsMessageBoxArg (MessageBox, newMessageBox),
)
-- | Start a 'Pool'.
--
-- A broker is started through 'spawnBroker' with a configuration that
--
--   * uses 'id' as the demultiplexer,
--   * forwards messages to existing workers with 'dispatchToWorker',
--   * creates workers with 'spawnWorker', and
--   * disposes of workers with 'removeWorker'.
--
-- When 'spawnBroker' returns a 'Left', it is passed through unchanged.
-- Otherwise the broker input and the broker 'Async' are wrapped into
-- a 'Pool'.
spawnPool ::
  forall k w poolBox workerBox m.
  ( IsMessageBoxArg poolBox,
    IsMessageBoxArg workerBox,
    Ord k,
    Display k,
    HasLogFunc m
  ) =>
  -- | Message box argument handed to 'spawnBroker'.
  poolBox ->
  -- | Message box argument used for every worker's message box.
  workerBox ->
  -- | The action that each worker executes.
  PoolWorkerCallback workerBox w k m ->
  RIO
    m
    ( Either
        SomeException
        (Pool poolBox k w)
    )
spawnPool poolBox workerBoxArg poolMemberImpl = do
  -- Workers need the broker input to enqueue their own clean-up
  -- message, but that input only exists once the broker is running.
  -- The empty MVar is handed to the workers now and filled below.
  brInRef <- newEmptyMVar
  let brCfg =
        MkBrokerConfig
          id
          dispatchToWorker
          (spawnWorker workerBoxArg brInRef poolMemberImpl)
          removeWorker
  spawnBroker poolBox brCfg
    >>= traverse
      ( \(brIn, brA) -> do
          putMVar brInRef brIn
          return MkPool {poolInput = brIn, poolAsync = brA}
      )
-- | Build the pool message that carries no payload for the given key:
-- @'Dispatch' k 'Nothing'@.
--
-- 'dispatchToWorker' answers a 'Nothing' payload with 'RemoveResource',
-- and 'spawnWorker' delivers this message when a worker terminates.
--
-- The key is forced by the bang pattern.
removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)
removePoolWorkerMessage !k = Dispatch k Nothing
-- | The action that a pool worker executes.
--
-- 'spawnWorker' invokes it with the worker's key and the worker's
-- freshly created message box.
newtype PoolWorkerCallback workerBox w k m = MkPoolWorkerCallback
  { -- | Run the callback for the given key and message box.
    runPoolWorkerCallback :: k -> MessageBox workerBox w -> RIO m ()
  }
-- | A handle to a running pool, as returned by 'spawnPool'.
data Pool poolBox k w = MkPool
  { -- | The input of the broker's message box. Messages are
    -- 'Multiplexed' by key and carry a 'Maybe' payload.
    poolInput :: !(Input (MessageBox poolBox) (Multiplexed k (Maybe w))),
    -- | The 'Async' returned by 'spawnBroker'.
    poolAsync :: !(Async BrokerResult)
  }
-- | Internal book-keeping for one pool worker; this is the resource
-- type managed by the broker.
data PoolWorker workerBox w = MkPoolWorker
  { -- | The input of the worker's message box.
    poolWorkerIn :: !(Input (MessageBox workerBox) w),
    -- | The 'Async' running the worker callback.
    poolWorkerAsync :: !(Async ())
  }
-- | Message handler of the pool broker.
--
--   * @'Just' w@: deliver @w@ to the worker's input. If delivery
--     reports failure, log an error and return
--     @'RemoveResource' 'Nothing'@; otherwise return 'KeepResource'.
--   * 'Nothing': return @'RemoveResource' 'Nothing'@ without
--     delivering anything.
dispatchToWorker ::
  (HasLogFunc m, IsInput (Input (MessageBox b)), Display k) =>
  -- | Key of the worker, used in the error log message.
  k ->
  -- | Payload to deliver, or 'Nothing' to request removal.
  Maybe w ->
  -- | The worker that the message is addressed to.
  PoolWorker b w ->
  RIO m (ResourceUpdate (PoolWorker b w))
dispatchToWorker k pMsg pm =
  case pMsg of
    Just w -> helper w
    Nothing -> return (RemoveResource Nothing)
  where
    -- Deliver one payload and translate the outcome into a
    -- 'ResourceUpdate'.
    helper msg = do
      ok <- deliver (poolWorkerIn pm) msg
      if not ok
        then do
          logError ("failed to deliver message to pool worker: " <> display k)
          return (RemoveResource Nothing)
        else return KeepResource
-- | Resource creator of the pool broker: start a new worker.
--
-- A new 'Async' creates the worker's message box and its input,
-- publishes the input through a local 'MVar', and then runs the
-- 'PoolWorkerCallback'. The calling thread blocks on that 'MVar':
--
--   * @'Just' input@: return a 'PoolWorker' made of the input and
--     the 'Async'.
--   * 'Nothing' (message box creation threw): cancel the 'Async' and
--     throw a 'StringException'.
--
-- Whenever the 'Async' ends, 'removePoolWorkerMessage' for this
-- worker's key is delivered to the pool input, provided the pool
-- input 'MVar' is already filled at that moment.
spawnWorker ::
  forall k w poolBoxIn workerBox m.
  ( IsMessageBoxArg workerBox,
    HasLogFunc m,
    IsInput poolBoxIn,
    Display k
  ) =>
  -- | Message box argument for the worker's message box.
  workerBox ->
  -- | Holds the pool input once 'spawnPool' has filled it.
  MVar (poolBoxIn (Multiplexed k (Maybe w))) ->
  -- | The action to run in the worker.
  PoolWorkerCallback workerBox w k m ->
  -- | Key of the new worker.
  k ->
  -- | Initial message; ignored by this function.
  Maybe (Maybe w) ->
  RIO m (PoolWorker workerBox w)
spawnWorker workerBox brInRef pmCb this _mw = do
  inputRef <- newEmptyMVar
  a <- async (go inputRef `finally` enqueueCleanup)
  -- Wait until the worker thread reports either its input or failure.
  boxInM <- takeMVar inputRef
  case boxInM of
    Nothing -> do
      cancel a
      throwIO (stringException "failed to spawnWorker")
    Just boxIn ->
      return MkPoolWorker {poolWorkerIn = boxIn, poolWorkerAsync = a}
  where
    -- Body of the worker thread.
    go inputRef = do
      (b, boxIn) <-
        withException
          ( do
              b <- newMessageBox workerBox
              boxIn <- newInput b
              return (b, boxIn)
          )
          ( \(ex :: SomeException) -> do
              logError
                ( "failed to create the message box for the new pool worker: "
                    <> display this
                    <> " exception caught: "
                    <> display ex
                )
              -- Unblock the spawning thread with a failure marker.
              putMVar inputRef Nothing
          )
      putMVar inputRef (Just boxIn)
      runPoolWorkerCallback pmCb this b
    -- Ask the pool to drop this worker. 'tryReadMVar' does not
    -- block; when the pool input is not yet available, nothing is sent.
    enqueueCleanup =
      tryReadMVar brInRef
        >>= traverse_
          ( \brIn ->
              void (deliver brIn (removePoolWorkerMessage this))
          )
-- | Resource cleaner of the pool broker: cancel the worker's 'Async'.
-- The key is ignored.
removeWorker ::
  k ->
  PoolWorker workerBox w ->
  RIO m ()
-- 'cancel' already yields @()@, so no 'void' is needed.
removeWorker _k = cancel . poolWorkerAsync