{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RecordWildCards       #-}

module WriteBuffer
    ( module WriteBuffer
    , module X
    ) where

import           Control.Concurrent.Lifted
import           Control.Concurrent.STM          as X
import           Control.Concurrent.STM.TBMQueue as X
import           Control.Monad.Catch
import           Control.Monad.Reader
import           Control.Monad.Trans.Control
import           Data.ByteString                 (ByteString)
import           Data.DList
import           Data.Int
import           Data.IORef.Lifted
import           System.Clock
import           System.IO
import           System.Timeout

-- | Configuration for the write buffer worker started by 'runWriteBuffer'.
--
-- @rec@ is the type of the buffered records and @m@ is the monad in which the
-- 'saveRecords' and 'onError' callbacks run.
data WriteBufferOpts rec m
    = WriteBufferOpts
    { maxBufferSize    :: Int
      -- ^ Number of buffered records at which the buffer is flushed.
    , maxTimeToWait    :: Integer
      -- ^ Flush interval in nanoseconds (it is compared against a
      -- 'toNanoSecs' difference of monotonic clock readings).
    , bufferInputQueue :: TBMQueue rec
      -- ^ Queue the worker reads incoming records from.
    , onError          :: SomeException -> m ()
      -- ^ Handler for exceptions caught while saving a batch.
    , saveRecords      :: [rec] -> m ()
      -- ^ Called with each batch of buffered records, in the order they were
      -- read from the queue.
    }

-- | Build 'WriteBufferOpts' for the given input queue and save action using
-- the default settings: a buffer of 1000 records, a wait of 5 seconds
-- (expressed in nanoseconds), and an error handler that prints the exception.
makeBufferOpts :: MonadIO m => TBMQueue rec -> ([rec] -> m ()) -> WriteBufferOpts rec m
makeBufferOpts queue save =
    WriteBufferOpts
        { maxBufferSize    = defaultBufferSize
        , maxTimeToWait    = defaultWaitNanos
        , bufferInputQueue = queue
        , onError          = \err -> liftIO (print err)
        , saveRecords      = save
        }
  where
    defaultBufferSize = 1000
    defaultWaitNanos = 5 * 10 ^ 9

-- | Fork a worker thread that batches records read from 'bufferInputQueue'
-- and hands each batch to 'saveRecords'.
--
-- A batch is flushed when it holds 'maxBufferSize' records, or when at least
-- 'maxTimeToWait' nanoseconds have passed since the previous time-triggered
-- flush (or since the worker started). The time-triggered flush runs even if
-- the batch is empty. Any exception thrown by 'saveRecords' is passed to
-- 'onError' and the worker carries on with a fresh batch.
--
-- When the queue has been closed and drained, the records still buffered are
-- flushed and the worker thread terminates.
--
-- Returns the 'ThreadId' of the forked worker.
runWriteBuffer :: (MonadCatch m, MonadBaseControl IO m, MonadIO m) => WriteBufferOpts rec m -> m ThreadId
runWriteBuffer WriteBufferOpts{..} = fork $ do
    timeRef <- newIORef =<< liftIO (getTime Monotonic)
    -- No 'forever' here: 'loop' only returns once the queue is closed, and
    -- restarting it then would spin on a queue that yields 'Nothing' at once.
    loop empty 0 timeRef
  where
    -- Every flush goes through 'onError' so a failing batch cannot kill the
    -- worker, whichever condition triggered the flush.
    flush inputs = saveRecords (toList inputs) `catch` onError

    -- A non-positive 'maxBufferSize' would make the size guard fire on an
    -- empty batch forever; treat it as "flush after every record".
    flushThreshold = max 1 maxBufferSize

    loop inputs len timeRef
        | len >= flushThreshold = do
            flush inputs
            loop empty 0 timeRef
        | otherwise = do
            earlier <- liftIO $ readIORef timeRef
            now <- liftIO $ getTime Monotonic
            let elapsed = toNanoSecs (diffTimeSpec now earlier)
            if elapsed >= maxTimeToWait then do
                liftIO $ writeIORef timeRef now
                flush inputs
                loop empty 0 timeRef
            else do
                -- Wait only for what is left of the interval, so a record is
                -- not held for up to twice 'maxTimeToWait'. 'timeout' takes
                -- microseconds as an 'Int': clamp to at least 1 (0 would
                -- return immediately) and at most 'maxBound'.
                let remainingMicros = (maxTimeToWait - elapsed) `div` 1000
                    waitMicros = fromInteger
                        (max 1 (min remainingMicros (toInteger (maxBound :: Int))))
                mminput <- liftIO
                    $ timeout waitMicros
                    $ atomically
                    $ readTBMQueue bufferInputQueue
                case mminput of
                    -- Timed out: go round again so the time check can flush.
                    Nothing ->
                        loop inputs len timeRef
                    -- Queue closed and drained: save what is left, then stop.
                    Just Nothing ->
                        when (len > 0) $ flush inputs
                    Just (Just input) ->
                        loop (inputs `snoc` input) (len + 1) timeRef