{-| Module : Z.IO.LowResTimer Description : Low resolution (0.1s) timing wheel Copyright : (c) Dong Han, 2017-2018 License : BSD Maintainer : winterland1989@gmail.com Stability : experimental Portability : non-portable This module provide low resolution (0.1s) timers using a timing wheel of size 128 per capability, each timer thread will automatically started or stopped based on demannd. register or cancel a timeout is O(1), and each step only need scan n/128 items given timers are registered in an even fashion. This timer is particularly suitable for high concurrent approximated IO timeout scheduling. You should not rely on it to provide timing information since it's very inaccurate. Reference: * * -} module Z.IO.LowResTimer ( -- * low resolution timers registerLowResTimer , registerLowResTimer_ , registerLowResTimerOn , LowResTimer , queryLowResTimer , cancelLowResTimer , cancelLowResTimer_ , timeoutLowRes , timeoutLowResEx , threadDelayLowRes , throttle , throttle_ , throttleTrailing_ -- * low resolution timer manager , LowResTimerManager , getLowResTimerManager , isLowResTimerManagerRunning , lowResTimerManagerCapabilitiesChanged ) where import Z.Data.Array import Data.Word #ifndef mingw32_HOST_OS import GHC.Event #endif import Control.Concurrent import Control.Monad import Data.IORef import GHC.Conc import System.IO.Unsafe import Z.Data.PrimRef import Z.IO.Exception import Z.IO.UV.FFI_Env (uv_hrtime) -- queueSize :: Int {-# INLINABLE queueSize #-} queueSize = 128 -- | A simple timing wheel -- data TimerList = TimerItem {-# UNPACK #-} !Counter (IO ()) TimerList | TimerNil data LowResTimerManager = LowResTimerManager (SmallArray (IORef TimerList)) -- timer queue (MVar Int) -- current time wheel's index Counter -- registered counter, stop timer thread if go downs to zero (MVar Bool) -- running lock newLowResTimerManager :: IO LowResTimerManager {-# INLINABLE newLowResTimerManager #-} newLowResTimerManager = do indexLock <- newMVar 0 regCounter <- newCounter 0 runningLock <- newMVar False queue <- newArr queueSize forM_ [0..queueSize-1] $ \ i -> do writeArr queue i =<< newIORef TimerNil iqueue <- unsafeFreezeArr queue return (LowResTimerManager iqueue indexLock regCounter runningLock) lowResTimerManager :: IORef (SmallArray LowResTimerManager) {-# NOINLINE lowResTimerManager #-} lowResTimerManager = unsafePerformIO $ do numCaps <- getNumCapabilities lrtmArray <- newArr numCaps forM_ [0..numCaps-1] $ \ i -> do writeArr lrtmArray i =<< newLowResTimerManager ilrtmArray <- unsafeFreezeArr lrtmArray newIORef ilrtmArray -- | Create new low resolution timer manager on capability change. -- -- Since low resolution timer manager is not hooked into RTS, you're responsible to call this function -- after you call 'setNumCapabilities' to match timer manager array size with new capability number. -- -- This is not a must though, when we fetch timer manager we always take a modulo. -- lowResTimerManagerCapabilitiesChanged :: IO () {-# INLINABLE lowResTimerManagerCapabilitiesChanged #-} lowResTimerManagerCapabilitiesChanged = do lrtmArray <- readIORef lowResTimerManager let oldSize = sizeofArr lrtmArray numCaps <- getNumCapabilities when (numCaps /= oldSize) $ do lrtmArray' <- newArr numCaps if numCaps < oldSize then do forM_ [0..numCaps-1] $ \ i -> do writeArr lrtmArray' i =<< indexArrM lrtmArray i else do forM_ [0..oldSize-1] $ \ i -> do writeArr lrtmArray' i =<< indexArrM lrtmArray i forM_ [oldSize..numCaps-1] $ \ i -> do writeArr lrtmArray' i =<< newLowResTimerManager ilrtmArray' <- unsafeFreezeArr lrtmArray' atomicModifyIORef' lowResTimerManager $ \ _ -> (ilrtmArray', ()) -- | Get a 'LowResTimerManager' for current thread. -- getLowResTimerManager :: IO LowResTimerManager {-# INLINABLE getLowResTimerManager #-} getLowResTimerManager = do (cap, _) <- threadCapability =<< myThreadId lrtmArray <- readIORef lowResTimerManager indexArrM lrtmArray (cap `rem` sizeofArr lrtmArray) -- | Check if a timer manager's wheel is turning -- -- This is mostly for testing purpose. -- isLowResTimerManagerRunning :: LowResTimerManager -> IO Bool {-# INLINABLE isLowResTimerManagerRunning #-} isLowResTimerManagerRunning (LowResTimerManager _ _ _ runningLock) = readMVar runningLock -- | Register a new timer on current capability's timer manager, start the timing wheel if it's not turning. -- -- If the action could block, you may want to run it in another thread. Example to kill a thread after 10s: -- -- @ -- registerLowResTimer 100 (forkIO $ killThread tid) -- @ -- registerLowResTimer :: Int -- ^ timeout in unit of 0.1s -> IO () -- ^ the action you want to perform, it should not block -> IO LowResTimer {-# INLINABLE registerLowResTimer #-} registerLowResTimer t action = do lrtm <- getLowResTimerManager registerLowResTimerOn lrtm t action -- | 'void' ('registerLowResTimer' t action) registerLowResTimer_ :: Int -- ^ timeout in unit of 0.1s -> IO () -- ^ the action you want to perform, it should not block -> IO () {-# INLINABLE registerLowResTimer_ #-} registerLowResTimer_ t action = void (registerLowResTimer t action) -- | Same as 'registerLowResTimer', but allow you choose timer manager. -- registerLowResTimerOn :: LowResTimerManager -- ^ a low resolution timer manager -> Int -- ^ timeout in unit of 0.1s -> IO () -- ^ the action you want to perform, it should not block -> IO LowResTimer {-# INLINABLE registerLowResTimerOn #-} registerLowResTimerOn lrtm@(LowResTimerManager queue indexLock regCounter _) t action = do let (round_, tick) = (max 0 t) `quotRem` queueSize i <- readMVar indexLock tlistRef <- indexArrM queue ((i + tick) `rem` queueSize) roundCounter <- newCounter round_ mask_ $ do atomicModifyIORef' tlistRef $ \ tlist -> let newList = TimerItem roundCounter action tlist in (newList, ()) atomicAddCounter_ regCounter 1 ensureLowResTimerManager lrtm return (LowResTimer roundCounter) -- cancel is simple, just set the round number to -1. -- next scan will eventually release it -- | Timer registered by 'registerLowResTimer' or 'registerLowResTimerOn'. -- newtype LowResTimer = LowResTimer Counter -- | Query how many seconds remain before timer firing. -- -- A return value <= 0 indictate the timer is firing or fired. -- queryLowResTimer :: LowResTimer -> IO Int {-# INLINABLE queryLowResTimer #-} queryLowResTimer (LowResTimer c) = readPrimRef c -- | Cancel a timer, return the remaining ticks. -- -- This function have no effect after the timer is fired. -- cancelLowResTimer :: LowResTimer -> IO Int {-# INLINABLE cancelLowResTimer #-} cancelLowResTimer (LowResTimer c) = atomicOrCounter c (-1) -- | @void . cancelLowResTimer@ -- cancelLowResTimer_ :: LowResTimer -> IO () {-# INLINABLE cancelLowResTimer_ #-} cancelLowResTimer_ = void . cancelLowResTimer -- | similar to 'System.Timeout.timeout', this function put a limit on time which an IO can consume. -- -- Note timeoutLowRes is also implemented with 'Exception' underhood, which can have some surprising -- effects on some devices, e.g. use 'timeoutLowRes' with reading or writing on 'UVStream's may close -- the 'UVStream' once a reading or writing is not able to be done in time. timeoutLowRes :: Int -- ^ timeout in unit of 0.1s -> IO a -> IO (Maybe a) {-# INLINABLE timeoutLowRes #-} timeoutLowRes timeo io = do mid <- myThreadId catch (do timer <- registerLowResTimer timeo (timeoutAThread mid) r <- io _ <- cancelLowResTimer timer return (Just r)) ( \ (_ :: TimeOutException) -> return Nothing ) where timeoutAThread tid = void . forkIO $ throwTo tid (TimeOutException tid undefined) -- | Similar to 'timeoutLowRes', but throw an async 'TimeOutException' to current thread -- instead of return 'Nothing' if timeout. timeoutLowResEx :: HasCallStack => Int -- ^ timeout in unit of 0.1s -> IO a -> IO a {-# INLINABLE timeoutLowResEx #-} timeoutLowResEx timeo io = do mid <- myThreadId timer <- registerLowResTimer timeo (timeoutAThread mid) r <- io _ <- cancelLowResTimer timer return r where timeoutAThread tid = void . forkIO $ throwTo tid (TimeOutException tid callStack) -- | see 'timeoutLowResEx' on 'TimeOutException'. -- -- This exception is not a sub-exception type of 'SomeIOException', -- but a sub-exception type of 'TimeOutException'. data TimeOutException = TimeOutException ThreadId CallStack deriving Show instance Exception TimeOutException where toException = asyncExceptionToException fromException = asyncExceptionFromException -- | Similiar to 'threadDelay', suspends the current thread for a given number of deciseconds. -- threadDelayLowRes :: Int -> IO () {-# INLINABLE threadDelayLowRes #-} threadDelayLowRes dsecs = mask_ $ do m <- newEmptyMVar t <- registerLowResTimer dsecs (putMVar m ()) takeMVar m `onException` cancelLowResTimer_ t -------------------------------------------------------------------------------- -- | Check if low resolution timer manager loop is running, start loop if not. -- ensureLowResTimerManager :: LowResTimerManager -> IO () {-# INLINABLE ensureLowResTimerManager #-} ensureLowResTimerManager lrtm@(LowResTimerManager _ _ _ runningLock) = do modifyMVar_ runningLock $ \ running -> do unless running $ do t <- uv_hrtime tid <- forkIO (startLowResTimerManager lrtm t) labelThread tid "Z-IO: low resolution time manager" -- make sure we can see it in GHC event log return True -- | Start low resolution timer loop, the loop is automatically stopped if there's no more new registrations. -- startLowResTimerManager :: LowResTimerManager -> Word64 -> IO () {-# INLINABLE startLowResTimerManager #-} startLowResTimerManager lrtm@(LowResTimerManager _ _ regCounter runningLock) !stdT = do modifyMVar_ runningLock $ \ _ -> do -- we shouldn't receive async exception here c <- readPrimRef regCounter -- unless something terribly wrong happened, e.g., stackoverflow if c > 0 then do t <- uv_hrtime -- we can't use 100000 as maximum, because that will produce a 0us thread delay -- and GHC's registerTimeout will run next startLowResTimerManager directly(on current thread) -- but we're still holding runningLock, which cause an deadlock. let !deltaT = min (fromIntegral ((t - stdT) `quot` 1000)) 99999 _ <- forkIO (fireLowResTimerQueue lrtm) -- we offload the scanning to another thread to minimize -- the time we holding runningLock case () of _ #ifndef mingw32_HOST_OS | rtsSupportsBoundThreads -> do htm <- getSystemTimerManager void $ registerTimeout htm (100000 - deltaT) (startLowResTimerManager lrtm (stdT + 100000000)) #endif | otherwise -> void . forkIO $ do -- we have to fork another thread since we're holding runningLock, threadDelay (100000 - deltaT) -- this may affect accuracy, but on windows there're no other choices. startLowResTimerManager lrtm (stdT + 100000000) return True else do return False -- if we haven't got any registered timeout, we stop the time manager -- doing this can stop us from getting the way of idle GC -- since we're still inside runningLock, we won't miss new registration. -- | Scan the timeout queue in current tick index, and move tick index forward by one. -- fireLowResTimerQueue :: LowResTimerManager -> IO () {-# INLINABLE fireLowResTimerQueue #-} fireLowResTimerQueue (LowResTimerManager queue indexLock regCounter _) = do (tList, tListRef) <- modifyMVar indexLock $ \ index -> do -- get the index lock tListRef <- indexArrM queue index tList <- atomicModifyIORef' tListRef $ \ tList -> (TimerNil, tList) -- swap current index list with an empty one let !index' = (index+1) `rem` queueSize -- move index forward by 1 return (index', (tList, tListRef)) -- release the lock go tList tListRef regCounter where go (TimerItem roundCounter action nextList) tListRef counter = do r <- atomicSubCounter roundCounter 1 case r `compare` 0 of LT -> do -- if round number is less than 0, then it's a cancelled timer atomicSubCounter_ counter 1 go nextList tListRef counter EQ -> do -- if round number is equal to 0, fire it atomicSubCounter_ counter 1 catchSync action ( \ (_ :: SomeException) -> return () ) -- well, we really don't want timers break our loop go nextList tListRef counter GT -> do -- if round number is larger than 0, put it back for another round atomicModifyIORef' tListRef $ \ tlist -> (TimerItem roundCounter action tlist, ()) go nextList tListRef counter go TimerNil _ _ = return () -------------------------------------------------------------------------------- -- | Cache result of an IO action for give time t. -- -- This combinator is useful when you want to share IO result within a period, the action will be called -- on demand, and the result will be cached for t milliseconds. -- -- One common way to get a shared periodical updated value is to start a seperate thread and do calculation -- periodically, but doing that will stop system from being idle, which stop idle GC from running, -- and in turn disable deadlock detection, which is too bad. This function solves that. throttle :: Int -- ^ cache time in unit of 0.1s -> IO a -- ^ the original IO action -> IO (IO a) -- ^ throttled IO action {-# INLINABLE throttle #-} throttle t action = do resultCounter <- newCounter 0 resultRef <- newIORef =<< action return $ do c <- atomicOrCounter resultCounter (-1) -- 0x11111111 or 0x1111111111111111 depend machine word size if c == 0 then do registerLowResTimer_ t (void $ atomicAndCounter resultCounter 0) !r <- action atomicWriteIORef resultRef r return r else readIORef resultRef -- | Throttle an IO action without caching result. -- -- The IO action will run at leading edge. i.e. once run, during following (t/10)s throttled action will -- no-ops. -- -- Note the action will run in the calling thread. throttle_ :: Int -- ^ cache time in unit of 0.1s -> IO () -- ^ the original IO action -> IO (IO ()) -- ^ throttled IO action {-# INLINABLE throttle_ #-} throttle_ t action = do resultCounter <- newCounter 0 return $ do c <- atomicOrCounter resultCounter (-1) -- 0x11111111 or 0x1111111111111111 depend machine word size when (c == 0) $ do registerLowResTimer_ t (void $ atomicAndCounter resultCounter 0) void action -- | Similar to 'throttle_' but run action in trailing edge -- -- The IO action will run at trailing edge. i.e. no matter how many times throttled action -- are called, original action will run only once after (t/10)s. -- -- Note the action will be run in a new created thread. throttleTrailing_ :: Int -> IO () -- ^ the original IO action -> IO (IO ()) -- ^ throttled IO action {-# INLINABLE throttleTrailing_ #-} throttleTrailing_ t action = do resultCounter <- newCounter 0 return $ do c <- atomicOrCounter resultCounter (-1) -- 0x11111111 or 0x1111111111111111 depend machine word size when (c == 0) . registerLowResTimer_ t . void . forkIO $ do atomicAndCounter_ resultCounter 0 action