module Control.Concurrent.STM.TChan.Extra where
import Control.Concurrent (threadDelay)
import Control.Exception (throwIO)
import Control.Monad (forever)

import Control.Concurrent.Async (Async, async, cancel, wait, waitCatch)
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TChan (TChan, readTChan, writeTChan, newTChan)
import Control.Concurrent.STM.TMVar (newEmptyTMVar, tryTakeTMVar, putTMVar)
-- | Length of a delay, as a plain 'Int'.
--
-- NOTE: despite the name, the combinators in this module hand the value
-- directly to 'threadDelay', which interprets its argument as
-- /microseconds/, not nanoseconds.
type DiffNanosec = Int
-- | Debounce values on their way to @outputChan@.
--
-- Returns a fresh input channel together with the 'Async' running the
-- forwarding loop. For every value read from the input channel the loop
-- starts a delayed write (sleep for @toWaitFurther@ via 'threadDelay', then
-- write the value to @outputChan@) and afterwards cancels the delayed write
-- it had started for the previous value. A value therefore only reaches
-- @outputChan@ if its delayed write completes before the loop cancels it on
-- behalf of a newer value.
--
-- The delayed writes run in their own asyncs, separate from the returned one.
debounceStatic :: DiffNanosec -> TChan a -> IO (TChan a, Async ())
debounceStatic toWaitFurther outputChan = do
  (inbox, pendingSlot) <- atomically $ do
    chan <- newTChan
    slot <- newEmptyTMVar
    pure (chan, slot)
  worker <- async . forever $ do
    latest <- atomically (readTChan inbox)
    -- Start the replacement first, then retire whatever was pending before.
    replacement <- async (delayedWrite latest)
    atomically (tryTakeTMVar pendingSlot) >>= mapM_ cancel
    atomically (putTMVar pendingSlot replacement)
  pure (inbox, worker)
  where
    -- Sleep for the configured delay, then publish the value.
    delayedWrite value =
      threadDelay toWaitFurther >> atomically (writeTChan outputChan value)
-- | Throttle values on their way to @outputChan@ without dropping any.
--
-- Returns a fresh input channel together with the 'Async' running the
-- forwarding loop. For every value read from the input channel the loop
-- first waits for the delayed write it started for the previous value (if
-- any) to finish, and only then starts the next delayed write (sleep for
-- @toWaitFurther@ via 'threadDelay', then write the value to @outputChan@).
-- Each write is thus preceded by its own full delay, started no earlier than
-- the completion of the previous write.
--
-- The delayed writes run in their own asyncs, separate from the returned one.
throttleStatic :: DiffNanosec -> TChan a -> IO (TChan a, Async ())
throttleStatic toWaitFurther outputChan = do
  (inbox, pendingSlot) <- atomically $ do
    chan <- newTChan
    slot <- newEmptyTMVar
    pure (chan, slot)
  worker <- async . forever $ do
    next <- atomically (readTChan inbox)
    -- Let the previous delayed write run to completion before queuing ours.
    atomically (tryTakeTMVar pendingSlot) >>= mapM_ wait
    successor <- async (delayedWrite next)
    atomically (putTMVar pendingSlot successor)
  pure (inbox, worker)
  where
    -- Sleep for the configured delay, then publish the value.
    delayedWrite value =
      threadDelay toWaitFurther >> atomically (writeTChan outputChan value)
-- | Forward values to @outputChan@, interspersing generated "ping" values
-- while the input is idle.
--
-- Returns a fresh input channel and two asyncs:
--
-- * the /writer/, which repeatedly starts a ping (sleep for @timeBetween@ via
--   'threadDelay', run @xM@, write its result to @outputChan@) and waits for
--   it to end before starting the next one;
-- * the /listener/, which forwards every value read from the input channel
--   to @outputChan@ and cancels the ping that is currently pending, so the
--   ping timer restarts after each forwarded value.
--
-- The pending ping stays published in the shared slot for as long as the
-- writer is waiting on it; otherwise the listener would have nothing to
-- cancel and real traffic would never reset the timer. If a ping fails on
-- its own (e.g. @xM@ throws) the exception is rethrown in the writer.
--
-- Pings run in their own asyncs; cancelling the writer does not by itself
-- cancel a ping that is already pending.
intersperseStatic :: DiffNanosec -> IO a -> TChan a -> IO (TChan a, Async (), Async ())
intersperseStatic timeBetween xM outputChan = do
  (presentedChan, writingThread) <- atomically $ (,)
    <$> newTChan
    <*> newEmptyTMVar
  let invokeWritePing = do
        threadDelay timeBetween
        x <- xM
        atomically $ writeTChan outputChan x
  writer <- async $ forever $ do
    pinger <- async invokeWritePing
    -- The slot is empty here: only this loop fills it, and every iteration
    -- ends with the slot drained (by us below, or by the listener).
    atomically $ putTMVar writingThread pinger
    -- 'waitCatch' rather than 'wait': a cancellation issued by the listener
    -- must restart the timer, not kill this loop.
    outcome <- waitCatch pinger
    stillPublished <- atomically $ tryTakeTMVar writingThread
    case (outcome, stillPublished) of
      -- Nobody claimed the ping, so it was not cancelled by the listener:
      -- the failure is genuine and must not be swallowed.
      (Left err, Just _) -> throwIO err
      _ -> pure ()
  listener <- async $ forever $ do
    -- Claim the pending ping in the same transaction that reads the value,
    -- so exactly one party (listener or writer) drains the slot.
    (y, mInvoker) <- atomically $ do
      y' <- readTChan presentedChan
      (\q -> (y', q)) <$> tryTakeTMVar writingThread
    mapM_ cancel mInvoker
    atomically $ writeTChan outputChan y
  pure (presentedChan, writer, listener)