{-# LANGUAGE Strict #-}

-- | A broker extracts a /key/ value from incoming messages
--  and creates, keeps and destroys a /resource/ for each key.
--
-- The demultiplexed messages and their resources are passed to
-- a custom 'MessageHandler'/
--
-- The user provides a 'Demultiplexer' is a pure function that
-- returns a key for the resource associated
-- to the message and potientially changes the
-- message.
--
-- The demultiplexer may also return a value indicating that
-- a new resource must be created, or that a message
-- shall be ignored.
--
-- The broker is run in a seperate process using 'async'.
-- The usual way to stop a broker is to 'cancel' it.
--
-- When cancelling a broker, the resource cleanup
-- actions for all resources will be called with
-- async exceptions masked.
--
-- In order to prevent the resource map filling up with
-- /dead/ resources, the user of this module has to ensure
-- that whenever a resource is not required anymore, a message
-- will be sent to the broker, that will cause the 'MessageHandler'
-- to be executed for the resource, which will in turn return,
-- return 'RemoveResource'.
module RIO.ProcessPool.Broker
  ( spawnBroker,
    BrokerConfig (..),
    BrokerResult (..),
    ResourceCreator,
    Demultiplexer,
    ResourceCleaner,
    MessageHandler,
    Multiplexed (..),
    ResourceUpdate (..),
  )
where

import qualified Data.Map.Strict as Map
import RIO
import UnliftIO.MessageBox
  ( IsMessageBox (Input, newInput, receive),
    IsMessageBoxArg (MessageBox, newMessageBox),
  )

-- | Spawn a broker with a new 'MessageBox',
--  and return its message 'Input' channel as well as
-- the 'Async' handle of the spawned process, needed to
-- stop the broker process.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w'@ is the type of incoming messages.
-- * @w@ is the type of the demultiplexed messages.
-- * @a@ specifies the resource type.
-- * @m@ is the base monad
spawnBroker ::
  forall brokerBoxArg k w' w a m.
  ( HasLogFunc m,
    Ord k,
    Display k,
    IsMessageBoxArg brokerBoxArg
  ) =>
  brokerBoxArg ->
  BrokerConfig k w' w a m ->
  RIO
    m
    ( Either
        SomeException
        ( Input (MessageBox brokerBoxArg) w',
          Async BrokerResult
        )
    )
spawnBroker :: brokerBoxArg
-> BrokerConfig k w' w a m
-> RIO
     m
     (Either
        SomeException
        (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
spawnBroker brokerBoxArg
brokerBoxArg BrokerConfig k w' w a m
config = do
  Async
  (Either
     SomeException
     (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
brokerA <- RIO
  m
  (Either
     SomeException
     (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> RIO
     m
     (Async
        (Either
           SomeException
           (Input (MessageBox brokerBoxArg) w', Async BrokerResult)))
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (RIO
   m
   (Either
      SomeException
      (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
 -> RIO
      m
      (Async
         (Either
            SomeException
            (Input (MessageBox brokerBoxArg) w', Async BrokerResult))))
-> RIO
     m
     (Either
        SomeException
        (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> RIO
     m
     (Async
        (Either
           SomeException
           (Input (MessageBox brokerBoxArg) w', Async BrokerResult)))
forall a b. (a -> b) -> a -> b
$ do
    Either
  SomeException
  (MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
mBrokerBox <-
      RIO
  m (MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
-> RIO
     m
     (Either
        SomeException
        (MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w'))
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny
        ( do
            MessageBox brokerBoxArg w'
b <- brokerBoxArg -> RIO m (MessageBox brokerBoxArg w')
forall argument (m :: * -> *) a.
(IsMessageBoxArg argument, MonadUnliftIO m) =>
argument -> m (MessageBox argument a)
newMessageBox brokerBoxArg
brokerBoxArg
            Input (MessageBox brokerBoxArg) w'
i <- MessageBox brokerBoxArg w'
-> RIO m (Input (MessageBox brokerBoxArg) w')
forall (box :: * -> *) (m :: * -> *) a.
(IsMessageBox box, MonadUnliftIO m) =>
box a -> m (Input box a)
newInput MessageBox brokerBoxArg w'
b
            (MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
-> RIO
     m (MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
forall (m :: * -> *) a. Monad m => a -> m a
return (MessageBox brokerBoxArg w'
b, Input (MessageBox brokerBoxArg) w'
i)
        )
    case Either
  SomeException
  (MessageBox brokerBoxArg w', Input (MessageBox brokerBoxArg) w')
mBrokerBox of
      Left SomeException
er -> Either
  SomeException
  (Input (MessageBox brokerBoxArg) w', Async BrokerResult)
-> RIO
     m
     (Either
        SomeException
        (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException
-> Either
     SomeException
     (Input (MessageBox brokerBoxArg) w', Async BrokerResult)
forall a b. a -> Either a b
Left SomeException
er)
      Right (MessageBox brokerBoxArg w'
brokerBox, Input (MessageBox brokerBoxArg) w'
brokerInp) -> do
        Async BrokerResult
aInner <- RIO m (Async BrokerResult) -> RIO m (Async BrokerResult)
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m a
mask_ (RIO m (Async BrokerResult) -> RIO m (Async BrokerResult))
-> RIO m (Async BrokerResult) -> RIO m (Async BrokerResult)
forall a b. (a -> b) -> a -> b
$
          ((forall b. RIO m b -> RIO m b) -> RIO m BrokerResult)
-> RIO m (Async BrokerResult)
forall (m :: * -> *) a.
MonadUnliftIO m =>
((forall b. m b -> m b) -> m a) -> m (Async a)
asyncWithUnmask (((forall b. RIO m b -> RIO m b) -> RIO m BrokerResult)
 -> RIO m (Async BrokerResult))
-> ((forall b. RIO m b -> RIO m b) -> RIO m BrokerResult)
-> RIO m (Async BrokerResult)
forall a b. (a -> b) -> a -> b
$ \forall b. RIO m b -> RIO m b
unmaskInner ->
            (forall b. RIO m b -> RIO m b)
-> MessageBox brokerBoxArg w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
forall m k (msgBox :: * -> *) w' w a.
(HasLogFunc m, Ord k, Display k, IsMessageBox msgBox) =>
(forall x. RIO m x -> RIO m x)
-> msgBox w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
brokerLoop forall b. RIO m b -> RIO m b
unmaskInner MessageBox brokerBoxArg w'
brokerBox BrokerConfig k w' w a m
config BrokerState k a
forall k a. Map k a
Map.empty
        Either
  SomeException
  (Input (MessageBox brokerBoxArg) w', Async BrokerResult)
-> RIO
     m
     (Either
        SomeException
        (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
forall (m :: * -> *) a. Monad m => a -> m a
return ((Input (MessageBox brokerBoxArg) w', Async BrokerResult)
-> Either
     SomeException
     (Input (MessageBox brokerBoxArg) w', Async BrokerResult)
forall a b. b -> Either a b
Right (Input (MessageBox brokerBoxArg) w'
brokerInp, Async BrokerResult
aInner))
  Either
  SomeException
  (Either
     SomeException
     (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> Either
     SomeException
     (Input (MessageBox brokerBoxArg) w', Async BrokerResult)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Either
   SomeException
   (Either
      SomeException
      (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
 -> Either
      SomeException
      (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> RIO
     m
     (Either
        SomeException
        (Either
           SomeException
           (Input (MessageBox brokerBoxArg) w', Async BrokerResult)))
-> RIO
     m
     (Either
        SomeException
        (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Async
  (Either
     SomeException
     (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
-> RIO
     m
     (Either
        SomeException
        (Either
           SomeException
           (Input (MessageBox brokerBoxArg) w', Async BrokerResult)))
forall (m :: * -> *) a.
MonadIO m =>
Async a -> m (Either SomeException a)
waitCatch Async
  (Either
     SomeException
     (Input (MessageBox brokerBoxArg) w', Async BrokerResult))
brokerA

-- | This is just what the 'Async' returned from
-- 'spawnBroker' returns, it's current purpose is to
-- make code easier to read.
--
-- Instead of some @Async ()@ that could be anything,
-- there is @Async BrokerResult@.
data BrokerResult = MkBrokerResult
  deriving stock (Int -> BrokerResult -> ShowS
[BrokerResult] -> ShowS
BrokerResult -> String
(Int -> BrokerResult -> ShowS)
-> (BrokerResult -> String)
-> ([BrokerResult] -> ShowS)
-> Show BrokerResult
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BrokerResult] -> ShowS
$cshowList :: [BrokerResult] -> ShowS
show :: BrokerResult -> String
$cshow :: BrokerResult -> String
showsPrec :: Int -> BrokerResult -> ShowS
$cshowsPrec :: Int -> BrokerResult -> ShowS
Show, BrokerResult -> BrokerResult -> Bool
(BrokerResult -> BrokerResult -> Bool)
-> (BrokerResult -> BrokerResult -> Bool) -> Eq BrokerResult
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: BrokerResult -> BrokerResult -> Bool
$c/= :: BrokerResult -> BrokerResult -> Bool
== :: BrokerResult -> BrokerResult -> Bool
$c== :: BrokerResult -> BrokerResult -> Bool
Eq)

-- | The broker configuration, used by 'spawnBroker'.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w'@ is the type of incoming messages.
-- * @w@ is the type of the demultiplexed messages.
-- * @a@ specifies the resource type.
-- * @m@ is the base monad
data BrokerConfig k w' w a m = MkBrokerConfig
  { BrokerConfig k w' w a m -> Demultiplexer w' k w
demultiplexer :: !(Demultiplexer w' k w),
    BrokerConfig k w' w a m -> MessageHandler k w a m
messageDispatcher :: !(MessageHandler k w a m),
    BrokerConfig k w' w a m -> ResourceCreator k w a m
resourceCreator :: !(ResourceCreator k w a m),
    BrokerConfig k w' w a m -> ResourceCleaner k a m
resourceCleaner :: !(ResourceCleaner k a m)
  }

-- | User supplied callback to extract the key and the 'Multiplexed'
--  from a message.
--  (Sync-) Exceptions thrown from this function are caught and lead
--  to dropping of the incoming message, while the broker continues.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w'@ is the type of incoming messages.
-- * @w@ is the type of the demultiplexed messages.
type Demultiplexer w' k w = w' -> Multiplexed k w

-- | User supplied callback to use the 'Multiplexed' message and
--  the associated resource.
--  (Sync-) Exceptions thrown from this function are caught and lead
--  to immediate cleanup of the resource but the broker continues.
--
-- * Type @k@ is the /key/ for the resource associated to an incoming
--   message
-- * Type @w@ is the type of incoming, demultiplexed, messages.
-- * Type @a@ specifies the resource type.
-- * Type @m@ is the base monad
type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a)

-- | This value indicates in what state a worker is in after the
--  'MessageHandler' action was executed.
data ResourceUpdate a
  = -- | The resources is still required.
    KeepResource
  | -- | The resource is still required but must be updated.
    UpdateResource a
  | -- | The resource is obsolete and can
    --   be removed from the broker.
    --   The broker will call 'ResourceCleaner' either
    --   on the current, or an updated resource value.
    RemoveResource !(Maybe a)

-- | The action that the broker has to take for in incoming message.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w@ is the type of the demultiplexed messages.
data Multiplexed k w
  = -- | The message is an initialization message, that requires the
    --   creation of a new resouce for the given key.
    --   When the resource is created, then /maybe/ additionally
    --   a message will also be dispatched.
    Initialize k !(Maybe w)
  | -- | Dispatch a message using an existing resource.
    -- Silently ignore if no resource for the key exists.
    Dispatch k w

-- deriving stock (Show)

-- | User supplied callback to create and initialize a resource.
--  (Sync-) Exceptions thrown from this function are caught,
--  and the broker continues.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w@ is the type of the demultiplexed messages.
-- * @a@ specifies the resource type.
-- * @m@ is the monad of the returned action.
type ResourceCreator k w a m = k -> Maybe w -> RIO m a

-- | User supplied callback called _with exceptions masked_
-- when the 'MessageHandler' returns 'RemoveResource'
-- (Sync-) Exceptions thrown from this function are caught,
-- and do not prevent the removal of the resource, also the
-- broker continues.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @a@ specifies the resource type.
-- * @m@ is the monad of the returned action.
type ResourceCleaner k a m = k -> a -> RIO m ()

type BrokerState k a = Map k a

{-# NOINLINE brokerLoop #-}
brokerLoop ::
  ( HasLogFunc m,
    Ord k,
    Display k,
    IsMessageBox msgBox
  ) =>
  (forall x. RIO m x -> RIO m x) ->
  msgBox w' ->
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  RIO m BrokerResult
brokerLoop :: (forall x. RIO m x -> RIO m x)
-> msgBox w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
brokerLoop forall x. RIO m x -> RIO m x
unmask msgBox w'
brokerBox BrokerConfig k w' w a m
config BrokerState k a
brokerState =
  RIO m (Maybe (Either SomeException (BrokerState k a)))
-> (SomeException -> RIO m ())
-> RIO m (Maybe (Either SomeException (BrokerState k a)))
forall (m :: * -> *) e a b.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m b) -> m a
withException
    ( RIO m (Maybe w') -> RIO m (Maybe w')
forall x. RIO m x -> RIO m x
unmask (msgBox w' -> RIO m (Maybe w')
forall (box :: * -> *) (m :: * -> *) a.
(IsMessageBox box, MonadUnliftIO m) =>
box a -> m (Maybe a)
receive msgBox w'
brokerBox)
        RIO m (Maybe w')
-> (Maybe w'
    -> RIO m (Maybe (Either SomeException (BrokerState k a))))
-> RIO m (Maybe (Either SomeException (BrokerState k a)))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (w' -> RIO m (Either SomeException (BrokerState k a)))
-> Maybe w'
-> RIO m (Maybe (Either SomeException (BrokerState k a)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (RIO m (BrokerState k a)
-> RIO m (Either SomeException (BrokerState k a))
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny (RIO m (BrokerState k a)
 -> RIO m (Either SomeException (BrokerState k a)))
-> (w' -> RIO m (BrokerState k a))
-> w'
-> RIO m (Either SomeException (BrokerState k a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall x. RIO m x -> RIO m x)
-> BrokerConfig k w' w a m
-> BrokerState k a
-> w'
-> RIO m (BrokerState k a)
forall k m w' w a.
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x)
-> BrokerConfig k w' w a m
-> BrokerState k a
-> w'
-> RIO m (BrokerState k a)
onIncoming forall x. RIO m x -> RIO m x
unmask BrokerConfig k w' w a m
config BrokerState k a
brokerState)
    )
    ( \(SomeException
ex :: SomeException) -> do      
        case SomeException -> Maybe AsyncCancelled
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
          Just (AsyncCancelled
_cancelled :: AsyncCancelled) ->
            Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logDebug Utf8Builder
"broker loop: cancelled"
          Maybe AsyncCancelled
_ ->
            Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
              ( Utf8Builder
"broker loop: exception while \
                \receiving and dispatching messages: "
                  Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display SomeException
ex
              )
        BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
forall k w' w a m.
BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
cleanupAllResources BrokerConfig k w' w a m
config BrokerState k a
brokerState
    )
    RIO m (Maybe (Either SomeException (BrokerState k a)))
-> (Maybe (Either SomeException (BrokerState k a))
    -> RIO m BrokerResult)
-> RIO m BrokerResult
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= RIO m BrokerResult
-> (Either SomeException (BrokerState k a) -> RIO m BrokerResult)
-> Maybe (Either SomeException (BrokerState k a))
-> RIO m BrokerResult
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
      ( do
          Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError Utf8Builder
"broker loop: failed to receive next message"
          BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
forall k w' w a m.
BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
cleanupAllResources BrokerConfig k w' w a m
config BrokerState k a
brokerState
          BrokerResult -> RIO m BrokerResult
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerResult
MkBrokerResult
      )
      ( \Either SomeException (BrokerState k a)
res -> do
          BrokerState k a
next <-
            (SomeException -> RIO m (BrokerState k a))
-> (BrokerState k a -> RIO m (BrokerState k a))
-> Either SomeException (BrokerState k a)
-> RIO m (BrokerState k a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
              ( \SomeException
err -> do
                  Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logWarn
                    ( Utf8Builder
"broker loop: Handling the last message\
                      \ caused an exception:"
                        Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display SomeException
err
                    )
                  BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
              )
              BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return
              Either SomeException (BrokerState k a)
res
          (forall x. RIO m x -> RIO m x)
-> msgBox w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
forall m k (msgBox :: * -> *) w' w a.
(HasLogFunc m, Ord k, Display k, IsMessageBox msgBox) =>
(forall x. RIO m x -> RIO m x)
-> msgBox w'
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m BrokerResult
brokerLoop forall x. RIO m x -> RIO m x
unmask msgBox w'
brokerBox BrokerConfig k w' w a m
config BrokerState k a
next
      )

{-# NOINLINE onIncoming #-}
onIncoming ::
  (Ord k, HasLogFunc m, Display k) =>
  (forall x. RIO m x -> RIO m x) ->
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  w' ->
  RIO m (BrokerState k a)
onIncoming :: (forall x. RIO m x -> RIO m x)
-> BrokerConfig k w' w a m
-> BrokerState k a
-> w'
-> RIO m (BrokerState k a)
onIncoming forall x. RIO m x -> RIO m x
unmask BrokerConfig k w' w a m
config BrokerState k a
brokerState w'
w' =
  case BrokerConfig k w' w a m -> Demultiplexer w' k w
forall k w' w a m. BrokerConfig k w' w a m -> Demultiplexer w' k w
demultiplexer BrokerConfig k w' w a m
config w'
w' of
    Initialize k
k Maybe w
mw ->
      (forall x. RIO m x -> RIO m x)
-> k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> Maybe w
-> RIO m (BrokerState k a)
forall k m w' w a.
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x)
-> k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> Maybe w
-> RIO m (BrokerState k a)
onInitialize forall x. RIO m x -> RIO m x
unmask k
k BrokerConfig k w' w a m
config BrokerState k a
brokerState Maybe w
mw
    Dispatch k
k w
w ->
      (forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
forall k m w w' a.
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
onDispatch forall x. RIO m x -> RIO m x
unmask k
k w
w BrokerConfig k w' w a m
config BrokerState k a
brokerState

onInitialize ::
  (Ord k, HasLogFunc m, Display k) =>
  (forall x. RIO m x -> RIO m x) ->
  k ->
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  Maybe w ->
  RIO m (BrokerState k a)
onInitialize :: (forall x. RIO m x -> RIO m x)
-> k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> Maybe w
-> RIO m (BrokerState k a)
onInitialize forall x. RIO m x -> RIO m x
unmask k
k BrokerConfig k w' w a m
config BrokerState k a
brokerState Maybe w
mw =
  case k -> BrokerState k a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
k BrokerState k a
brokerState of
    Just a
_ -> do
      Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
        ( Utf8Builder
"cannot initialize a new worker, a worker with that ID exists: "
            Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
        )
      BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
    Maybe a
Nothing ->
      RIO m a -> RIO m (Either SomeException a)
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny (RIO m a -> RIO m a
forall x. RIO m x -> RIO m x
unmask (BrokerConfig k w' w a m -> ResourceCreator k w a m
forall k w' w a m.
BrokerConfig k w' w a m -> ResourceCreator k w a m
resourceCreator BrokerConfig k w' w a m
config k
k Maybe w
mw))
        RIO m (Either SomeException a)
-> (Either SomeException a -> RIO m (BrokerState k a))
-> RIO m (BrokerState k a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SomeException -> RIO m (BrokerState k a))
-> (a -> RIO m (BrokerState k a))
-> Either SomeException a
-> RIO m (BrokerState k a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
          ( \SomeException
err -> do
              Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
                ( Utf8Builder
"the resource creator for worker "
                    Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
                    Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> Utf8Builder
" threw an exception: "
                    Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display SomeException
err
                )
              BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
          )
          ( \a
res ->
              let brokerState1 :: BrokerState k a
brokerState1 = k -> a -> BrokerState k a -> BrokerState k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k a
res BrokerState k a
brokerState
               in case Maybe w
mw of
                    Maybe w
Nothing ->
                      BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState1
                    Just w
w ->
                      RIO m (BrokerState k a) -> RIO m () -> RIO m (BrokerState k a)
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
onException
                        ((forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
forall k m w w' a.
(Ord k, HasLogFunc m, Display k) =>
(forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
onDispatch forall x. RIO m x -> RIO m x
unmask k
k w
w BrokerConfig k w' w a m
config BrokerState k a
brokerState1)
                        ( do
                            Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
                              ( Utf8Builder
"exception while dispatching the "
                                  Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> Utf8Builder
"post-initialization message for worker: "
                                  Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
                              )
                            BrokerConfig k w' w a m -> ResourceCleaner k a m
forall k w' w a m. BrokerConfig k w' w a m -> ResourceCleaner k a m
resourceCleaner BrokerConfig k w' w a m
config k
k a
res
                        )
          )

onDispatch ::
  (Ord k, HasLogFunc m, Display k) =>
  (forall x. RIO m x -> RIO m x) ->
  k ->
  w ->
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  RIO m (BrokerState k a)
onDispatch :: (forall x. RIO m x -> RIO m x)
-> k
-> w
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
onDispatch forall x. RIO m x -> RIO m x
unmask k
k w
w BrokerConfig k w' w a m
config BrokerState k a
brokerState =
  RIO m (BrokerState k a)
-> (a -> RIO m (BrokerState k a))
-> Maybe a
-> RIO m (BrokerState k a)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe RIO m (BrokerState k a)
notFound a -> RIO m (BrokerState k a)
dispatch (k -> BrokerState k a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
k BrokerState k a
brokerState)
  where
    notFound :: RIO m (BrokerState k a)
notFound = do
      Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logWarn
        ( Utf8Builder
"cannot dispatch message, worker not found: "
            Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
        )
      BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
    dispatch :: a -> RIO m (BrokerState k a)
dispatch a
res =
      RIO m (ResourceUpdate a)
-> RIO m (Either SomeException (ResourceUpdate a))
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny (RIO m (ResourceUpdate a) -> RIO m (ResourceUpdate a)
forall x. RIO m x -> RIO m x
unmask (BrokerConfig k w' w a m -> MessageHandler k w a m
forall k w' w a m.
BrokerConfig k w' w a m -> MessageHandler k w a m
messageDispatcher BrokerConfig k w' w a m
config k
k w
w a
res))
        RIO m (Either SomeException (ResourceUpdate a))
-> (Either SomeException (ResourceUpdate a)
    -> RIO m (BrokerState k a))
-> RIO m (BrokerState k a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SomeException -> RIO m (BrokerState k a))
-> (ResourceUpdate a -> RIO m (BrokerState k a))
-> Either SomeException (ResourceUpdate a)
-> RIO m (BrokerState k a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
          ( \SomeException
err -> do

              Utf8Builder -> RIO m ()
forall (m :: * -> *) env.
(MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) =>
Utf8Builder -> m ()
logError
                ( Utf8Builder
"the message dispatcher callback for worker "
                    Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> k -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display k
k
                    Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> Utf8Builder
" threw: "
                    Utf8Builder -> Utf8Builder -> Utf8Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Utf8Builder
forall a. Display a => a -> Utf8Builder
display SomeException
err
                )
              k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
forall k w' w a m.
Ord k =>
k -> BrokerConfig k w' w a m -> Map k a -> RIO m (Map k a)
cleanupResource
                k
k
                BrokerConfig k w' w a m
config
                BrokerState k a
brokerState
          )
          ( \case
              ResourceUpdate a
KeepResource ->
                BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return BrokerState k a
brokerState
              UpdateResource a
newRes ->
                BrokerState k a -> RIO m (BrokerState k a)
forall (m :: * -> *) a. Monad m => a -> m a
return (k -> a -> BrokerState k a -> BrokerState k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k a
newRes BrokerState k a
brokerState)
              RemoveResource Maybe a
mNewRes ->
                k
-> BrokerConfig k w' w a m
-> BrokerState k a
-> RIO m (BrokerState k a)
forall k w' w a m.
Ord k =>
k -> BrokerConfig k w' w a m -> Map k a -> RIO m (Map k a)
cleanupResource
                  k
k
                  BrokerConfig k w' w a m
config
                  ( BrokerState k a
-> (a -> BrokerState k a) -> Maybe a -> BrokerState k a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
                      BrokerState k a
brokerState
                      ( \a
newRes ->
                          k -> a -> BrokerState k a -> BrokerState k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k a
newRes BrokerState k a
brokerState
                      )
                      Maybe a
mNewRes
                  )
          )

cleanupAllResources ::
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  RIO m ()
cleanupAllResources :: BrokerConfig k w' w a m -> BrokerState k a -> RIO m ()
cleanupAllResources BrokerConfig k w' w a m
config BrokerState k a
brokerState =
  ((k, a) -> RIO m ()) -> [(k, a)] -> RIO m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_
    ( (k -> a -> RIO m ()) -> (k, a) -> RIO m ()
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry
        (BrokerConfig k w' w a m -> k -> a -> RIO m ()
forall k w' w a m. BrokerConfig k w' w a m -> ResourceCleaner k a m
tryResourceCleaner BrokerConfig k w' w a m
config)
    )
    (BrokerState k a -> [(k, a)]
forall k a. Map k a -> [(k, a)]
Map.assocs BrokerState k a
brokerState)

cleanupResource ::
  (Ord k) =>
  k ->
  BrokerConfig k w' w a m ->
  Map k a ->
  RIO m (Map k a)
cleanupResource :: k -> BrokerConfig k w' w a m -> Map k a -> RIO m (Map k a)
cleanupResource k
k BrokerConfig k w' w a m
config Map k a
brokerState = do
  (a -> RIO m ()) -> Maybe a -> RIO m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (BrokerConfig k w' w a m -> k -> a -> RIO m ()
forall k w' w a m. BrokerConfig k w' w a m -> ResourceCleaner k a m
tryResourceCleaner BrokerConfig k w' w a m
config k
k) (k -> Map k a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
k Map k a
brokerState)
  Map k a -> RIO m (Map k a)
forall (m :: * -> *) a. Monad m => a -> m a
return (k -> Map k a -> Map k a
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete k
k Map k a
brokerState)

tryResourceCleaner ::
  BrokerConfig k w' w a m ->
  k ->
  a ->
  RIO m ()
tryResourceCleaner :: BrokerConfig k w' w a m -> k -> a -> RIO m ()
tryResourceCleaner BrokerConfig k w' w a m
config k
k a
res = do
  RIO m (Either SomeException ()) -> RIO m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (RIO m (Either SomeException ()) -> RIO m ())
-> RIO m (Either SomeException ()) -> RIO m ()
forall a b. (a -> b) -> a -> b
$ RIO m () -> RIO m (Either SomeException ())
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny (BrokerConfig k w' w a m -> k -> a -> RIO m ()
forall k w' w a m. BrokerConfig k w' w a m -> ResourceCleaner k a m
resourceCleaner BrokerConfig k w' w a m
config k
k a
res)