{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE BangPatterns #-}
module Database.Franz.Reader where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Trans.Maybe
import Data.Serialize
import Database.Franz.Protocol
import qualified Data.ByteString.Char8 as B
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap.Strict as IM
import qualified Data.Vector.Unboxed as U
import qualified Data.Vector as V
import Data.Void
import Data.Maybe (isJust)
import GHC.Clock (getMonotonicTime)
import System.Directory
import System.FilePath
import System.IO
import System.FSNotify
-- | In-memory view of one on-disk stream, kept up to date by a background
-- follower thread (see 'createStream').
data Stream = Stream
  { vOffsets :: !(TVar (IM.IntMap Int))
    -- ^ Sequence number -> first field of the corresponding record in the
    -- @offsets@ file (presumably the payload byte offset; confirm against
    -- the writer).
  , indexNames :: ![B.ByteString]
    -- ^ Index names, one per line of the stream's @indices@ file.
  , indices :: !(HM.HashMap B.ByteString (TVar (IM.IntMap Int)))
    -- ^ Per index name: index value -> sequence number of the record
    -- carrying that value.
  , vCount :: !(TVar Int)
    -- ^ Number of records loaded so far; also the sequence number the next
    -- record read by the follower will receive.
  , vCaughtUp :: !(TVar Bool)
    -- ^ Set to 'True' by the follower when it reads end-of-file, reset to
    -- 'False' by the directory watcher when the @offsets@ file is modified.
  , followThread :: !ThreadId
    -- ^ Thread that tails the @offsets@ file.
  , indexHandle :: !Handle
    -- ^ Read handle on the @offsets@ file, used by the follower.
  , payloadHandle :: !Handle
    -- ^ Read handle on the @payloads@ file.
  , vActivity :: !(TVar Activity)
    -- ^ Usage state inspected by 'reaper' to decide when to close the stream.
  }
-- | Usage state of a 'Stream'.
--
-- * @Left t@: nobody is using the stream; @t@ is the 'getMonotonicTime'
--   reading taken when it became idle (or when it was created).
-- * @Right n@: the stream is in use by @n + 1@ holders (the first
--   'addActivity' yields @Right 0@).
type Activity = Either Double Int
-- | Register one more user of the stream: an idle stream becomes
-- @Right 0@, a busy one has its counter incremented.
addActivity :: Stream -> STM ()
addActivity str = modifyTVar' (vActivity str) bump
  where
    bump (Left _) = Right 0
    bump (Right n) = Right (n + 1)
-- | Unregister one user of the stream. When the last user leaves (or the
-- stream was already idle) the state becomes @Left now@, stamping the
-- current monotonic time as the start of the idle period.
removeActivity :: Stream -> IO ()
removeActivity str = do
  now <- getMonotonicTime
  let release activity = case activity of
        Right n | n > 0 -> Right (n - 1)
        _ -> Left now
  atomically $ modifyTVar' (vActivity str) release
-- | Stop the follower thread and close both file handles of a stream.
--
-- Every step is attempted even if an earlier one throws, so a failure while
-- killing the thread or closing the payload handle cannot leak the remaining
-- handle(s). The exception is still propagated to the caller afterwards.
closeStream :: Stream -> IO ()
closeStream Stream{..} =
  (killThread followThread `finally` hClose payloadHandle)
    `finally` hClose indexHandle
-- | Open the stream stored in directory @path@ and start following it.
--
-- The directory is expected to contain three files: @offsets@ (fixed-size
-- records of @1 + number of indices@ little-endian 64-bit integers),
-- @payloads@ and @indices@ (one index name per line).
--
-- The records already present in @offsets@ are loaded eagerly; a follower
-- thread then keeps reading new records from the same file and publishes
-- them through the 'Stream' TVars. A directory watch resets 'vCaughtUp'
-- whenever @offsets@ is modified, which wakes the follower up again.
--
-- Throws 'StreamNotFound' when @offsets@ does not exist and 'InternalError'
-- when its contents cannot be decoded. Both file handles are closed again if
-- anything fails after they were opened.
createStream :: WatchManager -> FilePath -> IO Stream
createStream man path = do
  let offsetPath = path </> "offsets"
  let payloadPath = path </> "payloads"
  exist <- doesFileExist offsetPath
  unless exist $ throwIO $ StreamNotFound offsetPath
  initialOffsetsBS <- B.readFile offsetPath
  -- bracketOnError: the steps below can throw (missing "indices" file,
  -- undecodable offsets, failing openFile); don't leak the handle then.
  bracketOnError (openBinaryFile payloadPath ReadMode) hClose $ \payloadHandle -> do
    indexNames <- B.lines <$> B.readFile (path </> "indices")
    -- Each record holds the offset followed by one value per index.
    let icount = 1 + length indexNames
    -- Only whole records are counted; trailing bytes are left to the follower.
    let count = B.length initialOffsetsBS `div` (8 * icount)
    let getI = fromIntegral <$> getInt64le
    initialIndices <- either (throwIO . InternalError) pure
      $ runGet (V.replicateM count $ U.replicateM icount getI) initialOffsetsBS
    let initialOffsets = IM.fromList $ V.toList
          $ V.zip (V.enumFromN 0 count) $ V.map U.head initialIndices
    vOffsets <- newTVarIO $! initialOffsets
    vCaughtUp <- newTVarIO False
    vCount <- newTVarIO $! IM.size initialOffsets
    -- NOTE(review): the stop action returned by watchDir is discarded, so the
    -- watch appears to outlive the stream -- confirm whether that is intended.
    _ <- watchDir man path
      (\case
        Modified p _ _ | p == offsetPath -> True
        _ -> False)
      $ const $ atomically $ writeTVar vCaughtUp False
    -- One map per index: index value -> sequence number.
    vIndices <- forM [1..length indexNames] $ \i -> newTVarIO
      $ IM.fromList $ V.toList $ V.zip (V.map (U.! i) initialIndices) (V.enumFromN 0 count)
    let indices = HM.fromList $ zip indexNames vIndices
    vActivity <- getMonotonicTime >>= newTVarIO . Left
    bracketOnError (openFile offsetPath ReadMode) hClose $ \indexHandle -> do
      let final :: Either SomeException Void -> IO ()
          -- Being killed by closeStream is the normal way to stop.
          final (Left exc) | Just ThreadKilled <- fromException exc = pure ()
          final (Left exc) = logFollower [path, "terminated with", show exc]
          final (Right v) = absurd v
      -- Forking is the last effect so that no later failure can leave a
      -- follower running on handles that were closed by the brackets.
      followThread <- flip forkFinally final $ do
        -- Skip the records that were loaded above.
        forM_ (IM.maxViewWithKey initialOffsets) $ \((i, _), _) ->
          hSeek indexHandle AbsoluteSeek $ fromIntegral $ succ i * icount * 8
        forever $ do
          bs <- B.hGet indexHandle (8 * icount)
          if B.null bs
            then do
              -- End of file: announce it, then sleep until the watcher
              -- resets the flag.
              -- NOTE(review): a modification event delivered between the
              -- empty read and this write is overwritten -- confirm whether
              -- that window matters in practice.
              atomically $ writeTVar vCaughtUp True
              atomically $ readTVar vCaughtUp >>= check . not
            else do
              ofs : ixVals <- either (throwIO . InternalError) pure
                $ runGet (replicateM icount getI) bs
              -- Publish the record atomically so readers never observe an
              -- offset without its index entries.
              atomically $ do
                i <- readTVar vCount
                modifyTVar' vOffsets $ IM.insert i ofs
                forM_ (zip vIndices ixVals) $ \(v, x) ->
                  modifyTVar' v $ IM.insert (fromIntegral x) i
                writeTVar vCount $! i + 1
      return Stream{..}
  where
    logFollower = hPutStrLn stderr . unwords . (:) "[follower]"
-- | Result of 'range': two @(key, value)@ entries of the offsets map.
-- The first is the entry just below the selected items (or @(-1, 0)@ when
-- there is none), the second is the last selected entry.
type QueryResult = ((Int, Int)
  , (Int, Int))
-- | Select entries of the offsets map with keys in @[begin, end]@.
--
-- The 'Bool' tells whether the map already reaches @end@ (an entry at @end@
-- or beyond exists); for 'LastItem' it is 'False' when the interval is empty.
-- For 'AllItems' the result spans from the entry just below @begin@ to the
-- greatest entry in the interval; for 'LastItem' it spans only the greatest
-- entry in the interval and its predecessor. @(-1, 0)@ stands in for a
-- missing predecessor.
range :: Int
  -> Int
  -> RequestType
  -> IM.IntMap Int
  -> (Bool, QueryResult)
range begin end rt allOffsets = case rt of
    AllItems ->
      let lastEntry = maybe firstItem fst (IM.maxViewWithKey body)
      in (ready, (firstItem, lastEntry))
    LastItem -> case IM.maxViewWithKey body of
      Nothing -> (False, (zero, zero))
      -- The predecessor is the greatest entry below the final one, whether it
      -- lies inside the interval or before it.
      Just (final, rest) -> case IM.maxViewWithKey (IM.union left rest) of
        Nothing -> (ready, (zero, final))
        Just (prev, _) -> (ready, (prev, final))
  where
    zero = (-1, 0)
    (below, atEnd, above) = IM.splitLookup end allOffsets
    ready = isJust atEnd || not (null above)
    -- Everything with key <= end.
    upToEnd = maybe below (\v -> IM.insert end v below) atEnd
    (left, body) = splitR begin upToEnd
    firstItem = maybe zero fst $ IM.maxViewWithKey left
-- | Split a map at a pivot key: keys strictly below the pivot go left,
-- the pivot itself (if present) and everything above go right.
splitR :: Int -> IM.IntMap a -> (IM.IntMap a, IM.IntMap a)
splitR pivot m = case IM.splitLookup pivot m of
  (below, Nothing, above) -> (below, above)
  (below, Just v, above) -> (below, IM.insert pivot v above)
-- | Shared state of the reader: the file-system watch manager and the cache
-- of currently open streams.
data FranzReader = FranzReader
  { watchManager :: WatchManager
    -- ^ Manager handed to 'createStream' for watching stream directories.
  , vStreams :: TVar (HM.HashMap FilePath (HM.HashMap B.ByteString Stream))
    -- ^ Open streams, keyed first by the directory passed to 'handleQuery'
    -- and then by stream name.
  }
-- | Counters accumulated during one pass of 'reaper'.
data ReaperState = ReaperState
  {
    -- | Number of streams closed during the pass.
    prunedStreams :: !Int
    -- | Number of streams examined during the pass.
  , totalStreams :: !Int
  }
-- | Periodically close streams that have been idle for too long.
--
-- The first argument is the pause between passes and the second the idle
-- time after which a stream is closed, both in seconds. Never returns.
reaper :: Double
  -> Double
  -> FranzReader -> IO ()
reaper int life FranzReader{..} = forever $ do
  now <- getMonotonicTime
  let expired activity = case activity of
        Left idleSince -> now - idleSince >= life
        Right _ -> False
      -- Re-check inside a transaction and unregister the stream only if it is
      -- still registered and still expired; returns the stream to close.
      evict :: FilePath -> B.ByteString -> STM (Maybe Stream)
      evict mPath sPath = runMaybeT $ do
        registry <- lift $ readTVar vStreams
        siblings <- MaybeT . pure $ HM.lookup mPath registry
        victim <- MaybeT . pure $ HM.lookup sPath siblings
        activity <- lift $ readTVar (vActivity victim)
        guard $ expired activity
        let remaining = HM.delete sPath siblings
        lift . writeTVar vStreams $ if HM.null remaining
          then HM.delete mPath registry
          else HM.insert mPath remaining registry
        pure victim
  snapshot <- readTVarIO vStreams
  let candidates =
        [ (mPath, sPath, stream)
        | (mPath, streams) <- HM.toList snapshot
        , (sPath, stream) <- HM.toList streams
        ]
  stats <- flip execStateT (ReaperState 0 0) $
    forM_ candidates $ \(mPath, sPath, stream) -> do
      modify' $ \s -> s { totalStreams = totalStreams s + 1 }
      -- Cheap non-transactional pre-check before the real transaction.
      activity <- lift $ readTVarIO (vActivity stream)
      when (expired activity) $ do
        evicted <- lift . atomically $ evict mPath sPath
        forM_ evicted $ \victim -> do
          lift $ closeStream victim
          modify' $ \s -> s { prunedStreams = prunedStreams s + 1 }
  let closed = prunedStreams stats
  when (closed > 0) $ hPutStrLn stderr $ unwords
    [ "[reaper] closed"
    , show closed
    , "out of"
    , show (totalStreams stats)
    ]
  threadDelay $ floor $ int * 1e6
-- | Run an action with a fresh 'FranzReader': an empty stream cache and a
-- watch manager that lives for the duration of the action.
withFranzReader :: (FranzReader -> IO ()) -> IO ()
withFranzReader k = do
  streamsVar <- newTVarIO HM.empty
  withManager $ \man ->
    k FranzReader { watchManager = man, vStreams = streamsVar }
-- | Resolve a 'Query' against a stream, opening the stream on first use.
--
-- The stream at @prefix \</\> dir \</\> name@ is looked up in the cache (or
-- created and cached) and marked as in use for the duration of the
-- continuation. The continuation receives the stream and an STM action that
--
-- * blocks until the follower thread has caught up with the offsets file,
-- * translates both endpoints to sequence numbers (negative sequence numbers
--   count back from the end; index values are looked up in the named index),
-- * and returns the result of 'range'.
--
-- The STM action throws 'IndexNotFound' for an unknown index name; opening
-- the stream can throw whatever 'createStream' throws.
handleQuery :: FilePath
  -> FranzReader
  -> FilePath
  -> Query
  -> (Stream -> STM (Bool, QueryResult) -> IO r) -> IO r
handleQuery prefix FranzReader{..} dir (Query name begin_ end_ rt) cont
  = bracket acquire removeActivity
  $ \stream@Stream{..} -> cont stream $ do
    readTVar vCaughtUp >>= check
    allOffsets <- readTVar vOffsets
    -- One past the greatest known sequence number.
    let finalOffset = case IM.maxViewWithKey allOffsets of
          Just ((k, _), _) -> k + 1
          Nothing -> 0
    let rotate i
          | i < 0 = finalOffset + i
          | otherwise = i
    begin <- case begin_ of
      BySeqNum i -> pure $ rotate i
      ByIndex index val -> case HM.lookup index indices of
        Nothing -> throwSTM $ IndexNotFound index $ HM.keys indices
        Just v -> do
          m <- readTVar v
          -- First record whose index value is >= val.
          let (_, wing) = splitR val m
          return $! maybe maxBound fst $ IM.minView wing
    end <- case end_ of
      BySeqNum i -> pure $ rotate i
      ByIndex index val -> case HM.lookup index indices of
        Nothing -> throwSTM $ IndexNotFound index $ HM.keys indices
        Just v -> do
          m <- readTVar v
          -- Last record whose index value is <= val.
          let (body, lastItem, _) = IM.splitLookup val m
          let body' = maybe id (IM.insert val) lastItem body
          return $! maybe minBound fst $ IM.maxView body'
    return $! range begin end rt allOffsets
  where
    path = prefix </> dir </> B.unpack name

    lookupStream = HM.lookup dir >=> HM.lookup name

    -- Fast path: the stream is cached, just mark it as in use. Otherwise
    -- create it outside of STM and register it afterwards.
    acquire = join $ atomically $ do
      allStreams <- readTVar vStreams
      case lookupStream allStreams of
        Just s -> do
          addActivity s
          pure (pure s)
        Nothing -> pure $
          bracketOnError (createStream watchManager path) closeStream register

    -- Creating the stream happens between two transactions, so the cache is
    -- consulted again here rather than reusing the earlier snapshot: other
    -- streams registered in the meantime must be kept, and if another thread
    -- registered this very stream first, its instance is used and ours is
    -- closed instead of being orphaned.
    register fresh = join $ atomically $ do
      allStreams <- readTVar vStreams
      case lookupStream allStreams of
        Just winner -> do
          addActivity winner
          -- Give the activity back if closing our redundant copy fails,
          -- because the caller's bracket never gets to release it then.
          pure $ (winner <$ closeStream fresh) `onException` removeActivity winner
        Nothing -> do
          addActivity fresh
          modifyTVar' vStreams $
            HM.insertWith HM.union dir (HM.singleton name fresh)
          pure (pure fresh)