{-| Module : Control.Concurrent.Throttle Description : Throttling async mechanism Copyright : (c) CNRS, 2024-Present License : AGPL + CECILL v3 Maintainer : team@gargantext.org Stability : experimental Portability : POSIX -} {-# LANGUAGE ScopedTypeVariables #-} module Control.Concurrent.Throttle ( throttle ) where import Control.Concurrent (threadDelay) import Control.Concurrent.Async qualified as Async import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TChan qualified as TChan import Control.Concurrent.STM.TVar qualified as TVar import Control.Monad (forever) import Data.Map.Strict qualified as Map import Data.Maybe (isNothing) import Data.Time.Clock.POSIX (getPOSIXTime) -- TODO Add a ThrottleHash typeclass which converts 'a' to 'id'? {-| Throttling with given interval. Here, throttling means: perform action only as frequently as allowed and other calls are DROPPED. This is in contrast to things like Conduit throttling, where actions are just SLOWED DOWN. We use this for asynchronous notifications and, if messages are the same, we can just drop them safely. Our input is the provided 'TChan.TChan'. This function should be spawned as a thread. We provide separate 'id' and 'a'. 'id' is used for uniquely identifying groups of throttled messages, while 'a' are actual messages that are sent to 'action' callback. 
-}
throttle
  :: (Ord id, Eq id, Show id)
  => Int
  -- ^ Throttling interval, in microseconds (it is compared against
  --   differences of 'unixTime' values and halved for 'threadDelay').
  -> TChan.TChan (id, a)
  -- ^ Input channel of @(id, message)@ pairs.
  -> (a -> IO ())
  -- ^ Callback invoked with the latest stored message of an 'id' once
  --   that entry is at least one interval old.
  -> IO ()
throttle delay tchan action = do
  -- Latest pending message per id, together with its arrival timestamp.
  smap <- TVar.newTVarIO Map.empty :: IO (TVar.TVar (Map.Map id (a, Int)))

  -- Run the collector and the cleaner side by side. With 'Async.race_' an
  -- exception in either loop (e.g. thrown by 'action') cancels the other
  -- one and is rethrown here, instead of leaving a collector that keeps
  -- filling a map nobody drains any more.
  Async.race_ (mapCleaner smap) (collector smap)
  where
    -- Stores the most recent message of every id; an older pending
    -- message with the same id is overwritten (i.e. dropped).
    --
    -- NOTE(review): the timestamp is reset on every insert, so an id that
    -- keeps receiving messages more often than 'delay' is not flushed
    -- until it goes quiet -- confirm this is the intended behaviour.
    collector smap = forever $ do
      (msgId, msg) <- atomically $ TChan.readTChan tchan
      now <- unixTime
      -- Strict modify: do not pile up unevaluated inserts in the TVar.
      atomically $ TVar.modifyTVar' smap (Map.insert msgId (msg, now))

    -- Periodically runs 'action' for entries that are old enough and
    -- removes them from the map.
    mapCleaner smap = forever $ do
      now <- unixTime
      m <- TVar.readTVarIO smap
      let canRun = Map.filter (\(_, t) -> now - t >= delay) m
      mapM_ (\(msg, _) -> action msg) canRun
      -- 'smap' may have changed while the actions were running (the
      -- collector only ever inserts). An entry survives the cleanup when
      --   * its key was not flushed in this round, or
      --   * its key was flushed but the entry is not yet due w.r.t. 'now',
      --     which means it was (re-)queued after the snapshot was taken.
      --     Such an entry usually has @t >= now@, so no lower bound may be
      --     put on @now - t@, otherwise that message is silently lost.
      atomically $ TVar.modifyTVar' smap $
        Map.filterWithKey
          (\k (_, t) -> isNothing (Map.lookup k canRun) || now - t < delay)
      threadDelay (delay `div` 2)

-- | Get Unix timestamp, with microsecond resolution (POSIX seconds
-- multiplied by 1000000), i.e. the same unit as 'threadDelay'.
--
-- See https://stackoverflow.com/questions/42843882/how-do-you-get-a-millisecond-precision-unix-timestamp-in-haskell
unixTime :: IO Int
unixTime = round . (* 1000000) <$> getPOSIXTime