{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.Franz.Writer (
    -- * Writer interface
    WriterHandle,
    openWriter,
    closeWriter,
    withWriter,
    write,
    flush,
    getLastSeqNo
    ) where

import Control.Concurrent
import Control.DeepSeq (NFData(..))
import Control.Exception
import Control.Monad
import qualified Data.ByteString.FastBuilder as BB
import qualified Data.Vector.Storable.Mutable as MV
import Data.Foldable (toList)
import Data.Int
import Data.Word (Word64)
import Data.IORef
import Data.Kind (Type)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable(..))
import GHC.IO.Handle.FD (openFileBlocking)
import System.Directory
import System.Endian (toLE64)
import System.FilePath
import System.IO

-- | State needed to append items to a stream. The phantom parameter @f@ is
-- the container shape used for index names and index values.
data WriterHandle (f :: Type -> Type) = WriterHandle
  { hPayload :: !Handle
    -- ^ Handle for the payload file
  , hOffset :: !Handle
    -- ^ Handle for offsets and indices
  , vOffset :: !(MVar (Int, Word64))
    -- ^ (next sequential number, current payload file size)
  , offsetBuf :: !(MV.IOVector Word64)
    -- ^ pending offsets and indices, not yet written to 'hOffset'
  , offsetPtr :: !(IORef Int)
    -- ^ the number of pending 'Word64's in 'offsetBuf'
  , indexCount :: !Int
    -- ^ the number of 'Word64's to write per item
    -- (one payload offset plus one per index)
  }
-- | Evaluating the handle to weak head normal form is sufficient: every field
-- is strict, and the fields themselves are references and handles.
instance NFData (WriterHandle f) where
  rnf h = h `seq` ()

-- | Get the sequential number of the last item written.
--
-- This is the next sequential number minus one, so a freshly created stream
-- with no items yields @-1@.
getLastSeqNo :: WriterHandle f -> IO Int
getLastSeqNo h = do
  (nextSeqNo, _) <- readMVar (vOffset h)
  pure (nextSeqNo - 1)

-- | Open a stream directory for appending, creating it if it does not exist.
--
-- When the payload file already exists, the next sequential number and the
-- payload size are recovered from the sizes of the existing offset and payload
-- files. The recovery assumes the existing offset file was written with the
-- same number of indices as @idents@.
openWriter :: Foldable f
  => f String
  -- ^ index names: a fixed-length collection of the names of indices for this stream.
  -- Use Proxy if you don't want any indices. If you want only one type of index, use `Identity ""`.
  -> FilePath
  -- ^ directory that holds the stream's @payloads@, @offsets@ and @indices@ files
  -> IO (WriterHandle f)
openWriter idents path = do
  createDirectoryIfMissing True path
  let payloadPath = path </> "payloads"
      offsetPath = path </> "offsets"
      indexPath = path </> "indices"
      -- Every item occupies one Word64 for its payload offset plus one
      -- Word64 per index in the offset file.
      indexCount = length idents + 1
  alreadyExists <- doesFileExist payloadPath
  vOffset <- if alreadyExists
    then do
      count <- withFile offsetPath ReadMode hFileSize
      size <- withFile payloadPath ReadMode hFileSize
      -- The offset file stores 'indexCount' little-endian Word64s (8 bytes
      -- each) per item, so the item count is the byte size divided by the
      -- per-item record size, not by 8 alone.
      newMVar (fromIntegral count `div` (8 * indexCount), fromIntegral size)
    else newMVar (0, 0)
  writeFile indexPath $ unlines $ toList idents
  -- Open the file in blocking mode because a write on a non-blocking
  -- FD makes use of an unsafe call to write(2), which in turn blocks other
  -- threads when GC runs.
  hPayload <- openFileBlocking payloadPath AppendMode
  hOffset <- openFileBlocking offsetPath AppendMode
  offsetBuf <- MV.new offsetBufferSize
  offsetPtr <- newIORef 0
  return WriterHandle{..}

-- | Flush any pending data and close a 'WriterHandle'.
--
-- Both underlying handles are closed even if flushing, or closing the payload
-- handle, throws; the exception is propagated to the caller afterwards.
closeWriter :: WriterHandle f -> IO ()
closeWriter h =
  -- Nested 'finally' guarantees each 'hClose' runs regardless of earlier
  -- failures, so no file handle is leaked on the error path.
  flush h `finally` (hClose (hPayload h) `finally` hClose (hOffset h))

-- | Open a writer with 'openWriter', run the given action with it, and close
-- it with 'closeWriter' afterwards, whether or not the action throws.
withWriter :: Foldable f => f String -> FilePath -> (WriterHandle f -> IO a) -> IO a
withWriter idents path action =
  bracket (openWriter idents path) closeWriter action

-- | Capacity, in 'Word64' elements, of the in-memory buffer that holds
-- offsets and indices before they are written to the offset file.
offsetBufferSize :: Int
offsetBufferSize = 256

-- | Append one item to the stream and return the sequential number assigned
-- to it.
--
-- The payload is written to the payload handle right away. The item's end
-- offset and its index values are staged in the in-memory buffer and reach
-- the offset file on the next flush, which happens automatically once the
-- buffer cannot take another item. The whole operation runs while holding
-- the 'vOffset' lock.
write :: Foldable f
  => WriterHandle f
  -> f Int64 -- ^ index values
  -> BB.Builder -- ^ payload
  -> IO Int
write h@WriterHandle{..} ixs !bs = modifyMVar vOffset $ \(seqNo, start) -> do
  written <- BB.hPutBuilderLen hPayload bs
  let end = start + fromIntegral written
  pending <- readIORef offsetPtr
  -- Make room first: if this item would reach the end of the buffer, flush
  -- what is pending and start again from slot 0.
  base <- if pending + indexCount >= offsetBufferSize
    then unsafeFlush h >> pure 0
    else pure pending
  -- Everything in the buffer is stored little-endian.
  let put slot = MV.write offsetBuf slot . toLE64
  -- Slot 'base' holds the payload end offset; the index values follow it.
  put base end
  zipWithM_ (\slot v -> put slot (fromIntegral v)) [base + 1 ..] (toList ixs)
  writeIORef offsetPtr (base + indexCount)

  let !next = seqNo + 1
  pure ((next, end), seqNo)
{-# INLINE write #-}

-- | Flush the changes: write out everything that is pending while holding
-- the 'vOffset' lock, so it cannot interleave with a concurrent 'write'.
flush :: WriterHandle f -> IO ()
flush h = withMVar (vOffset h) (\_ -> unsafeFlush h)

-- | Flush the change without locking 'vOffset'.
--
-- Does nothing when there are no pending offsets. The caller is expected to
-- hold the 'vOffset' lock already.
unsafeFlush :: WriterHandle f -> IO ()
unsafeFlush h = do
  -- NB it's important to write the payload and indices prior to offsets
  -- because the reader watches the offset file then consume other files.
  -- Because of this, offsets are buffered in a buffer.
  pending <- readIORef (offsetPtr h)
  unless (pending <= 0) $ do
    hFlush (hPayload h)
    MV.unsafeWith (offsetBuf h) (\buf -> hPutElems (hOffset h) buf pending)
    writeIORef (offsetPtr h) 0
    hFlush (hOffset h)

-- | Same as 'hPutBuf' but takes the number of elements to write instead of
-- the number of bytes; the byte count is derived from the element's
-- 'Storable' size.
hPutElems :: forall a. Storable a => Handle -> Ptr a -> Int -> IO ()
hPutElems hdl ptr len = hPutBuf hdl ptr (len * elemSize)
  where
    -- 'sizeOf' only inspects the type, so the argument is never evaluated.
    elemSize = sizeOf (undefined :: a)
{-# INLINE hPutElems #-}