-- | Batching of items on top of STM.
--
-- Items added with 'writeBatch' are accumulated inside a 'Batch' and handed
-- to the handler given to 'newBatch' when the batch is flushed. The module
-- also re-exports 'TimeSpec' together with helpers for building one from
-- seconds, milliseconds or microseconds.
module Control.Concurrent.STM.Batch
( Batch
, newBatch
, writeBatch
, flushBatch
, fromMilliSecs
, fromSecs
, fromMicroSecs
, TimeSpec(..)
) where
import Data.Maybe (isJust, fromJust)
import System.Clock
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (void, when, forever, unless)
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM.TMVar
-- | Mutable state and configuration of one batching accumulator.
--
-- All fields are strict, so constructing a 'Batch' never stores an
-- unevaluated thunk for a variable, limit, timeout or handler.
data Batch a = Batch
  { batchAcc :: !(TVar [a])
    -- ^ Items accumulated since the last flush, newest first
    -- ('writeBatch' prepends).
  , batchLength :: !(TVar Int)
    -- ^ Number of items currently held in 'batchAcc'.
  , batchLimit :: !Int
    -- ^ Length at which 'writeBatch' flushes the batch.
  , batchTimeout :: !(Maybe TimeSpec)
    -- ^ Optional timeout; when present, 'newBatch' forks a thread running
    -- 'timeoutHandler'.
  , batchStarted :: !(TMVar TimeSpec)
    -- ^ Monotonic clock reading recorded for the current batch; emptied by
    -- 'flushBatch'.
  , batchHandler :: !([a] -> STM ())
    -- ^ Action run on the accumulated items when a non-empty batch is
    -- flushed.
  }
-- | Create an empty batch.
--
-- Arguments, in order: the length at which 'writeBatch' flushes, an optional
-- timeout, and the handler that receives the accumulated items on flush.
-- When a timeout is supplied, a thread running 'timeoutHandler' is forked
-- for the new batch.
newBatch
  :: Int
  -> Maybe TimeSpec
  -> ([a] -> STM ())
  -> IO (Batch a)
newBatch limit timeout handler = do
  batch <- mkBatch <$> newTVarIO 0 <*> newTVarIO [] <*> newEmptyTMVarIO
  when (isJust timeout) $ void (forkIO (timeoutHandler batch))
  return batch
  where
    -- Assemble the record from the freshly allocated variables.
    mkBatch lengthVar accVar startedVar = Batch
      { batchAcc = accVar
      , batchLength = lengthVar
      , batchLimit = limit
      , batchTimeout = timeout
      , batchStarted = startedVar
      , batchHandler = handler
      }
-- | Hand the accumulated items to the batch handler and reset the batch.
--
-- The handler is skipped when nothing has been accumulated. Items are passed
-- newest first, because 'writeBatch' prepends to the accumulator.
--
-- The start timestamp is cleared with 'tryTakeTMVar' rather than
-- 'takeTMVar': a batch without a timestamp (for example an empty one) is
-- then flushed as a no-op instead of making the whole transaction retry
-- until some other thread stores a timestamp.
flushBatch :: Batch a -> STM ()
flushBatch ctx = do
  acc <- readTVar $ batchAcc ctx
  unless (null acc) $ batchHandler ctx acc
  void $ tryTakeTMVar $ batchStarted ctx
  writeTVar (batchAcc ctx) []
  writeTVar (batchLength ctx) 0

-- | Add one item to the batch, flushing it once its length reaches
-- 'batchLimit'.
--
-- The clock is read before the transaction so that the start timestamp can
-- be stored atomically with the first item of a batch. A flush in the same
-- or a concurrent transaction therefore always sees the timestamp and the
-- items together, which also makes limits of one or less flush every item
-- immediately.
writeBatch :: Batch a -> a -> IO ()
writeBatch ctx item = do
  now <- getTime Monotonic
  atomically $ do
    modifyTVar' (batchAcc ctx) (item :)
    modifyTVar' (batchLength ctx) (+ 1)
    len <- readTVar $ batchLength ctx
    -- First item of a fresh batch: remember when the batch was opened.
    when (len == 1) $ void $ tryPutTMVar (batchStarted ctx) now
    unless (len < batchLimit ctx) $ flushBatch ctx
-- | Loop that flushes the batch once its timeout has elapsed.
--
-- Returns immediately when the batch has no timeout. Otherwise each
-- iteration waits for a start timestamp to be present, sleeps until that
-- timestamp plus the timeout, and then flushes.
timeoutHandler :: Batch a -> IO ()
timeoutHandler ctx = case batchTimeout ctx of
  Nothing -> return ()
  Just timeout -> forever $ do
    -- Block in STM until a batch has been started; no polling while idle.
    started <- atomically $ readTMVar $ batchStarted ctx
    now <- getTime Monotonic
    let deadline = started + timeout
    if now < deadline
      then threadDelay $ fromIntegral $ toMicroSecs $ deadline - now
      else atomically $ do
        -- Re-check inside the flushing transaction: if the timed batch was
        -- flushed meanwhile (timestamp gone or replaced), leave the current
        -- batch alone and let the next iteration time it afresh.
        current <- tryReadTMVar $ batchStarted ctx
        when (current == Just started) $ flushBatch ctx
-- | Build a 'TimeSpec' from a number of milliseconds.
fromMilliSecs :: Integer -> TimeSpec
fromMilliSecs millis = fromNanoSecs (millis * nanosPerMilli)
  where
    nanosPerMilli = 1000000
-- | Build a 'TimeSpec' from a whole number of seconds; the nanosecond part
-- is zero.
fromSecs :: Integer -> TimeSpec
fromSecs secs = TimeSpec { sec = fromIntegral secs, nsec = 0 }
-- | Build a 'TimeSpec' from a number of microseconds.
fromMicroSecs :: Integer -> TimeSpec
fromMicroSecs micros = fromNanoSecs (micros * nanosPerMicro)
  where
    nanosPerMicro = 1000
-- | Convert a 'TimeSpec' to whole microseconds, truncating toward zero.
--
-- The nanosecond count is the dividend; dividing 1000 by it instead would
-- yield 0 for any span longer than a microsecond and fail on a zero span.
toMicroSecs :: TimeSpec -> Integer
toMicroSecs ts = toNanoSecs ts `quot` 1000