{-# LANGUAGE ScopedTypeVariables, Safe #-}

-- | A brief note on the locking strategy:
--
-- A Columbia data file is covered by three locks: a dual lock (itself made up
-- of two locks) and a writer lock; the lock hierarchy follows that order. The
-- dual lock is taken in shared mode during all reading. The only circumstance
-- in which the dual lock is taken in exclusive mode is to execute the write
-- phase of a garbage collection.
--
-- The writer lock protects writing to the end of the file and to the root
-- node offset; readers take the writer lock only while reading the root node
-- offset. The writer lock is always taken in exclusive mode.
--
-- The high-level view is: you can always run the collector in parallel with
-- any other operation you are doing; you can read freely while the collector
-- runs; writes to the file will be held up until the collector finishes.
module Data.Columbia.Gc (markAndSweepGc) where

import Control.Monad.Reader
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import System.IO
import System.IO.Error
import System.Directory
import System.FileLock
import Data.Char
import Data.Word
import Data.Bits
import Data.IORef
import Data.Columbia.RWInstances
import Data.Columbia.Integral
import Data.Columbia.Headers
import Data.Columbia.DualLock
import Data.Columbia.SeekableStream
import Data.Columbia.SeekableWriter
import Data.Columbia.Mapper

{-
guardedGetChar handle = catchError (getWord8 handle)
  (\ex -> if isEOFError ex then return 0 else throwError ex)
-}

-- | Set bit @n@ of the bit field kept in @writer@'s file, i.e. bit
-- @n `mod` 8@ of byte @n `div` 8@. A byte that cannot be read is treated as
-- zero before the bit is set.
setBitAt :: (Monad m) => SeekableWriter m Word8 -> Word32 -> m ()
setBitAt writer n = flip runReaderT writer $ do
  let (byteIx, bitIx) = n `divMod` 8
  seekWriter byteIx
  current <- liftM (maybe 0 id) consumeTokenW
  -- Return to the byte before overwriting it (the read may have moved the cursor).
  seekWriter byteIx
  putToken (setBit current (fromIntegral bitIx))

-- | Test bit @n@ of the bit field kept in @writer@'s file. Yields 'Nothing'
-- when byte @n `div` 8@ cannot be read (e.g. it lies past the end of the file).
getBitAt :: (Monad m) => SeekableWriter m Word8 -> Word32 -> m (Maybe Bool)
getBitAt writer n = flip runReaderT writer $ do
  let (byteIx, bitIx) = n `divMod` 8
  seekWriter byteIx
  byte <- consumeTokenW
  return (fmap (`testBit` fromIntegral bitIx) byte)
-- | Like 'getBitAt', but a bit that cannot be read counts as clear.
getBitAt' :: (Monad m) => SeekableWriter m Word8 -> Word32 -> m Bool
getBitAt' writer n = liftM (maybe False id) (getBitAt writer n)

-- | Mark the record at @addr@ and, recursively, every record reachable from it.
--
-- The bits for all @nBytes@ bytes of the record (as reported by
-- 'nFieldsBytes') are set in the bit field held by @tmpwriter@; then @nFields@
-- addresses are read from the stream and followed. A record whose first bit
-- is already set is skipped, which also terminates the recursion on cycles.
_mark :: (Monad m) => SeekableWriter m Word8 -> Word32 -> ReaderT (SeekableStream m Word8) m ()
_mark tmpwriter addr = do
  visited <- lift $ getBitAt' tmpwriter addr
  unless visited $ do
    seek addr
    hdr <- readHeader
    -- Determine how many bytes to mark and how many fields to recurse on
    -- (zero fields in the case of primitive types).
    (nFields, nBytes) <- nFieldsBytes hdr
    lift $ mapM_ (setBitAt tmpwriter)
      [fromIntegral addr .. fromIntegral addr + fromIntegral nBytes - 1]
    -- Read the child addresses, then recurse on them.
    addrs <- replicateM nFields readIntegral
    mapM_ (_mark tmpwriter) addrs

-- | Mark phase: create a temporary bit-field file and mark everything
-- reachable from the root node address stored at offset 0 of @stream@.
-- Returns the bit-field file's path, writer, position/size ref and table.
-- (@path@ is currently unused.)
mark :: FilePath -> SeekableStream IO Word8 -> IO (FilePath, SeekableWriter IO Word8, IORef (Word32, Word32), Table)
mark path stream = do
  dir <- getTemporaryDirectory
  (tmppath, tmphandle) <- openBinaryTempFile dir "mark"
  hClose tmphandle
  ref <- newIORef $! (0, 0)
  table <- newTable tmppath
  let tmpwriter = makeIoWriter ref table undefined
  -- The root node offset itself (bytes 0..3) is always live.
  mapM_ (setBitAt tmpwriter) [0 .. 3]
  flip runReaderT stream $ do
    -- Read the root node address from offset 0 explicitly rather than relying
    -- on the stream's current position, as the other root reads do.
    seek 0
    addr <- readIntegral
    -- ...and recurse on the structure of the data.
    _mark tmpwriter addr
  return $! (tmppath, tmpwriter, ref, table)

-- | Increment the counter in @ref@ for as long as @f@ holds for its current
-- value; return the first value for which @f@ is False.
{-# INLINE forLoop #-}
forLoop :: (MonadIO m) => IORef Word32 -> (Word32 -> m Bool) -> m Word32
forLoop ref f = do
  whileM_ (liftIO (readIORef ref) >>= f) $ liftIO $ modifyIORef' ref succ
  liftIO $ readIORef ref

-- | Repeat @m@ until the writer's position equals the end of the file as it
-- was measured before the loop started (the position is restored first).
-- Returns whether the final position equals that end.
--
-- NOTE(review): the loop test is (/=); if @m@ can move the position past the
-- measured end the loop will not stop — confirm @m@ never overshoots.
untilEOF :: (Monad m) => ReaderT (SeekableWriter m Word8) m t -> ReaderT (SeekableWriter m Word8) m Bool
untilEOF m = do
  -- Save writer position; determine file size.
  start <- getWriterPosition
  seekWriterAtEnd
  end <- getWriterPosition
  seekWriter start
  whileM_ (liftM (/= end) getWriterPosition) m
  final <- getWriterPosition
  return $! end == final

-- | Stream counterpart of 'untilEOF': repeat @m@ until the stream position
-- equals the end of the stream as measured before the loop started.
untilEOF' :: (Monad m) => ReaderT (SeekableStream m Word8) m t -> ReaderT (SeekableStream m Word8) m ()
untilEOF' m = do
  start <- getPosition
  seekAtEnd
  end <- getPosition
  seek start
  whileM_ (liftM (/= end) getPosition) m

-- | Unmap @table@ and truncate its file to the size held in the second
-- component of @ref@.
concludeFileWrite :: IORef (Word32, Word32) -> Table -> IO ()
concludeFileWrite ref table@(Table path _) = do
  (_, sz) <- readIORef ref
  unmapAll table
  bracket (openBinaryFile path ReadWriteMode) hClose (`hSetFileSize` toInteger sz)

-- | Run-length compression of the mark bit field held by @tmpwriter@.
--
-- Every maximal run of set bits @[start, stop)@ becomes three integrals in a
-- new temporary file: @start@, @stop@, and the run's offset in the compacted
-- output (the summed lengths of the earlier runs). A final sentinel entry
-- @(end, maxBound, total)@ lets pointer fix-ups also translate addresses at
-- or beyond @end@. Returns the compressed file's path and writer, plus @end@,
-- the bit counter after the whole bit field has been scanned.
--
-- A run is detected by looking at the bit under the counter, not at the mark
-- file's byte position: comparing the byte position against the file size
-- dropped a live run that began (after a gap) in the last byte of the field.
--
-- NOTE(review): @end@ is a whole number of mark bytes (a multiple of 8), so
-- it can exceed the size of the file that was marked; the caller seeks the
-- old file to this value when copying the differential segment — confirm no
-- appended bytes are skipped.
compress :: SeekableWriter IO Word8 -> IO ((FilePath, SeekableWriter IO Word8), Word32)
compress tmpwriter = do
  dir <- getTemporaryDirectory
  (tmppath2, tmphandle2) <- openBinaryTempFile dir "compressed"
  hClose tmphandle2
  counter <- newIORef 0
  newSize <- newIORef 0
  ref <- newIORef $! (0, 0)
  table <- newTable tmppath2
  let tmpwriter2 = makeIoWriter ref table undefined
      -- Read the bit under a counter value; unreadable bits count as False.
      bitIs p = liftM (maybe False p) . getBitAt tmpwriter
      -- True when the bit under the counter is set, i.e. a live run starts
      -- here; False once the counter has run off the end of the bit field.
      atRunStart = readIORef counter >>= liftM (== Just True) . getBitAt tmpwriter
  end <- flip runReaderT tmpwriter2 $ do
    -- Skip any leading clear bits so no empty run is ever emitted.
    _ <- lift $ forLoop counter (bitIs not)
    whileM_ (lift atRunStart) $ do
      start <- lift $ readIORef counter
      sz <- lift $ readIORef newSize
      -- Extend over the run of set (live) bits.
      stop <- lift $ forLoop counter (bitIs id)
      writeIntegral start
      writeIntegral stop
      writeIntegral sz
      lift $ writeIORef newSize $! sz + stop - start
      -- Skip the following run of clear (dead) bits.
      lift $ forLoop counter (bitIs not)
    -- Stick on an extra piece that lets pointer fixups reconcile extra stuff.
    final <- lift $ readIORef counter
    sz <- lift $ readIORef newSize
    writeIntegral final
    writeIntegral (maxBound :: Word32)
    writeIntegral sz
    return final
  return $! ((tmppath2, tmpwriter2), end)

-- N.B. copyRange (below) is only correct if 1) the ranges are disjoint, or
-- 2) to <= fromStart.
-- | Copy bytes @[fromStart, fromEnd)@ of @writer@'s file so that they begin at
-- offset @to@. Bytes are moved one at a time in ascending order, which is why
-- the ranges must be disjoint or satisfy @to <= fromStart@. An empty range
-- (@fromEnd <= fromStart@) is a no-op instead of wrapping @fromEnd - 1@
-- around; a byte that cannot be read yields an "end of file" error.
copyRange :: (Monad m) => SeekableWriter m Word8 -> Word32 -> Word32 -> Word32 -> m ()
copyRange writer to fromStart fromEnd =
  when (fromStart < fromEnd) $
    flip runReaderT writer $
      forM_ [fromStart .. fromEnd - 1] $ \ii -> do
        seekWriter ii
        ch <- liftM (maybe (error "end of file") id) consumeTokenW
        seekWriter $! ii - fromStart + to
        putToken ch

-- | Sweep phase: move every live run listed in @tmpstream2@ (12-byte entries:
-- run start, run end, new offset; the trailing 12-byte sentinel entry is not
-- copied) to its new offset inside @writer@'s file. Afterwards @newtable@ is
-- unmapped and @(0, newSize)@ is stored in @newref@; the truncation itself is
-- postponed until the end of the collection.
sweep :: SeekableWriter IO Word8 -> IORef (Word32, Word32) -> Table -> SeekableStream IO Word8 -> IO ()
sweep writer newref newtable@(Table _ _) tmpstream2 = do
  newSize <- newIORef 0
  flip runReaderT tmpstream2 $ do
    seekAtEnd
    compressSz <- getPosition
    seek 0
    whileM_ (liftM (/= compressSz - 12) getPosition) $ do
      st <- readIntegral
      end <- readIntegral
      sz <- readIntegral
      lift $ copyRange writer sz st end
      lift $ writeIORef newSize $! sz + end - st
  unmapAll newtable
  -- Record where the compacted data ends so the file can be truncated later.
  n <- readIORef newSize
  writeIORef newref $! (0, n)

-- | Translate an old address to its post-compaction address by binary search
-- over entries @[start, end)@ of the run table in the stream (12-byte entries:
-- run start, run end, new offset; sorted ascending, as written by compress).
--
-- The upper half is searched as @[half + 1, end)@ so the range always shrinks;
-- an address covered by no entry is reported as an error instead of looping.
_fixUpPointers :: (Monad m) => Word32 -> Word32 -> Word32 -> ReaderT (SeekableStream m Word8) m Word32
_fixUpPointers start end addr2
  | start >= end =
      error ("Data.Columbia.Gc._fixUpPointers: address not covered by any live run: " ++ show addr2)
  | otherwise = do
      let half = (end - start) `quot` 2 + start
      seek $ 12 * half
      st <- readIntegral
      en <- readIntegral
      if addr2 < st
        then _fixUpPointers start half addr2
        else if addr2 >= en
          then _fixUpPointers (half + 1) end addr2
          else liftM (\m -> m + addr2 - st) readIntegral

-- | Run a reader action against an environment derived from the current one
-- (used to read through the 'stream' view of a writer).
local' :: (r -> r') -> ReaderT r' m a -> ReaderT r m a
local' = withReaderT

-- | Read the address stored at @addr@ and use the information in the
-- compressed run table to find the corresponding fixed-up address, which is
-- written back at @addr@. Returns the old address to be traversed further.
_fixUpPointers2 :: (Monad m) => SeekableStream m Word8 -> Word32 -> Word32 -> ReaderT (SeekableWriter m Word8) m Word32
_fixUpPointers2 tmpstream2 len addr = do
  seekWriter addr
  addr2 <- local' stream readIntegral
  n <- lift $ runReaderT (_fixUpPointers 0 len addr2) tmpstream2
  seekWriter addr
  writeIntegral n
  return addr2

-- | Walk the live data from @addr@ (old addresses), rewriting every address
-- field to its fixed-up value. Visited records are tracked in the bit field
-- held by @tmpwriter@; address 0 is skipped.
_fixUpPointers3 :: (Monad m) => SeekableWriter m Word8 -> SeekableStream m Word8 -> Word32 -> Word32 -> ReaderT (SeekableWriter m Word8) m ()
_fixUpPointers3 tmpwriter tmpstream2 len addr = do
  visited <- lift $ getBitAt' tmpwriter addr
  -- Account for possible zero addresses.
  unless (visited || addr == 0) $ do
    seekWriter addr
    hdr <- local' stream readHeader
    (nFields, nBytes) <- local' stream (nFieldsBytes hdr)
    n <- getWriterPosition
    -- Mark fields to prevent following addresses twice.
    lift $ mapM_ (setBitAt tmpwriter) [addr .. addr + fromIntegral nBytes - 1]
    -- Adjust the record's pointers; `take` avoids Word32 wrap-around when
    -- nFields is 0.
    addrs <- mapM (_fixUpPointers2 tmpstream2 len) (take nFields [n, n + 4 ..])
    -- Recurse on the structure of the data.
    mapM_ (_fixUpPointers3 tmpwriter tmpstream2 len) addrs

-- | Fix up the addresses held in the caller-supplied 'IORef's.
_fixUpPointers4 :: (MonadIO m) => SeekableStream m Word8 -> Word32 -> [IORef Word32] -> m ()
_fixUpPointers4 tmpstream2 len moreAddresses =
  forM_ moreAddresses $ \addrRef -> do
    oldAddr <- liftIO $ readIORef addrRef
    newAddr <- runReaderT (_fixUpPointers 0 len oldAddr) tmpstream2
    liftIO $ writeIORef addrRef newAddr

-- | Discard the result of a monadic action.
void' :: (Monad m) => m a -> m ()
void' = liftM (const ())

-- | Scan the differential segment, from @addr@ to the end of the writer's
-- file, and fix up the address fields of every record found there. Zero
-- bytes between records are skipped. (@tmpwriter@ is currently unused.)
_fixUpPointers5 :: (Monad m) => SeekableWriter m Word8 -> SeekableStream m Word8 -> Word32 -> Word32 -> ReaderT (SeekableWriter m Word8) m ()
_fixUpPointers5 tmpwriter tmpstream2 len addr = do
  seekWriterAtEnd
  fileEnd <- getWriterPosition
  seekWriter addr
  -- Going to scan the differential segment and mop up its addresses.
  void' $ untilEOF $ do
    -- Scan over zeroes.
    whileM_ (liftM (maybe False (== 0)) consumeTokenW) (return ())
    afterZeros <- getWriterPosition
    unless (fileEnd == afterZeros) $ do
      -- The non-zero byte just consumed is taken as the start of a record.
      relSeekWriter (-1)
      recStart <- getWriterPosition
      hdr <- local' stream readHeader
      (nFields, nBytes) <- local' stream $ nFieldsBytes hdr
      -- Fields start where nFieldsBytes leaves the cursor, matching the
      -- layout read by _mark and _fixUpPointers3.
      fieldsStart <- getWriterPosition
      mapM_ (_fixUpPointers2 tmpstream2 len) (take nFields [fieldsStart, fieldsStart + 4 ..])
      seekWriter $ recStart + fromIntegral nBytes

-- | Fix-up phase: rewrite all address fields reachable from the root (read at
-- offset 0 of @writer@) and the caller's extra addresses to their
-- post-compaction values. Returns the number of entries in the run table.
fixUpPointers :: (MonadIO m) => SeekableWriter m Word8 -> SeekableWriter m Word8 -> SeekableStream m Word8 -> [IORef Word32] -> m Word32
fixUpPointers writer tmpwriter tmpstream2 moreAddresses = do
  len <- liftM (`quot` 12) $ runReaderT (seekAtEnd >> getPosition) tmpstream2
  flip runReaderT writer $ do
    seekWriter 0
    root <- local' stream readIntegral
    _fixUpPointers3 tmpwriter tmpstream2 len root
  _fixUpPointers4 tmpstream2 len moreAddresses
  return len

-- | Garbage collection for files.
-- It is a good idea, when using garbage collection, never to hold addresses
-- into a file while releasing that file's lock, because at that point the
-- garbage collector may move the referenced data. There is also a pattern in
-- which only one thread runs the garbage collector; in that case the collector
-- thread may hold addresses and have them updated when the garbage collector
-- moves data (pass the 'IORef's holding them as @moreAddresses@). These are
-- currently the only safe patterns for using addresses with the collector:
--
-- * Multi-thread GC with use of addresses protected by locks.
--
-- * Single-thread GC with use of addresses unprotected by locks /solely in the
--   GC thread/, and protected use of addresses in other threads.
--
-- Also note that appropriate locks are applied automatically when you use the
-- wrappers in "Data.Columbia.Utils", but must be applied manually in the same
-- manner when not using those wrappers.
--
-- Outline: the file is copied while the writer lock is held; the copy is then
-- marked, compressed, fixed up and swept while only the shared dual lock is
-- held; finally, with the dual lock switched to exclusive mode and the writer
-- lock held, bytes appended to the original since the copy are moved over,
-- their pointers are fixed up, and the copy is renamed over @path@. If a step
-- fails, the dual lock is released before the exception propagates.
markAndSweepGc :: FilePath -> [IORef Word32] -> IO ()
markAndSweepGc path moreAddresses = do
  dir <- getAppUserDataDirectory "tmp"
  createDirectoryIfMissing True dir
  -- NOTE(review): the final renameFile only succeeds when @dir@ is on the
  -- same file system as @path@; consider creating the copy next to @path@.
  -- NOTE(review): the "mark" and "compressed" temporary files are never removed.
  (newpath, newhandle) <- openBinaryTempFile dir "new"
  hClose newhandle
  l <- dualLockShared path
  -- A copy is made for robustness. If taking the writer lock or copying
  -- fails, the shared dual lock is released before rethrowing.
  withFileLock (path ++ ".lock.writer") Exclusive (\_ -> copyFile path newpath)
    `onException` unlockShared l
  let collect = do
        putStrLn "Mark phase"
        h <- openBinaryFile newpath ReadWriteMode
        hClose h
        table <- newTable newpath
        sz <- fileSizeShim table
        ref <- newIORef $! (0, sz)
        let writer = makeIoWriter ref table undefined
        (tmppath, tmpwriter, tmpref, tmptable) <- mark newpath (stream writer)
        -- Addresses held by the caller are extra roots.
        forM_ moreAddresses $ \addrRef ->
          readIORef addrRef >>= (`runReaderT` stream writer) . _mark tmpwriter
        putStrLn "Compress phase"
        compressed@((_, tmpwriter2), _) <- compress tmpwriter
        -- Empty the mark file: the fix-up phase reuses it as a visited set.
        unmapAll tmptable
        bracket (openFile tmppath ReadWriteMode) hClose (`hSetFileSize` 0)
        writeIORef tmpref $! (0, 0)
        putStrLn "Fixup phase"
        len <- fixUpPointers writer tmpwriter (stream tmpwriter2) moreAddresses
        putStrLn "Sweep phase"
        sweep writer ref table (stream tmpwriter2)
        return $! (tmpwriter, compressed, len, writer, ref, table)
  (tmpwriter, ((_, tmpwriter2), newFileSize), len, writer, ref, table) <-
    collect `onException` unlockShared l
  l2 <- switchLocks l
  -- `finally` wraps the lock acquisition too, so l2 is released even if the
  -- writer lock cannot be taken.
  flip finally (unlock l2) $
    withFileLock (path ++ ".lock.writer") Exclusive $ \_ -> do
      -- Reconcile the differential segment (the part that got written while
      -- the lion's share of the GC [in terms of time] was going on).
      putStrLn "Reconcile phase"
      oldtable <- newTable path
      oldsz <- fileSizeShim oldtable
      oldref <- newIORef $! (0, oldsz)
      let oldstream = makeIoStream oldref oldtable undefined
      n <- flip runReaderT oldstream $ do
        appendAt <- lift $ runReaderT (seekWriterAtEnd >> getWriterPosition) writer
        seek newFileSize
        untilEOF' (lift $ _consumeToken oldstream >>= maybe (return ()) (_putToken writer))
        return appendAt
      putStrLn $ "newFileSize: " ++ show newFileSize ++ "; n: " ++ show n
      runReaderT (_fixUpPointers5 tmpwriter (stream tmpwriter2) len n) writer
      -- Don't forget the root block pointer in case it changed.
      addr :: Word32 <- runReaderT (seek 0 >> readIntegral) oldstream
      -- The old file is no longer read; release its mappings before the rename.
      unmapAll oldtable
      flip runReaderT writer $ do
        seekWriter 0
        writeIntegral addr
        _fixUpPointers2 (stream tmpwriter2) len 0
      concludeFileWrite ref table
      -- Swap the newly swept file in place of the old one.
      renameFile newpath path