{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.Franz (
    -- * Writer interface
    WriterHandle,
    openWriter,
    closeWriter,
    withWriter,
    write,
    flush,
    getLastSeqNo
    ) where

import Control.Concurrent
import Control.Exception
import Control.Monad
import qualified Data.ByteString.FastBuilder as BB
import qualified Data.Vector.Storable.Mutable as MV
import Data.Foldable (toList)
import Data.Int
import Data.Word (Word64)
import Data.IORef
import Data.Kind (Type)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable(..))
import GHC.IO.Handle.FD (openFileBlocking)
import System.Directory
import System.Endian (toLE64)
import System.FilePath
import System.IO

data WriterHandle (f :: Type -> Type) = WriterHandle
  { hPayload :: Handle
  , hOffset :: Handle -- ^ Handle for offsets and indices
  , vOffset :: MVar (Int, Word64) -- ^ (next sequential number, current payload file size)
  , offsetBuf :: MV.IOVector Word64 -- ^ pending indices
  , offsetPtr :: IORef Int -- ^ the number of pending indices
  , indexCount :: Int -- ^ the number of Word64s to write for item
  }

-- | Get the sequential number of the last item item written.
getLastSeqNo :: WriterHandle f -> IO Int
getLastSeqNo = fmap (subtract 1 . fst) . readMVar . vOffset

openWriter :: Foldable f
  => f String
  -- ^ index names: a fixed-length collection of the names of indices for this stream.
  -- Use Proxy if you don't want any indices. If you want only one type of index, use `Identity ""`.
  -> FilePath
  -> IO (WriterHandle f)
openWriter idents path = do
  createDirectoryIfMissing True path
  let payloadPath = path </> "payloads"
  let offsetPath = path </> "offsets"
  let indexPath = path </> "indices"
  alreadyExists <- doesFileExist payloadPath
  vOffset <- if alreadyExists
    then do
      count <- withFile offsetPath ReadMode hFileSize
      size <- withFile payloadPath ReadMode hFileSize
      newMVar (fromIntegral count `div` 8, fromIntegral size)
    else newMVar (0,0)
  writeFile indexPath $ unlines $ toList idents
  -- Open the file in blocking mode because a write on a non-blocking
  -- FD makes use of an unsafe call to write(2), which in turn blocks other
  -- threads when GC runs.
  hPayload <- openFileBlocking payloadPath AppendMode
  hOffset <- openFileBlocking offsetPath AppendMode
  offsetBuf <- MV.new offsetBufferSize
  offsetPtr <- newIORef 0
  let indexCount = length idents + 1
  return WriterHandle{..}

-- | Flush any pending data and close a 'WriterHandle'.
closeWriter :: Foldable f => WriterHandle f -> IO ()
closeWriter h@WriterHandle{..} = do
  flush h
  hClose hPayload
  hClose hOffset

withWriter :: Foldable f => f String -> FilePath -> (WriterHandle f -> IO a) -> IO a
withWriter idents path = bracket (openWriter idents path) closeWriter

offsetBufferSize :: Int
offsetBufferSize = 256

write :: Foldable f
  => WriterHandle f
  -> f Int64 -- ^ index values
  -> BB.Builder -- ^ payload
  -> IO Int
write h@WriterHandle{..} ixs !bs = modifyMVar vOffset $ \(n, ofs) -> do
  len <- fromIntegral <$> BB.hPutBuilderLen hPayload bs
  let ofs' = ofs + len
  pos <- readIORef offsetPtr
  pos' <- if pos + indexCount >= offsetBufferSize
    then 0 <$ unsafeFlush h
    else return pos
  MV.write offsetBuf pos' $ toLE64 $ fromIntegral ofs'
  forM_ (zip [pos'+1..] (toList ixs))
    $ \(i, v) -> MV.write offsetBuf i $ toLE64 $ fromIntegral v
  writeIORef offsetPtr (pos' + indexCount)

  let !n' = n + 1
  return ((n', ofs'), n)
{-# INLINE write #-}

-- | Flush the changes.
flush :: WriterHandle f -> IO ()
flush h = withMVar (vOffset h) $ const $ unsafeFlush h

-- | Flush the change without locking 'vOffset'
unsafeFlush :: WriterHandle f -> IO ()
unsafeFlush WriterHandle{..}  = do
  -- NB it's important to write the payload and indices prior to offsets
  -- because the reader watches the offset file then consume other files.
  -- Because of this, offsets are buffered in a buffer.
  len <- readIORef offsetPtr
  when (len > 0) $ do
    hFlush hPayload
    MV.unsafeWith offsetBuf $ \ptr -> hPutElems hOffset ptr len
    writeIORef offsetPtr 0
    hFlush hOffset

-- | Same as hPutBuf but with a number of elements to write instead of a number
-- of bytes.
hPutElems :: forall a. Storable a => Handle -> Ptr a -> Int -> IO ()
hPutElems hdl ptr len = hPutBuf hdl ptr (len * sizeOf (undefined :: a))
{-# INLINE hPutElems #-}