{-# LANGUAGE BangPatterns #-}
-- | Heartbeat support for io-streams.
--
-- 'heartbeatOutputStream' wraps an 'OutputStream' and writes a heartbeat
-- message to it whenever nothing else has been written for a given interval.
-- 'heartbeatInputStream' wraps an 'InputStream' and raises
-- 'HeartbeatException' when nothing has been read for longer than a grace
-- period.
module System.IO.Streams.Heartbeat
( heartbeatOutputStream
, heartbeatInputStream
, HeartbeatException (..)
) where
import Control.Concurrent (threadDelay)
import Control.Exception (Exception, finally, throw, throwIO)
import Control.Monad (forever)
import Data.IORef (atomicModifyIORef', newIORef, writeIORef)

import Control.Concurrent.Async (async, cancel, link)
import Data.Time.Clock (DiffTime, UTCTime, diffTimeToPicoseconds, diffUTCTime, getCurrentTime)
import System.IO.Streams (InputStream, OutputStream)
import qualified System.IO.Streams as Streams
-- | Wrap an 'OutputStream' so that a heartbeat message is written to it
-- whenever no value has been written through the wrapper for @interval@.
--
-- A background thread is started with 'async' and 'link'ed to the calling
-- thread.  It first sleeps for one @interval@ and then loops: each pass
-- compares the current time with the timestamp of the last write and either
-- writes @msg@ to the underlying stream (then sleeps a full @interval@) or
-- sleeps for the time remaining until a heartbeat would be due.
--
-- Writing @Just x@ to the returned stream forwards @x@ to the underlying
-- stream and then records the current time as the last write.  Writing
-- 'Nothing' forwards end-of-stream and cancels the background thread.
heartbeatOutputStream
  :: DiffTime            -- ^ quiet period after which a heartbeat is written
  -> a                   -- ^ heartbeat message
  -> OutputStream a      -- ^ underlying stream that receives all writes
  -> IO (OutputStream a)
heartbeatOutputStream interval msg os = do
  t <- newIORef =<< getCurrentTime
  writeAsync <- async $ delayInterval >> forever (writeHeartbeat t)
  link writeAsync
  Streams.makeOutputStream (resetHeartbeat t writeAsync)
  where
    delayInterval = delayDiffTime interval

    -- One pass of the heartbeat loop.  'heartbeatTime' atomically decides
    -- whether a heartbeat is due and, if so, moves the stored timestamp
    -- forward to @now@.
    writeHeartbeat t = do
      !now <- getCurrentTime
      (!timeTilHeartbeat, !triggerHeartbeat) <-
        atomicModifyIORef' t (heartbeatTime interval now)
      if triggerHeartbeat
        then Streams.write (Just msg) os >> delayInterval
        else delayDiffTime timeTilHeartbeat

    -- Handler behind the returned stream.
    resetHeartbeat t _ x@(Just _) =
      Streams.write x os >> getCurrentTime >>= writeIORef t
    -- 'finally' guarantees the heartbeat thread is cancelled even when
    -- forwarding end-of-stream throws; otherwise the thread would keep
    -- writing heartbeats to a stream that has already failed.
    resetHeartbeat _ writeAsync Nothing =
      Streams.write Nothing os `finally` cancel writeAsync
-- | Raised by the checker thread of 'heartbeatInputStream' when no value has
-- been read for longer than the grace period.  The 'DiffTime' payload is the
-- grace period that was exceeded.
data HeartbeatException
  = MissedHeartbeat DiffTime
  deriving (Show, Eq)

instance Exception HeartbeatException
-- | Wrap an 'InputStream' so that a 'MissedHeartbeat' exception is raised
-- when nothing has been read from it for longer than the grace period.
--
-- The grace period is @graceMultiplier * interval@.  A background thread is
-- started with 'async' and 'link'ed to the calling thread.  It sleeps for one
-- grace period and then loops: each pass compares the current time with the
-- timestamp of the last read and either throws @MissedHeartbeat gracePeriod@
-- or sleeps for @interval@ before checking again.
--
-- Every item read through the returned stream records the current time as
-- the last read.  The background thread is cancelled when the underlying
-- stream reaches end of input.
heartbeatInputStream
  :: DiffTime           -- ^ expected interval between incoming messages
  -> DiffTime           -- ^ multiplier applied to the interval to obtain the grace period
  -> InputStream a      -- ^ underlying stream to monitor
  -> IO (InputStream a)
heartbeatInputStream interval graceMultiplier is = do
  t <- newIORef =<< getCurrentTime
  checkAsync <- async $ delayDiffTime gracePeriod >> forever (checkHeartbeat t)
  link checkAsync
  Streams.mapM_ (resetHeartbeat t) is >>= Streams.atEndOfInput (cancel checkAsync)
  where
    gracePeriod = graceMultiplier * interval

    -- One pass of the checker loop.
    checkHeartbeat t = do
      !now <- getCurrentTime
      !triggerDisconnect <- snd <$> atomicModifyIORef' t (heartbeatTime gracePeriod now)
      if triggerDisconnect
        -- 'throwIO' raises when this action is executed, whereas 'throw'
        -- raises whenever the resulting thunk happens to be evaluated.
        then throwIO (MissedHeartbeat gracePeriod)
        else delayDiffTime interval

    -- Any item read counts as a sign of life; its value is ignored.
    resetHeartbeat t _ = getCurrentTime >>= writeIORef t
-- | Decide whether a heartbeat is due, in the shape expected by
-- 'atomicModifyIORef''.
--
-- Given the heartbeat @interval@, the current time and the stored timestamp,
-- returns the timestamp to store next paired with
-- @(time remaining until the heartbeat is due, whether it is due now)@.
-- When the heartbeat is due the stored timestamp becomes @now@; otherwise it
-- is left unchanged.  The remaining time is zero or negative when the
-- heartbeat is due.
heartbeatTime
  :: DiffTime                      -- ^ interval
  -> UTCTime                       -- ^ current time
  -> UTCTime                       -- ^ stored timestamp
  -> (UTCTime, (DiffTime, Bool))
heartbeatTime interval now lastTime
  | due = (now, (remaining, True))
  | otherwise = (lastTime, (remaining, False))
  where
    elapsed = realToFrac (diffUTCTime now lastTime)
    remaining = interval - elapsed
    due = elapsed >= interval
-- | Suspend the current thread for (at least) the given duration.
--
-- The duration is converted to whole microseconds, rounding down.  Negative
-- durations are treated as zero.  Durations longer than @maxBound :: Int@
-- microseconds are slept in several chunks, because 'threadDelay' takes an
-- 'Int' and a direct conversion would silently wrap around.
delayDiffTime :: DiffTime -> IO ()
delayDiffTime = sleepMicros . toMicros
  where
    toMicros :: DiffTime -> Integer
    toMicros d = diffTimeToPicoseconds d `div` 1000000

    -- Longest delay a single 'threadDelay' call can express.
    maxChunk :: Integer
    maxChunk = toInteger (maxBound :: Int)

    sleepMicros :: Integer -> IO ()
    sleepMicros micros
      | micros > maxChunk = threadDelay maxBound >> sleepMicros (micros - maxChunk)
      | otherwise = threadDelay (fromInteger (max 0 micros))