module Kafka.Worker.Analytics ( Analytics, AssignedPartitions (AssignedPartitions), PausedPartitions (PausedPartitions), TimeOfLastRebalance (TimeOfLastRebalance), init, read, updatePaused, updateTimeOfLastRebalance, ) where import qualified Control.Concurrent.STM as STM import qualified Control.Concurrent.STM.TVar as TVar import qualified Prelude newtype PausedPartitions = PausedPartitions Int newtype AssignedPartitions = AssignedPartitions Int newtype TimeOfLastRebalance = TimeOfLastRebalance Float data Analytics = Analytics { Analytics -> TVar PausedPartitions pausedPartitions :: TVar.TVar PausedPartitions, Analytics -> TVar TimeOfLastRebalance timeOfLastRebalance :: TVar.TVar TimeOfLastRebalance, Analytics -> IO AssignedPartitions assignedPartitions :: Prelude.IO AssignedPartitions } init :: Prelude.IO Int -> Prelude.IO Analytics init :: IO Int -> IO Analytics init IO Int assignedPartitions' = do TVar PausedPartitions pausedPartitions <- PausedPartitions -> IO (TVar PausedPartitions) forall a. a -> IO (TVar a) TVar.newTVarIO (Int -> PausedPartitions PausedPartitions Int 0) TVar TimeOfLastRebalance timeOfLastRebalance <- TimeOfLastRebalance -> IO (TVar TimeOfLastRebalance) forall a. a -> IO (TVar a) TVar.newTVarIO (Float -> TimeOfLastRebalance TimeOfLastRebalance Float 0) Analytics -> IO Analytics forall (f :: * -> *) a. Applicative f => a -> f a Prelude.pure ( Analytics :: TVar PausedPartitions -> TVar TimeOfLastRebalance -> IO AssignedPartitions -> Analytics Analytics { TVar PausedPartitions pausedPartitions :: TVar PausedPartitions pausedPartitions :: TVar PausedPartitions pausedPartitions, TVar TimeOfLastRebalance timeOfLastRebalance :: TVar TimeOfLastRebalance timeOfLastRebalance :: TVar TimeOfLastRebalance timeOfLastRebalance, assignedPartitions :: IO AssignedPartitions assignedPartitions = (Int -> AssignedPartitions) -> IO Int -> IO AssignedPartitions forall (m :: * -> *) a value. Functor m => (a -> value) -> m a -> m value map Int -> AssignedPartitions AssignedPartitions IO Int assignedPartitions' } ) read :: Analytics -> Prelude.IO (PausedPartitions, AssignedPartitions, TimeOfLastRebalance) read :: Analytics -> IO (PausedPartitions, AssignedPartitions, TimeOfLastRebalance) read Analytics {TVar PausedPartitions pausedPartitions :: TVar PausedPartitions pausedPartitions :: Analytics -> TVar PausedPartitions pausedPartitions, IO AssignedPartitions assignedPartitions :: IO AssignedPartitions assignedPartitions :: Analytics -> IO AssignedPartitions assignedPartitions, TVar TimeOfLastRebalance timeOfLastRebalance :: TVar TimeOfLastRebalance timeOfLastRebalance :: Analytics -> TVar TimeOfLastRebalance timeOfLastRebalance} = do PausedPartitions analyticsPausedPartitions <- TVar PausedPartitions -> IO PausedPartitions forall a. TVar a -> IO a TVar.readTVarIO TVar PausedPartitions pausedPartitions TimeOfLastRebalance analyticsTimeOfLastRebalance <- TVar TimeOfLastRebalance -> IO TimeOfLastRebalance forall a. TVar a -> IO a TVar.readTVarIO TVar TimeOfLastRebalance timeOfLastRebalance AssignedPartitions analyticsAssignedPartitions <- IO AssignedPartitions assignedPartitions (PausedPartitions, AssignedPartitions, TimeOfLastRebalance) -> IO (PausedPartitions, AssignedPartitions, TimeOfLastRebalance) forall (f :: * -> *) a. Applicative f => a -> f a Prelude.pure (PausedPartitions analyticsPausedPartitions, AssignedPartitions analyticsAssignedPartitions, TimeOfLastRebalance analyticsTimeOfLastRebalance) updatePaused :: Int -> Analytics -> Prelude.IO () updatePaused :: Int -> Analytics -> IO () updatePaused Int numPaused Analytics {TVar PausedPartitions pausedPartitions :: TVar PausedPartitions pausedPartitions :: Analytics -> TVar PausedPartitions pausedPartitions} = STM () -> IO () forall a. STM a -> IO a STM.atomically (STM () -> IO ()) -> STM () -> IO () forall a b. (a -> b) -> a -> b <| TVar PausedPartitions -> PausedPartitions -> STM () forall a. TVar a -> a -> STM () TVar.writeTVar TVar PausedPartitions pausedPartitions (Int -> PausedPartitions PausedPartitions Int numPaused) updateTimeOfLastRebalance :: Float -> Analytics -> Prelude.IO () updateTimeOfLastRebalance :: Float -> Analytics -> IO () updateTimeOfLastRebalance Float now Analytics {TVar TimeOfLastRebalance timeOfLastRebalance :: TVar TimeOfLastRebalance timeOfLastRebalance :: Analytics -> TVar TimeOfLastRebalance timeOfLastRebalance} = do STM () -> IO () forall a. STM a -> IO a STM.atomically (STM () -> IO ()) -> STM () -> IO () forall a b. (a -> b) -> a -> b <| TVar TimeOfLastRebalance -> TimeOfLastRebalance -> STM () forall a. TVar a -> a -> STM () TVar.writeTVar TVar TimeOfLastRebalance timeOfLastRebalance (Float -> TimeOfLastRebalance TimeOfLastRebalance Float now)