module Control.Concurrent.Pulse ( Pulse, newPulse, destroyPulse, withPulse, waitForPulse ) where import Control.Concurrent import Control.Concurrent.Async import Control.Exception import Control.Monad import Data.IORef import Data.Time.Clock import qualified Data.Sequence as S -- | A heartbeat data Pulse = Pulse { p_worker :: !(Async ()) , p_pending :: !(IORef (S.Seq (MVar ()))) } -- | Automatically calls 'newPulse' and 'destroyPulse' as needed withPulse :: DiffTime -> (Pulse -> IO a) -> IO a withPulse everySecs = bracket (newPulse everySecs) destroyPulse -- | Create a new pulse that will send a heartbeat every 'DiffTime' seconds newPulse :: DiffTime -> IO Pulse newPulse everySecs = do pending <- newIORef S.empty worker <- async (pulseWorker everySecs pending) return (Pulse worker pending) -- | Destroy the pulse. Note that all pending 'waitForPulse' will be left blocked destroyPulse :: Pulse -> IO () destroyPulse p = do cancel (p_worker p) pure () pulseWorker :: DiffTime -> IORef (S.Seq (MVar ())) -> IO () pulseWorker everySecs pendingVar = forever $ do threadDelay (round $ everySecs * 1000 * 1000) pending <- atomicModifyIORef' pendingVar (\p -> (S.empty, p)) _ <- forConcurrently pending $ flip putMVar () pure () -- | Block until the next heartbeat is triggered on the 'Pulse' waitForPulse :: Pulse -> IO () waitForPulse p = do var <- newEmptyMVar _ <- atomicModifyIORef' (p_pending p) $ \x -> (x S.|> var, ()) takeMVar var {-# INLINE waitForPulse #-}