-- | A read-write lock specialized for using LMDB with the MDB_NOLOCK
-- option.
module Database.VCache.RWLock
    ( RWLock
    , newRWLock
    , withRWLock
    , withRdOnlyLock
    ) where

import Control.Monad
import Control.Exception
import Control.Concurrent.MVar
import Data.IORef
import Data.IntSet (IntSet)
import qualified Data.IntSet as IntSet

-- | RWLock
--
-- VCache uses LMDB with the MDB_NOLOCK option, mostly because I don't
-- want to deal with the whole issue of OS bound threads or a limit on
-- number of concurrent readers. Without locks, we essentially have one
-- valid snapshot. The writer can begin dismantling earlier snapshots
-- as needed to allocate pages.
--
-- RWLock essentially enforces this sort of frame-buffer concurrency.
data RWLock = RWLock
    { rwlock_frames :: !(MVar FB)
    , rwlock_writer :: !(MVar ()) -- enforce single writer
    }

-- | The pair of reader frames currently held by the lock.
data FB = FB !F !F

-- | A mutable reader frame.
type F = IORef Frame

-- | Bookkeeping for the readers registered in one frame.
data Frame = Frame
    { frame_reader_next :: {-# UNPACK #-} !Int -- next reader id to hand out
    , frame_readers     :: !IntSet             -- ids of registered readers
    , frame_onClear     :: ![IO ()]            -- actions to perform
    }

-- | The initial frame: no readers, no pending actions, ids start at 1.
frame0 :: Frame
frame0 = Frame 1 IntSet.empty []

-- | Create a lock with two fresh reader frames and no active writer.
newRWLock :: IO RWLock
newRWLock = do
    frames <- newMVar =<< newF2
    writer <- newEmptyMVar
    return (RWLock frames writer)

-- | Allocate a pair of fresh frames.
newF2 :: IO FB
newF2 = liftM2 FB newF newF

-- | Allocate a single fresh frame.
newF :: IO F
newF = newIORef frame0

-- | Run an action while holding the writer mutex. The mutex MVar is
-- empty when free, so it is acquired by filling it and released by
-- emptying it again.
withWriterMutex :: RWLock -> IO a -> IO a
withWriterMutex l = bracket_ acquire release
  where
    writer  = rwlock_writer l
    acquire = putMVar writer ()
    release = takeMVar writer
{-# INLINE withWriterMutex #-}

-- | Grab the current read-write lock for the duration of
-- an underlying action. This may wait on older readers.
withRWLock :: RWLock -> IO a -> IO a
withRWLock l action = withWriterMutex l (waitForOldReaders >> action)
  where
    -- Rotate the frames, then block until the retired frame reports
    -- that it has been cleared.
    waitForOldReaders = do
        retired <- rotateReaderFrames l
        cleared <- newEmptyMVar
        onFrameCleared retired (putMVar cleared ())
        takeMVar cleared

-- rotate a fresh reader frame, and grab the oldest.
-- This should only be performed while holding the writer lock.
rotateReaderFrames :: RWLock -> IO F
rotateReaderFrames l = mask_ $ do
    fresh <- newF
    (FB newest oldest) <- takeMVar frames
    -- the fresh frame becomes the head; the previous head moves back
    putMVar frames (FB fresh newest)
    return oldest
  where
    frames = rwlock_frames l
--
-- NOTE: Each of these 'frames' actually contains readers of two
-- transactions. Alignment between LMDB transactions and VCache
-- RWLock isn't exact.
--
-- Each write lock will rotate reader frames just once:
--
--      (f1,f2) → (f0,f1) returning f2
--
-- Writer is working on LMDB frame N.
--
--   f0 will have readers for frame N-1 and (after commit) for N.
--   f1 will have readers for frame N-2 and some for N-1.
--   f2 will have readers for frame N-3 and some for N-2.
--
-- LMDB guarantees that the data pages for frames N-1 and N-2 are
-- intact. However, frame N-3 will be dismantled while building
-- frame N. Thus, we must wait for f2 readers to finish before we
-- begin the writer N transaction.
--
-- If we assume short-running readers and long-running writers, it
-- is rare that the writer ever needs to wait on readers. Readers
-- never need to wait on the writer. This assumption is achieved by
-- batching writes in VCache.
--

-- Perform some action when a frame is cleared. The action is
-- performed immediately if the frame currently has no readers;
-- otherwise it is queued on the frame.
onFrameCleared :: F -> IO () -> IO ()
onFrameCleared f action = join (atomicModifyIORef f register)
  where
    register frame
        -- no readers: reset the frame and hand the action back to run now
        | IntSet.null (frame_readers frame) = (frame0, action)
        -- readers present: queue the action, nothing to run yet
        | otherwise =
            ( frame { frame_onClear = action : frame_onClear frame }
            , return ()
            )

-- | Grab a read-only lock for the duration of some IO action.
--
-- Readers never need to wait on the writer.
withRdOnlyLock :: RWLock -> IO a -> IO a
withRdOnlyLock l action = bracket (newReader l) releaseReader (\_ -> action)

-- | A reader handle, represented by the action that releases it.
newtype Reader = Reader { releaseReader :: IO () }

-- obtains a reader handle; returns function to release reader.
newReader :: RWLock -> IO Reader
newReader l = mask_ $ do
    let var = rwlock_frames l
    -- Hold the frames MVar while registering, so the reader is added to
    -- whichever frame is at the head at that moment.
    fb@(FB f _) <- takeMVar var
    -- Strict variant: evaluate the updated frame here instead of leaving
    -- a thunk in the IORef for a later reader or release to force.
    r <- atomicModifyIORef' f addReader
    putMVar var fb
    return (Reader (delReader f r))

-- | Register a new reader in a frame. Returns the updated frame along
-- with the id assigned to the reader.
addReader :: Frame -> (Frame, Int)
addReader f = (f', r)
  where
    r  = frame_reader_next f
    f' = f { frame_reader_next = r + 1
           , frame_readers     = IntSet.insert r (frame_readers f)
           }

-- | Remove reader @r@ from frame @f@. If that leaves the frame without
-- readers, the frame is reset to 'frame0' and its queued on-clear
-- actions are run by the calling thread.
delReader :: F -> Int -> IO ()
delReader f r = atomicModifyIORef' f del >>= sequence_
  where
    del frm
        -- last reader gone: reset the frame, hand back the queued actions
        | IntSet.null rdrs' = (frame0, frame_onClear frm)
        -- other readers remain: nothing to run yet
        | otherwise         = (frm { frame_readers = rdrs' }, [])
      where
        rdrs' = IntSet.delete r (frame_readers frm)