{-# LANGUAGE Strict #-}
module RIO.ProcessPool.Broker
( spawnBroker,
BrokerConfig (..),
BrokerResult (..),
ResourceCreator,
Demultiplexer,
ResourceCleaner,
MessageHandler,
Multiplexed (..),
ResourceUpdate (..),
)
where
import qualified Data.Map.Strict as Map
import RIO
import UnliftIO.MessageBox
( IsMessageBox (Input, newInput, receive),
IsMessageBoxArg (MessageBox, newMessageBox),
)
spawnBroker ::
forall brokerBoxArg k w' w a m.
( HasLogFunc m,
Ord k,
Display k,
IsMessageBoxArg brokerBoxArg
) =>
brokerBoxArg ->
BrokerConfig k w' w a m ->
RIO
m
( Either
SomeException
( Input (MessageBox brokerBoxArg) w',
Async BrokerResult
)
)
spawnBroker :: brokerBoxArg
-> BrokerConfig k w' w a m
-> RIO
m
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
spawnBroker brokerBoxArg
brokerBoxArg BrokerConfig k w' w a m
config = do
Async
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
brokerA <- RIO
m
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> RIO
m
(Async
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult)))
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (RIO
m
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> RIO
m
(Async
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))))
-> RIO
m
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> RIO
m
(Async
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult)))
forall a b. (a -> b) -> a -> b
$ do
Either
SomeException
(MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
mBrokerBox <-
RIO
m (MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
-> RIO
m
(Either
SomeException
(MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w'))
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny
( do
MessageBox brokerBoxArg w'
b <- brokerBoxArg -> RIO m (MessageBox brokerBoxArg w')
forall argument (m :: * -> *) a.
(IsMessageBoxArg argument, MonadUnliftIO m) =>
argument -> m (MessageBox argument a)
newMessageBox brokerBoxArg
brokerBoxArg
Input (MessageBox brokerBoxArg) w'
i <- MessageBox brokerBoxArg w'
-> RIO m (Input (MessageBox brokerBoxArg) w')
forall (box :: * -> *) (m :: * -> *) a.
(IsMessageBox box, MonadUnliftIO m) =>
box a -> m (Input box a)
newInput MessageBox brokerBoxArg w'
b
(MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
-> RIO
m (MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
forall (m :: * -> *) a. Monad m => a -> m a
return (MessageBox brokerBoxArg w'
b, Input (MessageBox brokerBoxArg) w'
i)
)
case Either
SomeException
(MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
mBrokerBox of
Left SomeException
er -> Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult)
-> RIO
m
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException
-> Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult)
forall a b. a -> Either a b
Left SomeException
er)
Right (MessageBox brokerBoxArg w'
brokerBox, Input (MessageBox brokerBoxArg) w'
brokerInp) -> do
Async BrokerResult
aInner <- RIO m (Async BrokerResult) -> RIO m (Async BrokerResult)
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m a
mask_ (RIO m (Async BrokerResult) -> RIO m (Async BrokerResult))
-> RIO m (Async BrokerResult) -> RIO m (Async BrokerResult)
forall a b. (a -> b) -> a -> b
$
((forall b. RIO m b -> RIO m b) -> RIO m BrokerResult)
-> RIO m (Async BrokerResult)
forall (m :: * -> *) a.
MonadUnliftIO m =>
((forall b. m b -> m b) -> m a) -> m (Async a)
asyncWithUnmask (((forall b. RIO m b -> RIO m b) -> RIO m BrokerResult)
-> RIO m (Async BrokerResult))
-> ((forall b. RIO m b -> RIO m b) -> RIO m BrokerResult)
-> RIO m (Async BrokerResult)
forall a b. (a -> b) -> a -> b
$ \forall b. RIO m b -> RIO m b
unmaskInner ->
(forall b. RIO m b -> RIO m b)
-> MessageBox brokerBoxArg w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
forall m k (msgBox :: * -> *) w' w a.
(HasLogFunc m, Ord k, Display k, IsMessageBox msgBox) =>
(forall x. RIO m x -> RIO m x)
-> msgBox w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
brokerLoop forall b. RIO m b -> RIO m b
unmaskInner MessageBox brokerBoxArg w'
brokerBox BrokerConfig k w' w a m
config BrokerState k a
forall k a. Map k a
Map.empty
Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult)
-> RIO
m
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
forall (m :: * -> *) a. Monad m => a -> m a
return ((Input (MessageBox brokerBoxArg) w', Async BrokerResult)
-> Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult)
forall a b. b -> Either a b
Right (Input (MessageBox brokerBoxArg) w'
brokerInp, Async BrokerResult
aInner))
Either
SomeException
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Either
SomeException
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> RIO
m
(Either
SomeException
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult)))
-> RIO
m
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Async
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> RIO
m
(Either
SomeException
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult)))
forall (m :: * -> *) a.
MonadIO m =>
Async a -> m (Either SomeException a)
waitCatch Async
(Either
SomeException
(Input (MessageBox brokerBoxArg) w', Async BrokerResult))
brokerA
data BrokerResult = MkBrokerResult
deriving stock (Int -> BrokerResult -> ShowS
[BrokerResult] -> ShowS
BrokerResult -> String
(Int -> BrokerResult -> ShowS)
-> (BrokerResult -> String)
-> ([BrokerResult] -> ShowS)
-> Show BrokerResult
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BrokerResult] -> ShowS
$cshowList :: [BrokerResult] -> ShowS
show :: BrokerResult -> String
$cshow :: BrokerResult -> String
showsPrec :: Int -> BrokerResult -> ShowS
$cshowsPrec :: Int -> BrokerResult -> ShowS
Show, BrokerResult -> BrokerResult -> Bool
(BrokerResult -> BrokerResult -> Bool)
-> (BrokerResult -> BrokerResult -> Bool) -> Eq BrokerResult
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: BrokerResult -> BrokerResult -> Bool
$c/= :: BrokerResult -> BrokerResult -> Bool
== :: BrokerResult -> BrokerResult -> Bool
$c== :: BrokerResult -> BrokerResult -> Bool
Eq)
data BrokerConfig k w' w a m = MkBrokerConfig
{ BrokerConfig k w' w a m -> Demultiplexer w' k w
demultiplexer :: !(Demultiplexer w' k w),
BrokerConfig k w' w a m -> MessageHandler k w a m
messageDispatcher :: !(MessageHandler k w a m),
BrokerConfig k w' w a m -> ResourceCreator k w a m
resourceCreator :: !(ResourceCreator k w a m),
BrokerConfig k w' w a m -> ResourceCleaner k a m
resourceCleaner :: !(ResourceCleaner k a m)
}
type Demultiplexer w' k w = w' -> Multiplexed k w
type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a)
data ResourceUpdate a
=
KeepResource
|
UpdateResource a
|
RemoveResource !(Maybe a)
data Multiplexed k w
=
Initialize k !(Maybe w)
|
Dispatch k w
type ResourceCreator k w a m = k -> Maybe w -> RIO m a
type ResourceCleaner k a m = k -> a -> RIO m ()
type BrokerState k a = Map k a
{-# NOINLINE brokerLoop #-}
brokerLoop ::
( HasLogFunc m,
Ord k,
Display k,
IsMessageBox msgBox
) =>
(forall x. RIO m x -> RIO m x) ->
msgBox w' ->
BrokerConfig k w' w a m ->
BrokerState k a ->
RIO m BrokerResult
brokerLoop :: (forall x. RIO m x -> RIO m x)
-> msgBox w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
brokerLoop forall x. RIO m x -> RIO m x
unmask msgBox w'
brokerBox BrokerConfig k w' w a m
config BrokerState k a
brokerState =
RIO m (Maybe (Either SomeException (BrokerState k a)))
-> (SomeException -> RIO m ())
-> RIO m (Maybe (Either SomeException (BrokerState k a)))
forall (m :: * -> *) e a b.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m b) -> m a
withException
( RIO m (Maybe w') -> RIO m (Maybe w')
forall x. RIO m x -> RIO m x
unmask (msgBox w' -> RIO m (Maybe w')
forall (box :: * -> *) (m :: * -> *) a.
(IsMessageBox box, MonadUnliftIO m) =>
box a -> m (Maybe a)
receive msgBox w'
brokerBox)
RIO m (Maybe w')
-> (Maybe w'
-> RIO m (Maybe (Either SomeException (BrokerState k a))))
-> RIO m (Maybe (Either SomeException (BrokerState k a)))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (w' -> RIO m (Either SomeException (BrokerState k a)))
-> Maybe w'
-> RIO m (Maybe (Either SomeException (BrokerState k a)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (RIO m (BrokerState k a)
-> RIO m (Either SomeException (BrokerState k a))
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny (RIO m (BrokerState k a)
-> RIO m (Either SomeException (BrokerState k a)))
-> (w' -> RIO m (BrokerState k a))
-> w'
-> RIO m (Either SomeException (BrokerState k a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall x. RIO m x -> RIO m x)
-> BrokerConfig k w' w a m
-> BrokerState k a
-> w'
-> RIO m (BrokerState k a)
forall k m w' w a.
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x)
-> BrokerConfig k w' w a m
-> BrokerState k a
-> w'
-> RIO m (BrokerState k a)
onIncoming forall x. RIO m x -> RIO m x
unmask BrokerConfig k w' w a m
config BrokerState k a
brokerState)
)
( \(SomeException
ex :: SomeException) -> do
case SomeException -> Maybe AsyncCancelled
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
Just (AsyncCancelled
_cancelled :: AsyncCancelled) ->
Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logDebug Utf8Builder
"broker loop: cancelled"
Maybe AsyncCancelled
_ ->
Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
( Utf8Builder
"broker loop: exception while \
\receiving and dispatching messages: "
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display SomeException
ex
)
BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
forall k w' w a m.
BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
cleanupAllResources BrokerConfig k w' w a m
config BrokerState k a
brokerState
)
RIO m (Maybe (Either SomeException (BrokerState k a)))
-> (Maybe (Either SomeException (BrokerState k a))
-> RIO m BrokerResult)
-> RIO m BrokerResult
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= RIO m BrokerResult
-> (Either SomeException (BrokerState k a) -> RIO m BrokerResult)
-> Maybe (Either SomeException (BrokerState k a))
-> RIO m BrokerResult
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
( do
Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError Utf8Builder
"broker loop: failed to receive next message"
BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
forall k w' w a m.
BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
cleanupAllResources BrokerConfig k w' w a m
config BrokerState k a
brokerState
BrokerResult -> RIO m BrokerResult
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerResult
MkBrokerResult
)
( \Either SomeException (BrokerState k a)
res -> do
BrokerState k a
next <-
(SomeException -> RIO m (BrokerState k a))
-> (BrokerState k a -> RIO m (BrokerState k a))
-> Either SomeException (BrokerState k a)
-> RIO m (BrokerState k a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
( \SomeException
err -> do
Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logWarn
( Utf8Builder
"broker loop: Handling the last message\
\ caused an exception:"
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display SomeException
err
)
BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
)
BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return
Either SomeException (BrokerState k a)
res
(forall x. RIO m x -> RIO m x)
-> msgBox w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
forall m k (msgBox :: * -> *) w' w a.
(HasLogFunc m, Ord k, Display k, IsMessageBox msgBox) =>
(forall x. RIO m x -> RIO m x)
-> msgBox w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
brokerLoop forall x. RIO m x -> RIO m x
unmask msgBox w'
brokerBox BrokerConfig k w' w a m
config BrokerState k a
next
)
{-# NOINLINE onIncoming #-}
onIncoming ::
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x) ->
BrokerConfig k w' w a m ->
BrokerState k a ->
w' ->
RIO m (BrokerState k a)
onIncoming :: (forall x. RIO m x -> RIO m x)
-> BrokerConfig k w' w a m
-> BrokerState k a
-> w'
-> RIO m (BrokerState k a)
onIncoming forall x. RIO m x -> RIO m x
unmask BrokerConfig k w' w a m
config BrokerState k a
brokerState w'
w' =
case BrokerConfig k w' w a m -> Demultiplexer w' k w
forall k w' w a m. BrokerConfig k w' w a m -> Demultiplexer w' k w
demultiplexer BrokerConfig k w' w a m
config w'
w' of
Initialize k
k Maybe w
mw ->
(forall x. RIO m x -> RIO m x)
-> k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> Maybe w
-> RIO m (BrokerState k a)
forall k m w' w a.
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x)
-> k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> Maybe w
-> RIO m (BrokerState k a)
onInitialize forall x. RIO m x -> RIO m x
unmask k
k BrokerConfig k w' w a m
config BrokerState k a
brokerState Maybe w
mw
Dispatch k
k w
w ->
(forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
forall k m w w' a.
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
onDispatch forall x. RIO m x -> RIO m x
unmask k
k w
w BrokerConfig k w' w a m
config BrokerState k a
brokerState
onInitialize ::
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x) ->
k ->
BrokerConfig k w' w a m ->
BrokerState k a ->
Maybe w ->
RIO m (BrokerState k a)
onInitialize :: (forall x. RIO m x -> RIO m x)
-> k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> Maybe w
-> RIO m (BrokerState k a)
onInitialize forall x. RIO m x -> RIO m x
unmask k
k BrokerConfig k w' w a m
config BrokerState k a
brokerState Maybe w
mw =
case k -> BrokerState k a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
k BrokerState k a
brokerState of
Just a
_ -> do
Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
( Utf8Builder
"cannot initialize a new worker, a worker with that ID exists: "
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
)
BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
Maybe a
Nothing ->
RIO m a -> RIO m (Either SomeException a)
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny (RIO m a -> RIO m a
forall x. RIO m x -> RIO m x
unmask (BrokerConfig k w' w a m -> ResourceCreator k w a m
forall k w' w a m.
BrokerConfig k w' w a m -> ResourceCreator k w a m
resourceCreator BrokerConfig k w' w a m
config k
k Maybe w
mw))
RIO m (Either SomeException a)
-> (Either SomeException a -> RIO m (BrokerState k a))
-> RIO m (BrokerState k a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SomeException -> RIO m (BrokerState k a))
-> (a -> RIO m (BrokerState k a))
-> Either SomeException a
-> RIO m (BrokerState k a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
( \SomeException
err -> do
Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
( Utf8Builder
"the resource creator for worker "
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> Utf8Builder
" threw an exception: "
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display SomeException
err
)
BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
)
( \a
res ->
let brokerState1 :: BrokerState k a
brokerState1 = k -> a -> BrokerState k a -> BrokerState k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k a
res BrokerState k a
brokerState
in case Maybe w
mw of
Maybe w
Nothing ->
BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState1
Just w
w ->
RIO m (BrokerState k a) -> RIO m () -> RIO m (BrokerState k a)
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
onException
((forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
forall k m w w' a.
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
onDispatch forall x. RIO m x -> RIO m x
unmask k
k w
w BrokerConfig k w' w a m
config BrokerState k a
brokerState1)
( do
Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
( Utf8Builder
"exception while dispatching the "
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> Utf8Builder
"post-initialization message for worker: "
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
)
BrokerConfig k w' w a m -> ResourceCleaner k a m
forall k w' w a m. BrokerConfig k w' w a m -> ResourceCleaner k a m
resourceCleaner BrokerConfig k w' w a m
config k
k a
res
)
)
onDispatch ::
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x) ->
k ->
w ->
BrokerConfig k w' w a m ->
BrokerState k a ->
RIO m (BrokerState k a)
onDispatch :: (forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
onDispatch forall x. RIO m x -> RIO m x
unmask k
k w
w BrokerConfig k w' w a m
config BrokerState k a
brokerState =
RIO m (BrokerState k a)
-> (a -> RIO m (BrokerState k a))
-> Maybe a
-> RIO m (BrokerState k a)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe RIO m (BrokerState k a)
notFound a -> RIO m (BrokerState k a)
dispatch (k -> BrokerState k a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
k BrokerState k a
brokerState)
where
notFound :: RIO m (BrokerState k a)
notFound = do
Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logWarn
( Utf8Builder
"cannot dispatch message, worker not found: "
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
)
BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
dispatch :: a -> RIO m (BrokerState k a)
dispatch a
res =
RIO m (ResourceUpdate a)
-> RIO m (Either SomeException (ResourceUpdate a))
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny (RIO m (ResourceUpdate a) -> RIO m (ResourceUpdate a)
forall x. RIO m x -> RIO m x
unmask (BrokerConfig k w' w a m -> MessageHandler k w a m
forall k w' w a m.
BrokerConfig k w' w a m -> MessageHandler k w a m
messageDispatcher BrokerConfig k w' w a m
config k
k w
w a
res))
RIO m (Either SomeException (ResourceUpdate a))
-> (Either SomeException (ResourceUpdate a)
-> RIO m (BrokerState k a))
-> RIO m (BrokerState k a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SomeException -> RIO m (BrokerState k a))
-> (ResourceUpdate a -> RIO m (BrokerState k a))
-> Either SomeException (ResourceUpdate a)
-> RIO m (BrokerState k a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
( \SomeException
err -> do
Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
( Utf8Builder
"the message dispatcher callback for worker "
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> Utf8Builder
" threw: "
Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display SomeException
err
)
k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
forall k w' w a m.
Ord k =>
k -> BrokerConfig k w' w a m -> Map k a -> RIO m (Map k a)
cleanupResource
k
k
BrokerConfig k w' w a m
config
BrokerState k a
brokerState
)
( \case
ResourceUpdate a
KeepResource ->
BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
UpdateResource a
newRes ->
BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return (k -> a -> BrokerState k a -> BrokerState k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k a
newRes BrokerState k a
brokerState)
RemoveResource Maybe a
mNewRes ->
k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
forall k w' w a m.
Ord k =>
k -> BrokerConfig k w' w a m -> Map k a -> RIO m (Map k a)
cleanupResource
k
k
BrokerConfig k w' w a m
config
( BrokerState k a
-> (a -> BrokerState k a) -> Maybe a -> BrokerState k a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
BrokerState k a
brokerState
( \a
newRes ->
k -> a -> BrokerState k a -> BrokerState k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k a
newRes BrokerState k a
brokerState
)
Maybe a
mNewRes
)
)
cleanupAllResources ::
BrokerConfig k w' w a m ->
BrokerState k a ->
RIO m ()
cleanupAllResources :: BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
cleanupAllResources BrokerConfig k w' w a m
config BrokerState k a
brokerState =
((k, a) -> RIO m ()) -> [(k, a)] -> RIO m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_
( (k -> a -> RIO m ()) -> (k, a) -> RIO m ()
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry
(BrokerConfig k w' w a m -> k -> a -> RIO m ()
forall k w' w a m. BrokerConfig k w' w a m -> ResourceCleaner k a m
tryResourceCleaner BrokerConfig k w' w a m
config)
)
(BrokerState k a -> [(k, a)]
forall k a. Map k a -> [(k, a)]
Map.assocs BrokerState k a
brokerState)
cleanupResource ::
(Ord k) =>
k ->
BrokerConfig k w' w a m ->
Map k a ->
RIO m (Map k a)
cleanupResource :: k -> BrokerConfig k w' w a m -> Map k a -> RIO m (Map k a)
cleanupResource k
k BrokerConfig k w' w a m
config Map k a
brokerState = do
(a -> RIO m ()) -> Maybe a -> RIO m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (BrokerConfig k w' w a m -> k -> a -> RIO m ()
forall k w' w a m. BrokerConfig k w' w a m -> ResourceCleaner k a m
tryResourceCleaner BrokerConfig k w' w a m
config k
k) (k -> Map k a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
k Map k a
brokerState)
Map k a -> RIO m (Map k a)
forall (m :: * -> *) a. Monad m => a -> m a
return (k -> Map k a -> Map k a
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete k
k Map k a
brokerState)
tryResourceCleaner ::
BrokerConfig k w' w a m ->
k ->
a ->
RIO m ()
tryResourceCleaner :: BrokerConfig k w' w a m -> k -> a -> RIO m ()
tryResourceCleaner BrokerConfig k w' w a m
config k
k a
res = do
RIO m (Either SomeException ()) -> RIO m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (RIO m (Either SomeException ()) -> RIO m ())
-> RIO m (Either SomeException ()) -> RIO m ()
forall a b. (a -> b) -> a -> b
$ RIO m () -> RIO m (Either SomeException ())
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny (BrokerConfig k w' w a m -> k -> a -> RIO m ()
forall k w' w a m. BrokerConfig k w' w a m -> ResourceCleaner k a m
resourceCleaner BrokerConfig k w' w a m
config k
k a
res)