module WriteBuffer
( module WriteBuffer
, module X
) where
import Control.Concurrent.Lifted
import Control.Concurrent.STM as X
import Control.Concurrent.STM.TBMQueue as X
import Control.Monad.Catch
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Data.ByteString (ByteString)
import Data.DList
import Data.Int
import Data.IORef.Lifted
import System.Clock
import System.IO
import System.Timeout
-- | Configuration for a write buffer that batches records of type @rec@
-- and persists them with an action running in monad @m@.
data WriteBufferOpts rec m = WriteBufferOpts
  { -- | Number of buffered records at which 'runWriteBuffer' flushes.
    maxBufferSize :: Int
    -- | Longest time, in nanoseconds, between flushes; 'runWriteBuffer'
    -- compares it against a monotonic-clock difference in nanoseconds.
  , maxTimeToWait :: Integer
    -- | Closable bounded queue from which records are read.
  , bufferInputQueue :: TBMQueue rec
    -- | Exception handler that 'runWriteBuffer' attaches to 'saveRecords'.
  , onError :: SomeException -> m ()
    -- | Action that persists one batch of records, in arrival order.
  , saveRecords :: [rec] -> m ()
  }
-- | Build 'WriteBufferOpts' for the given input queue and save action,
-- filling the remaining fields with defaults: flush at 1000 buffered
-- records, flush at least every @5 * 10^9@ nanoseconds (five seconds),
-- and report exceptions by printing them.
makeBufferOpts :: MonadIO m => TBMQueue rec -> ([rec] -> m ()) -> WriteBufferOpts rec m
makeBufferOpts queue persist =
  WriteBufferOpts
    { maxBufferSize = 1000
    , maxTimeToWait = 5 * 10 ^ 9
    , bufferInputQueue = queue
    , onError = \err -> liftIO (print err)
    , saveRecords = persist
    }
-- | Fork a worker thread that drains 'bufferInputQueue' and hands the
-- records to 'saveRecords' in batches.
--
-- A batch is flushed when any of the following happens:
--
-- * 'maxBufferSize' records have been buffered;
-- * 'maxTimeToWait' nanoseconds have elapsed since the previous flush
--   started (the buffer is flushed even if it is empty);
-- * the input queue has been closed, in which case the remaining records
--   are flushed and the worker thread terminates.
--
-- Every call to 'saveRecords' is guarded by 'onError', so a failing save
-- does not kill the worker.
--
-- Returns the 'ThreadId' of the forked worker.
runWriteBuffer :: (MonadCatch m, MonadBaseControl IO m, MonadIO m) => WriteBufferOpts rec m -> m ThreadId
runWriteBuffer WriteBufferOpts{..} = fork $ do
  timeRef <- newIORef =<< liftIO (getTime Monotonic)
  -- 'loop' is self-recursive and only returns once the queue is closed, so
  -- it must not be wrapped in 'forever': a closed queue never reopens and
  -- re-entering the loop would just spin.
  loop empty 0 timeRef
  where
    -- Persist the buffered records, routing any exception to 'onError'.
    flush inputs = saveRecords (toList inputs) `catch` onError

    -- Start a new wait window at @now@, then flush the buffer.
    flushAt now inputs timeRef = do
      liftIO $ writeIORef timeRef now
      flush inputs

    loop inputs len timeRef
      | len >= maxBufferSize = do
          -- Size-triggered flush: also restart the wait window so a timed
          -- flush does not fire immediately afterwards.
          now <- liftIO $ getTime Monotonic
          flushAt now inputs timeRef
          loop empty 0 timeRef
      | otherwise = do
          earlier <- liftIO $ readIORef timeRef
          now <- liftIO $ getTime Monotonic
          let elapsed = toNanoSecs (diffTimeSpec now earlier)
          if elapsed >= maxTimeToWait
            then do
              flushAt now inputs timeRef
              loop empty 0 timeRef
            else do
              -- Wait only for what is left of the current window.  The
              -- value is converted from nanoseconds to the microseconds
              -- 'timeout' expects, kept at >= 1 (a zero timeout would
              -- return immediately and spin) and clamped to the 'Int'
              -- range (a negative timeout would block indefinitely).
              let remainingMicros = (maxTimeToWait - elapsed) `div` 1000
                  waitMicros =
                    fromInteger
                      (min (toInteger (maxBound :: Int)) (max 1 remainingMicros))
              polled <- liftIO
                $ timeout waitMicros
                $ atomically
                $ readTBMQueue bufferInputQueue
              case polled of
                -- Timed out: go round again so the deadline check fires.
                Nothing -> loop inputs len timeRef
                -- Queue closed and drained: save what is left and stop.
                Just Nothing -> flush inputs
                Just (Just input) ->
                  loop (inputs `snoc` input) (len + 1) timeRef