module Kafka.Worker.Analytics
  ( Analytics,
    AssignedPartitions (AssignedPartitions),
    PausedPartitions (PausedPartitions),
    TimeOfLastRebalance (TimeOfLastRebalance),
    init,
    read,
    updatePaused,
    updateTimeOfLastRebalance,
  )
where

import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TVar as TVar
import qualified Prelude

newtype PausedPartitions = PausedPartitions Int

newtype AssignedPartitions = AssignedPartitions Int

newtype TimeOfLastRebalance = TimeOfLastRebalance Float

data Analytics = Analytics
  { Analytics -> TVar PausedPartitions
pausedPartitions :: TVar.TVar PausedPartitions,
    Analytics -> TVar TimeOfLastRebalance
timeOfLastRebalance :: TVar.TVar TimeOfLastRebalance,
    Analytics -> IO AssignedPartitions
assignedPartitions :: Prelude.IO AssignedPartitions
  }

init :: Prelude.IO Int -> Prelude.IO Analytics
init :: IO Int -> IO Analytics
init IO Int
assignedPartitions' = do
  TVar PausedPartitions
pausedPartitions <- PausedPartitions -> IO (TVar PausedPartitions)
forall a. a -> IO (TVar a)
TVar.newTVarIO (Int -> PausedPartitions
PausedPartitions Int
0)
  TVar TimeOfLastRebalance
timeOfLastRebalance <- TimeOfLastRebalance -> IO (TVar TimeOfLastRebalance)
forall a. a -> IO (TVar a)
TVar.newTVarIO (Float -> TimeOfLastRebalance
TimeOfLastRebalance Float
0)
  Analytics -> IO Analytics
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure
    ( Analytics :: TVar PausedPartitions
-> TVar TimeOfLastRebalance -> IO AssignedPartitions -> Analytics
Analytics
        { TVar PausedPartitions
pausedPartitions :: TVar PausedPartitions
pausedPartitions :: TVar PausedPartitions
pausedPartitions,
          TVar TimeOfLastRebalance
timeOfLastRebalance :: TVar TimeOfLastRebalance
timeOfLastRebalance :: TVar TimeOfLastRebalance
timeOfLastRebalance,
          assignedPartitions :: IO AssignedPartitions
assignedPartitions = (Int -> AssignedPartitions) -> IO Int -> IO AssignedPartitions
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map Int -> AssignedPartitions
AssignedPartitions IO Int
assignedPartitions'
        }
    )

read :: Analytics -> Prelude.IO (PausedPartitions, AssignedPartitions, TimeOfLastRebalance)
read :: Analytics
-> IO (PausedPartitions, AssignedPartitions, TimeOfLastRebalance)
read Analytics {TVar PausedPartitions
pausedPartitions :: TVar PausedPartitions
pausedPartitions :: Analytics -> TVar PausedPartitions
pausedPartitions, IO AssignedPartitions
assignedPartitions :: IO AssignedPartitions
assignedPartitions :: Analytics -> IO AssignedPartitions
assignedPartitions, TVar TimeOfLastRebalance
timeOfLastRebalance :: TVar TimeOfLastRebalance
timeOfLastRebalance :: Analytics -> TVar TimeOfLastRebalance
timeOfLastRebalance} = do
  PausedPartitions
analyticsPausedPartitions <- TVar PausedPartitions -> IO PausedPartitions
forall a. TVar a -> IO a
TVar.readTVarIO TVar PausedPartitions
pausedPartitions
  TimeOfLastRebalance
analyticsTimeOfLastRebalance <- TVar TimeOfLastRebalance -> IO TimeOfLastRebalance
forall a. TVar a -> IO a
TVar.readTVarIO TVar TimeOfLastRebalance
timeOfLastRebalance
  AssignedPartitions
analyticsAssignedPartitions <- IO AssignedPartitions
assignedPartitions
  (PausedPartitions, AssignedPartitions, TimeOfLastRebalance)
-> IO (PausedPartitions, AssignedPartitions, TimeOfLastRebalance)
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure (PausedPartitions
analyticsPausedPartitions, AssignedPartitions
analyticsAssignedPartitions, TimeOfLastRebalance
analyticsTimeOfLastRebalance)

updatePaused :: Int -> Analytics -> Prelude.IO ()
updatePaused :: Int -> Analytics -> IO ()
updatePaused Int
numPaused Analytics {TVar PausedPartitions
pausedPartitions :: TVar PausedPartitions
pausedPartitions :: Analytics -> TVar PausedPartitions
pausedPartitions} =
  STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| TVar PausedPartitions -> PausedPartitions -> STM ()
forall a. TVar a -> a -> STM ()
TVar.writeTVar TVar PausedPartitions
pausedPartitions (Int -> PausedPartitions
PausedPartitions Int
numPaused)

updateTimeOfLastRebalance :: Float -> Analytics -> Prelude.IO ()
updateTimeOfLastRebalance :: Float -> Analytics -> IO ()
updateTimeOfLastRebalance Float
now Analytics {TVar TimeOfLastRebalance
timeOfLastRebalance :: TVar TimeOfLastRebalance
timeOfLastRebalance :: Analytics -> TVar TimeOfLastRebalance
timeOfLastRebalance} = do
  STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
<| TVar TimeOfLastRebalance -> TimeOfLastRebalance -> STM ()
forall a. TVar a -> a -> STM ()
TVar.writeTVar TVar TimeOfLastRebalance
timeOfLastRebalance (Float -> TimeOfLastRebalance
TimeOfLastRebalance Float
now)