{-# LANGUAGE NamedFieldPuns #-} -- | -- A buffer for batched write operations. Push individual items into the buffer -- and provide an operation that writes out batches of them. module Data.Buffer ( -- * Buffer Buffer, new, push, flush, -- * Settings Settings, defaultSettings, write, size, frequencyInMicroSeconds, ) where import qualified Control.Concurrent.STM as STM import qualified Control.Debounce as Debounce import qualified Data.List.NonEmpty as NonEmpty import GHC.Natural (Natural) -- | A buffer for write operations. data Buffer a = Buffer { -- | Push a new item into the buffer. push :: a -> IO (), -- | Flush the current contents of the buffer. flush :: IO () } -- | Configuration settings for a new 'Buffer'. data Settings a = Settings { -- | Function to write a batch of items. write :: NonEmpty.NonEmpty a -> IO (), -- | The maximum amount of items to write in a single batch. size :: Natural, -- | The frequency with which the buffer gets flushed automatically. frequencyInMicroSeconds :: Int } -- | Default 'Buffer' settings. A buffer created by these settings has a size -- of 1 and writes items out as soon as they come in. defaultSettings :: Settings a defaultSettings = Settings { write = \_ -> pure (), size = 1, frequencyInMicroSeconds = 0 } -- | Creates a new 'Buffer'. new :: Settings a -> IO (Buffer a) new settings = do queue <- STM.atomically $ STM.newTBQueue (fromIntegral (size settings)) let writeList xs = maybe (pure ()) (write settings) (NonEmpty.nonEmpty xs) let flush = writeList =<< STM.atomically (STM.flushTBQueue queue) scheduleFlush <- Debounce.mkDebounce Debounce.defaultDebounceSettings { Debounce.debounceAction = flush, Debounce.debounceFreq = frequencyInMicroSeconds settings } let push x = do overflow <- STM.atomically $ do full <- STM.isFullTBQueue queue toFlush <- if full then STM.flushTBQueue queue else pure [] STM.writeTBQueue queue x pure toFlush -- Flush the items that overflow the queue immediately. writeList overflow -- Schedule a regular flush for the rest. scheduleFlush pure Buffer {push, flush}