module Database.VCache.Write
( writeStep
) where
import Control.Monad
import Control.Exception
import Control.Concurrent
import Control.Concurrent.STM
import Data.IORef
import Data.Map (Map)
import qualified Data.Map as Map
import qualified Data.List as L
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Foreign.Ptr
import Foreign.Storable
import Foreign.Marshal.Alloc
import Database.LMDB.Raw
import Database.VCache.Types
import Database.VCache.VPutFini
import Database.VCache.VGetInit
import Database.VCache.RWLock
import Database.VCache.Refct
import Database.VCache.Hash
import Database.VCache.Aligned
-- | Per-address reference count adjustments gathered during a write step.
type RefctDiff = Map Address Refct
-- | Cells to be written, keyed by their address.
type WriteBatch = Map Address WriteCell
-- | Serialized bytes of a cell paired with the children it references.
type WriteCell = (ByteString, [PutChild])
-- | Addresses chosen for collection. Same shape as 'WriteBatch'; entries
-- built by the collector use 'gcCell' (empty bytes, no children).
type GCBatch = WriteBatch
-- | Secondary index updates: a key (root name or content hash, depending
-- on the address kind) mapped to the addresses filed under it.
type UpdSeek = Map ByteString [Address]
-- | Size in bytes of an 'Address', as reported by its 'Storable' instance.
addrSize :: Int
addrSize = sizeOf witness
  where
    -- Never evaluated; 'sizeOf' only needs the type.
    witness :: Address
    witness = undefined
-- | Run one step of the writer.
--
-- Under the VCache RW lock this waits on 'vcache_signal', drains the
-- pending writes and steps the allocation frame, then persists both,
-- together with a bounded batch of garbage collection, inside a single
-- LMDB transaction.
--
-- After a successful commit the GC counter is advanced by the number of
-- collected addresses, 'vcache_signal_writes' is invoked with the drained
-- writes, and every 'MVar' in 'write_sync' is signalled.
--
-- If anything between opening and committing the transaction throws
-- (including the internal \"VCache bug\" checks), the transaction is
-- aborted before the exception propagates so that the LMDB transaction is
-- not left open. Writes that were already drained are not restored.
writeStep :: VSpace -> IO ()
writeStep vc = withRWLock (vcache_rwlock vc) $ do
    takeMVar (vcache_signal vc)
    ws <- atomically (takeWrites (vcache_writes vc))
    wb <- seralizeWrites (write_data ws)
    afrm <- allocFrameStep vc
    let allocInit = alloc_init afrm
    let ab = fmap fnWriteAlloc (alloc_list afrm)
    let ub = Map.union wb ab
    txn <- mdb_txn_begin (vcache_db_env vc) Nothing False
    -- GC effort scales with the amount of data being written.
    let gcLimit = 1000 + 2 * Map.size ub
    let fillTxn = do
            gcb <- runGarbageCollector vc txn gcLimit
            let fb = Map.union gcb ub
            -- A key present in both maps would shrink the union.
            let bUpdGCSep = Map.size fb == (Map.size ub + Map.size gcb)
            unless bUpdGCSep (fail "VCache bug: overlapping GC and update targets")
            (UpdateNotes rcDiff hsDel) <- updateVirtualMemory vc txn allocInit fb
            let rcAlloc = fmap (\ an -> if isNewRoot an then 1 else 0) (alloc_list afrm)
            let rcUpd = Map.unionWith (\ a b -> (a + b)) rcDiff rcAlloc
            updateReferenceCounts vc txn allocInit rcUpd
            let hsAlloc = fmap (fmap alloc_addr) (alloc_seek afrm)
            let hsUpd = Map.unionWith (++) hsDel hsAlloc
            writeSecondaryIndexes vc txn allocInit hsUpd
            return gcb
    -- Only the pre-commit work is guarded: the commit itself stays outside
    -- the handler so a failed commit is never followed by a second abort.
    gcb <- fillTxn `onException` mdb_txn_abort txn
    mdb_txn_commit txn
    modifyIORef' (vcache_gc_count vc) (+ (Map.size gcb))
    vcache_signal_writes vc ws
    mapM_ syncSignal (write_sync ws)
-- | Rotate the allocator frames held in 'vcache_memory' and return the
-- frame that becomes current.
--
-- The previous \"next\" frame becomes current, the previous current frame
-- becomes \"prev\", and a fresh empty frame starting at the allocator's
-- 'alloc_new_addr' is installed as \"next\".
allocFrameStep :: VSpace -> IO AllocFrame
allocFrameStep vc = modifyMVarMasked (vcache_memory vc) rotate
  where
    rotate mem = return (mem { mem_alloc = stepped }, alloc_frm_curr stepped)
      where
        old = mem_alloc mem
        nextAddr = alloc_new_addr old
        stepped = Allocator
            { alloc_new_addr = nextAddr
            , alloc_frm_next = AllocFrame Map.empty Map.empty nextAddr
            , alloc_frm_curr = alloc_frm_next old
            , alloc_frm_prev = alloc_frm_curr old
            }
-- | True when an allocation has a PVar address and carries a non-empty name.
isNewRoot :: Allocation -> Bool
isNewRoot an
    | not (isPVarAddr (alloc_addr an)) = False
    | otherwise = not (BS.null (alloc_name an))
-- | Atomically fetch the pending writes and leave an empty 'Writes' behind.
takeWrites :: TVar Writes -> STM Writes
takeWrites tv =
    readTVar tv >>= \ pending ->
    writeTVar tv (Writes Map.empty []) >>
    return pending
-- | Serialize every entry of the write log with 'writeTxW'; the key is
-- kept as the map key and is not otherwise consulted.
seralizeWrites :: WriteLog -> IO WriteBatch
seralizeWrites = Map.traverseWithKey serializeEntry
  where
    serializeEntry _key txw = writeTxW txw
-- | Run the PVar's writer on the pending value and keep the produced
-- bytes together with the children it referenced.
writeTxW :: TxW -> IO WriteCell
writeTxW (TxW pv v) = do
    ((), bytes, deps) <- runVPutIO (pvar_space pv) (pvar_write pv v)
    return (bytes, deps)
-- | Project an allocation onto the cell that should be written for it.
fnWriteAlloc :: Allocation -> WriteCell
fnWriteAlloc an = (bytes, deps)
  where
    bytes = alloc_data an
    deps = alloc_deps an
-- | Fill the given 'MVar' without blocking; an already-full 'MVar' is
-- left untouched.
syncSignal :: MVar () -> IO ()
syncSignal mv = tryPutMVar mv () >> return ()
-- | Apply secondary index updates inside @txn@.
--
-- For every key in the update map, each listed address is handled by kind:
--
-- * PVar addresses are recorded in the named-roots table; an address below
--   @allocInit@ or an already-present name is reported as a VCache bug.
-- * Other addresses below @allocInit@ have their (key, address) pair
--   removed from the hash table.
-- * Remaining addresses are inserted into the hash table; a duplicate
--   pair is reported as a VCache bug.
--
-- Does nothing when the update map is empty.
writeSecondaryIndexes :: VSpace -> MDB_txn -> Address -> UpdSeek -> IO ()
writeSecondaryIndexes vc txn allocInit updSeek =
    unless (Map.null updSeek) $
    alloca $ \ pAddr -> do
        -- addrVal aliases pAddr: poking pAddr changes what the puts below write.
        let addrVal = MDB_val { mv_data = castPtr pAddr, mv_size = fromIntegral addrSize }
        rootCursor <- mdb_cursor_open' txn (vcache_db_vroots vc)
        hashCursor <- mdb_cursor_open' txn (vcache_db_caddrs vc)
        let recordRoot keyVal addr = do
                when (addr < allocInit) (fail "VCache bug: attempt to delete named root")
                stored <- mdb_cursor_put' (compileWriteFlags [MDB_NOOVERWRITE]) rootCursor keyVal addrVal
                unless stored (fail "VCache bug: attempt to overwrite named root")
        let insertHash keyVal addr = do
                stored <- mdb_cursor_put' (compileWriteFlags [MDB_NODUPDATA]) hashCursor keyVal addrVal
                unless stored (addrBug addr "VRef hash recorded twice")
        let deleteHash keyVal addr =
                alloca $ \ pKeyVal ->
                alloca $ \ pAddrVal -> do
                    poke pKeyVal keyVal
                    poke pAddrVal addrVal
                    present <- mdb_cursor_get' MDB_GET_BOTH hashCursor pKeyVal pAddrVal
                    unless present (addrBug addr "VRef hash not found for deletion")
                    mdb_cursor_del' (compileWriteFlags []) hashCursor
        let indexAddr keyVal addr
                | isPVarAddr addr = recordRoot keyVal addr
                | addr < allocInit = deleteHash keyVal addr
                | otherwise = insertHash keyVal addr
        let processName (name, addrs) =
                withByteStringVal name $ \ keyVal ->
                forM_ addrs $ \ addr -> do
                    poke pAddr addr
                    indexAddr keyVal addr
        mapM_ processName (Map.toAscList updSeek)
        mdb_cursor_close' hashCursor
        mdb_cursor_close' rootCursor
        return ()
-- | Apply reference count differences inside @txn@, visiting addresses in
-- ascending order.
--
-- Two tables are maintained: one holding positive reference counts and one
-- (\"refct0\") listing addresses whose count is zero.
--
-- * Addresses at or above @allocInit@ are new: they are appended to the
--   zero table when the count is 0, otherwise to the refct table.
-- * For older addresses, a difference of 'minBound' removes the zero-table
--   entry, a difference of 0 is a no-op, and any other difference is added
--   to the stored count, moving the address between the two tables when
--   the count leaves or reaches zero.
--
-- Inconsistencies (negative results, missing entries, failed puts) are
-- reported through 'addrBug'. Does nothing when the diff map is empty.
updateReferenceCounts :: VSpace -> MDB_txn -> Address -> RefctDiff -> IO ()
updateReferenceCounts vc txn allocInit rcDiffMap =
    unless (Map.null rcDiffMap) $
    alloca $ \ pAddr ->
    allocaBytes 16 $ \ pRefctBuff ->
    alloca $ \ pvAddr ->
    alloca $ \ pvData -> do
        -- addrVal aliases pAddr, so one poke per address retargets all calls.
        let addrVal = MDB_val { mv_data = castPtr pAddr, mv_size = fromIntegral addrSize }
        let emptyVal = MDB_val { mv_data = nullPtr, mv_size = 0 }
        poke pvAddr addrVal
        refctCursor <- mdb_cursor_open' txn (vcache_db_refcts vc)
        zeroCursor <- mdb_cursor_open' txn (vcache_db_refct0 vc)
        let newEphemeron addr = do
                stored <- mdb_cursor_put' (compileWriteFlags [MDB_APPEND]) zeroCursor addrVal emptyVal
                unless stored (addrBug addr "refct0 could not be appended")
        let newAllocation addr rc
                | 0 == rc = newEphemeron addr
                | otherwise = do
                    unless (rc > 0) (addrBug addr "allocation with negative refct")
                    refctVal <- writeRefctBytes pRefctBuff rc
                    stored <- mdb_cursor_put' (compileWriteFlags [MDB_APPEND]) refctCursor addrVal refctVal
                    unless stored (addrBug addr "refct could not be appended")
        let updateFromZero addr rc = do
                zeroFound <- mdb_cursor_get' MDB_SET zeroCursor pvAddr pvData
                unless zeroFound (addrBug addr "has undefined refct")
                unless (rc > 0) (addrBug addr "update refct0 to negative refct")
                mdb_cursor_del' (compileWriteFlags []) zeroCursor
                refctVal <- writeRefctBytes pRefctBuff rc
                stored <- mdb_cursor_put' (compileWriteFlags [MDB_NOOVERWRITE]) refctCursor addrVal refctVal
                unless stored (addrBug addr "could not update refct from zero")
        let deleteZero addr = do
                zeroFound <- mdb_cursor_get' MDB_SET zeroCursor pvAddr pvData
                unless zeroFound (addrBug addr "refct0 not found for deletion")
                mdb_cursor_del' (compileWriteFlags []) zeroCursor
        -- Store the new count for an address the refct cursor is positioned on.
        let storeRefct addr rc
                | rc < 0 = addrBug addr "positive to negative refct"
                | 0 == rc = do
                    mdb_cursor_del' (compileWriteFlags []) refctCursor
                    stored <- mdb_cursor_put' (compileWriteFlags [MDB_NOOVERWRITE]) zeroCursor addrVal emptyVal
                    unless stored (addrBug addr "has both refct0 and refct")
                | otherwise = do
                    refctVal <- writeRefctBytes pRefctBuff rc
                    stored <- mdb_cursor_put' (compileWriteFlags [MDB_CURRENT]) refctCursor addrVal refctVal
                    unless stored (addrBug addr "could not update refct")
        let adjustExisting addr rcDiff = do
                hasRefct <- mdb_cursor_get' MDB_SET refctCursor pvAddr pvData
                if not hasRefct
                    then updateFromZero addr rcDiff
                    else do
                        rcOld <- peek pvData >>= readRefctBytes
                        assert (rcOld > 0) (storeRefct addr (rcOld + rcDiff))
        let applyDiff addr rcDiff
                | addr >= allocInit = newAllocation addr rcDiff
                | minBound == rcDiff = deleteZero addr
                | 0 == rcDiff = return ()
                | otherwise = adjustExisting addr rcDiff
        let updateRefct (addr, rcDiff) = poke pAddr addr >> applyDiff addr rcDiff
        mapM_ updateRefct (Map.toAscList rcDiffMap)
        mdb_cursor_close' zeroCursor
        mdb_cursor_close' refctCursor
        return ()
-- | Fail in 'IO' with a \"VCache bug\" message that names the offending
-- address followed by the supplied description.
addrBug :: Address -> String -> IO a
addrBug addr msg = fail (concat ["VCache bug: address ", show addr, " ", msg])
-- | Results accumulated while updating the memory table: reference count
-- differences, and secondary index entries keyed by hash for deleted
-- VRef addresses. Both fields are strict.
data UpdateNotes = UpdateNotes !RefctDiff !UpdSeek
-- | Notes with no reference count changes and no index updates.
emptyNotes :: UpdateNotes
emptyNotes = UpdateNotes noRefctChanges noIndexUpdates
  where
    noRefctChanges = Map.empty
    noIndexUpdates = Map.empty
-- | Apply a batch of cell writes to the memory table inside @txn@ and
-- report the resulting reference count differences and index deletions.
--
-- Cells are visited in ascending address order:
--
-- * empty bytes: the cell is deleted; for VRef addresses the hash of the
--   old content is noted so its index entry can be removed;
-- * address at or above @allocStart@: the cell is appended as new;
-- * otherwise: the cell must be a PVar and is overwritten in place.
--
-- The dependencies read from overwritten and deleted cells are counted as
-- \"old\", the children listed in the batch are counted as \"new\", and
-- the returned 'RefctDiff' is @new - old@ per address. Addresses that
-- appear only among the old dependencies therefore get a negative
-- difference. Returns 'emptyNotes' when the batch is empty.
updateVirtualMemory :: VSpace -> MDB_txn -> Address -> WriteBatch -> IO UpdateNotes
updateVirtualMemory vc txn allocStart fb =
    if Map.null fb then return emptyNotes else
    alloca $ \ pAddr ->
    alloca $ \ pvAddr ->
    alloca $ \ pvOldData -> do
        -- vAddr aliases pAddr, so poking pAddr retargets every call below.
        let vAddr = MDB_val { mv_data = castPtr pAddr, mv_size = fromIntegral addrSize }
        poke pvAddr vAddr
        cmem <- mdb_cursor_open' txn (vcache_db_memory vc)
        let create udn addr bytes =
                withByteStringVal bytes $ \ vData -> do
                    let cf = compileWriteFlags [MDB_APPEND]
                    bOK <- mdb_cursor_put' cf cmem vAddr vData
                    unless bOK (addrBug addr "created out of order")
                    return udn
        let update (UpdateNotes rcs hs) addr bytes =
                withByteStringVal bytes $ \ vData -> do
                    unless (isPVarAddr addr) (addrBug addr "VRef cannot be updated")
                    bExists <- mdb_cursor_get' MDB_SET cmem pvAddr pvOldData
                    unless bExists (addrBug addr "undefined on update")
                    -- Read the old dependencies before the cell is overwritten.
                    oldDeps <- readDataDeps vc addr =<< peek pvOldData
                    let rcs' = addRefcts oldDeps rcs
                    let uf = compileWriteFlags [MDB_CURRENT]
                    bOK <- mdb_cursor_put' uf cmem vAddr vData
                    unless bOK (addrBug addr "could not updated")
                    return (UpdateNotes rcs' hs)
        let delete (UpdateNotes rcs hs) addr = do
                bExists <- mdb_cursor_get' MDB_SET cmem pvAddr pvOldData
                unless bExists (addrBug addr "undefined on delete")
                vOldData <- peek pvOldData
                hs' <- if not (isVRefAddr addr) then return hs else
                       hashVal vOldData >>= \ h ->
                       return (addHash h addr hs)
                oldDeps <- readDataDeps vc addr vOldData
                let rcs' = addRefcts oldDeps rcs
                let df = compileWriteFlags []
                mdb_cursor_del' df cmem
                return (UpdateNotes rcs' hs')
        let processCell notes (addr, (bytes, deps')) =
                poke pAddr addr >>
                if BS.null bytes then assert (L.null deps') (delete notes addr) else
                if addr >= allocStart then create notes addr bytes else
                update notes addr bytes
        (UpdateNotes rcOld delSeek) <- foldM processCell emptyNotes (Map.toAscList fb)
        mdb_cursor_close' cmem
        assertValidOldDeps allocStart rcOld
        let rcNew = Map.foldr' (addRefcts . fmap putChildAddr . snd) Map.empty fb
        -- unionWith only combines keys present in both maps, so the old
        -- counts are negated first; a dependency that was dropped entirely
        -- then carries a negative difference instead of a positive one.
        let rcDiff = Map.unionWith (+) rcNew (fmap negate rcOld)
        return (UpdateNotes rcDiff delSeek)
-- | File an address under a hash key, prepending it to any addresses
-- already recorded for that key.
addHash :: ByteString -> Address -> UpdSeek -> UpdSeek
addHash h addr = Map.insertWith (++) h [addr]
-- | Add one to the count of every listed address; an address not yet in
-- the map starts at 1. Repeated addresses are counted once per occurrence.
addRefcts :: [Address] -> RefctDiff -> RefctDiff
addRefcts addrs counts = L.foldl' bump counts addrs
  where
    bump acc addr = Map.insertWith (+) addr 1 acc
-- | Sanity check: the largest address among the old dependencies must lie
-- below @allocStart@. An empty map passes trivially.
assertValidOldDeps :: Address -> RefctDiff -> IO ()
assertValidOldDeps allocStart rcDepsOld =
    case fmap fst (Map.maxViewWithKey rcDepsOld) of
        Just (newestDep, _)
            | not (newestDep < allocStart) ->
                fail "VCache bug: time traveling allocator"
        _ -> return ()
-- | Run 'vgetInit' over the raw bytes of a stored cell and return the
-- child addresses it collects. A parse error is reported through
-- 'addrBug' with the parser's message.
readDataDeps :: VSpace -> Address -> MDB_val -> IO [Address]
readDataDeps vc addr vData = do
    outcome <- _vget vgetInit initialState
    case outcome of
        VGetR () finalState -> return (vget_children finalState)
        VGetE eMsg -> addrBug addr ("contains malformed data: " ++ eMsg)
  where
    start = mv_data vData
    -- The parse window covers exactly the bytes described by the MDB_val.
    initialState = VGetS
        { vget_children = []
        , vget_target = start
        , vget_limit = start `plusPtr` fromIntegral (mv_size vData)
        , vget_space = vc
        }
-- | Gather up to @gcLimit@ collection candidates, filter them with
-- 'gcSelectFrame', clear the survivors with 'gcClearFrame', and return
-- the batch that was cleared.
runGarbageCollector :: VSpace -> MDB_txn -> Int -> IO GCBatch
runGarbageCollector vc txn gcLimit =
    gcCandidates vc txn gcLimit >>= gcSelectFrame vc >>= \ selected ->
    gcClearFrame vc txn selected >> return selected
-- | Scan the zero-refcount table for collection candidates.
--
-- The scan resumes just past the address saved in 'vcache_gc_start', or
-- from the first entry when none is saved. It stops after @gcLimit@
-- entries, saving the last address seen via 'continueGC', or at the end of
-- the table, clearing the saved position via 'restartGC'. Every address
-- visited is returned mapped to 'gcCell'.
--
-- A limit of one or less still takes a single candidate when the table is
-- non-empty.
gcCandidates :: VSpace -> MDB_txn -> Int -> IO GCBatch
gcCandidates vc txn gcLimit =
    alloca $ \ pvAddr -> do
        c0 <- mdb_cursor_open' txn (vcache_db_refct0 vc)
        -- n counts how many further entries may be taken after this one.
        let loop !n !b !gcb
                | not b = restartGC vc >> return gcb
                | otherwise = do
                    addr <- peek pvAddr >>= peekAddr
                    let gcb' = Map.insert addr gcCell gcb
                    if n <= 0
                        then continueGC vc addr >> return gcb'
                        else do
                            b' <- mdb_cursor_get' MDB_NEXT c0 pvAddr nullPtr
                            loop (n - 1) b' gcb'
        let initC0 =
                readIORef (vcache_gc_start vc) >>= \ mbContinue ->
                case mbContinue of
                    Nothing -> mdb_cursor_get' MDB_FIRST c0 pvAddr nullPtr
                    Just addr -> alloca $ \ pAddr -> do
                        let vAddr = MDB_val { mv_data = castPtr pAddr
                                            , mv_size = fromIntegral $ sizeOf addr }
                        -- Seek to the first entry strictly after the saved one.
                        poke pAddr (1 + addr)
                        poke pvAddr vAddr
                        mdb_cursor_get' MDB_SET_RANGE c0 pvAddr nullPtr
        b0 <- initC0
        gcb <- loop (gcLimit - 1) b0 Map.empty
        mdb_cursor_close' c0
        return gcb
-- | Drop from the candidate batch every address that is a key of
-- 'mem_evrefs', 'mem_cvrefs' or 'mem_pvars', then install the remainder as
-- the current GC frame (the former current frame becomes the previous
-- one). Returns the remaining batch.
gcSelectFrame :: VSpace -> GCBatch -> IO GCBatch
gcSelectFrame vc gcb = modifyMVarMasked (vcache_memory vc) select
  where
    select mem = return (mem { mem_gc = frames }, kept)
      where
        minusEvrefs = Map.difference gcb (mem_evrefs mem)
        minusCvrefs = Map.difference minusEvrefs (mem_cvrefs mem)
        kept = Map.difference minusCvrefs (mem_pvars mem)
        frames = GC
            { gc_frm_curr = GCFrame kept
            , gc_frm_prev = gc_frm_curr (mem_gc mem)
            }
-- | Remove every address of the batch from the zero-refcount table inside
-- @txn@. A missing entry is reported through 'addrBug'.
gcClearFrame :: VSpace -> MDB_txn -> GCBatch -> IO ()
gcClearFrame vc txn gcb =
    alloca $ \ pAddr ->
    alloca $ \ pvAddr -> do
        -- The key value aliases pAddr; each poke below retargets the lookup.
        poke pvAddr (MDB_val { mv_data = castPtr pAddr, mv_size = fromIntegral addrSize })
        zeroCursor <- mdb_cursor_open' txn (vcache_db_refct0 vc)
        forM_ (Map.keys gcb) $ \ addr -> do
            poke pAddr addr
            present <- mdb_cursor_get' MDB_SET zeroCursor pvAddr nullPtr
            unless present (addrBug addr "not found for GC")
            mdb_cursor_del' (compileWriteFlags []) zeroCursor
        mdb_cursor_close' zeroCursor
        return ()
-- | Forget the saved GC position, so the next candidate scan starts from
-- the first entry.
restartGC :: VSpace -> IO ()
restartGC vc = writeIORef gcStart Nothing
  where
    gcStart = vcache_gc_start vc
-- | Save the given address as the GC position. The address is forced
-- before it is stored.
continueGC :: VSpace -> Address -> IO ()
continueGC vc addr = addr `seq` writeIORef (vcache_gc_start vc) (Just addr)
-- | The cell recorded for a collected address: no bytes and no children.
gcCell :: WriteCell
gcCell = (noBytes, noChildren)
  where
    noBytes = BS.empty
    noChildren = []
-- | Read an 'Address' out of an 'MDB_val' using 'peekAligned'. Fails when
-- the value's size differs from 'addrSize'.
peekAddr :: MDB_val -> IO Address
peekAddr v
    | fromIntegral addrSize /= mv_size v = fail "VCache bug: badly formed address"
    | otherwise = peekAligned (castPtr (mv_data v))