module Control.Concurrent.Pulse
( Pulse, newPulse, destroyPulse, withPulse, waitForPulse )
where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Data.IORef
import Data.Time.Clock
import qualified Data.Sequence as S
data Pulse
= Pulse
{ p_worker :: !(Async ())
, p_pending :: !(IORef (S.Seq (MVar ())))
}
withPulse :: DiffTime -> (Pulse -> IO a) -> IO a
withPulse everySecs = bracket (newPulse everySecs) destroyPulse
newPulse :: DiffTime -> IO Pulse
newPulse everySecs =
do pending <- newIORef S.empty
worker <- async (pulseWorker everySecs pending)
return (Pulse worker pending)
destroyPulse :: Pulse -> IO ()
destroyPulse p =
do cancel (p_worker p)
pure ()
pulseWorker :: DiffTime -> IORef (S.Seq (MVar ())) -> IO ()
pulseWorker everySecs pendingVar =
forever $
do threadDelay (round $ everySecs * 1000 * 1000)
pending <- atomicModifyIORef' pendingVar (\p -> (S.empty, p))
_ <- forConcurrently pending $ flip putMVar ()
pure ()
waitForPulse :: Pulse -> IO ()
waitForPulse p =
do var <- newEmptyMVar
_ <- atomicModifyIORef' (p_pending p) $ \x -> (x S.|> var, ())
takeMVar var