{-# LANGUAGE ScopedTypeVariables, Safe #-}

-- | A brief note on the locking strategy:
--
--   A Columbia data file is covered by three locks: a dual lock (comprising two locks) and a writer lock;
--   the lock hierarchy goes in that order. The dual lock is taken in a shared mode during
--   all reading. The only circumstance in which the dual lock is taken
--   in exclusive mode is to execute the write phase of a garbage collection.
--
--   The writer lock protects the action of writing to the end of the file and also
--   to the root node offset; readers take the writer lock only while reading the root node
--   offset; the writer lock is always taken in exclusive mode.
--
--   The high level view is: you can always run the collector in parallel with any other
--   operation you are doing; you can read freely while using the collector; writes to the
--   file will be held up until the collector finishes.
module Data.Columbia.Gc (markAndSweepGc) where

import Control.Monad.Reader
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import System.IO
import System.IO.Error	
import System.Directory
import System.FileLock
import Data.Char
import Data.Word
import Data.Bits
import Data.IORef
import Data.Columbia.RWInstances
import Data.Columbia.Integral
import Data.Columbia.Headers
import Data.Columbia.DualLock
import Data.Columbia.SeekableStream
import Data.Columbia.SeekableWriter
import Data.Columbia.Mapper

{-guardedGetChar handle =
	catchError(getWord8 handle)
		(\ex -> if isEOFError ex then
				return 0
			else throwError ex)-}

-- | Turn on bit number @n@ of the bit field behind @writer@. The bit lives in
--   byte @n `div` 8@ at bit position @n `mod` 8@. When no token can be read at
--   that byte offset the byte is taken to be zero before the bit is set.
setBitAt :: (Monad m) => SeekableWriter m Word8 -> Word32 -> m()
setBitAt writer n = flip runReaderT writer $ do
	let (byteIx, bitIx) = n `divMod` 8
	seekWriter byteIx
	existing <- consumeTokenW
	-- Reposition on the same byte before writing the updated value back.
	seekWriter byteIx
	putToken $ setBit(maybe 0 id existing) (fromIntegral bitIx)
-- | Look up bit number @n@ of the bit field behind @writer@. The result is
--   'Nothing' when no token can be read at byte offset @n `div` 8@, otherwise
--   'Just' the state of bit @n `mod` 8@ of that byte.
getBitAt :: (Monad m) => SeekableWriter m Word8 -> Word32 -> m(Maybe Bool)
getBitAt writer n = flip runReaderT writer $ do
	let (byteIx, bitIx) = n `divMod` 8
	seekWriter byteIx
	existing <- consumeTokenW
	return(fmap(\byte -> testBit byte(fromIntegral bitIx)) existing)
-- | Like 'getBitAt', but a bit that cannot be read counts as unset.
getBitAt' :: (Monad m) => SeekableWriter m Word8 -> Word32 -> m Bool
getBitAt' writer n = do
	found <- getBitAt writer n
	return $ case found of
		Just flag -> flag
		Nothing -> False

-- | Recursive mark step. If the bit for @addr@ is already set in @tmpwriter@
--   nothing happens; otherwise the record at @addr@ is read from the stream,
--   one bit per byte of the record is set, and every address field of the
--   record is marked in turn.
_mark :: (Monad m) => SeekableWriter m Word8 -> Word32 -> ReaderT(SeekableStream m Word8) m ()
_mark tmpwriter addr = do
	seen <- lift(getBitAt' tmpwriter addr)
	if seen then return() else do
		seek addr
		hdr <- readHeader
		-- The header tells us how many bytes the record occupies and how many
		-- address fields follow it (none for primitive types).
		(nFields, nBytes) <- nFieldsBytes hdr
		-- Flag every byte of this record as live.
		lift $ mapM_(setBitAt tmpwriter) [addr..addr+fromIntegral nBytes-1]
		-- Collect all child addresses first, then descend into each of them.
		children <- replicateM nFields readIntegral
		mapM_(_mark tmpwriter) children

-- | Mark phase. Creates a scratch file (prefix @mark@) in the system temporary
--   directory to hold the mark bit field, sets the bits for bytes 0..3 (the
--   root node offset), reads the root address from the start of @stream@ and
--   marks everything reachable from it. Returns the scratch file's path, the
--   writer over it, and the 'IORef' and 'Table' that writer was built from.
--   The @path@ argument is not used.
mark :: FilePath -> SeekableStream IO Word8 -> IO(FilePath,SeekableWriter IO Word8,IORef(Word32,Word32),Table)
mark path stream = do
	tmpdir <- getTemporaryDirectory
	(tmppath, scratchHandle) <- openBinaryTempFile tmpdir "mark"
	-- Only the file name is needed; the file is accessed through the table.
	hClose scratchHandle
	ref <- newIORef$!(0,0)
	table <- newTable tmppath
	let tmpwriter = makeIoWriter ref table undefined
	-- The four bytes of the root node offset are always live.
	forM_ [0..3] (setBitAt tmpwriter)
	-- Read the root address and recurse on the structure of the data.
	runReaderT(readIntegral >>= _mark tmpwriter) stream
	return$!(tmppath,tmpwriter,ref,table)

{-# INLINE forLoop #-}
-- | Increment the counter in @ref@ for as long as @f@ holds of its current
--   value, then return the value the counter stopped at.
forLoop :: (MonadIO m) => IORef Word32 -> (Word32 -> m Bool) -> m Word32
forLoop ref f = do
	let current = liftIO(readIORef ref)
	whileM_(current >>= f) (liftIO(modifyIORef' ref succ))
	current

-- | Repeat the action @m@ until the writer's position equals the position
--   found by seeking to the end (measured once, before the first iteration).
--   Returns whether the final position equals that end position.
untilEOF :: (Monad m)
	=> ReaderT(SeekableWriter m Word8) m t
	-> ReaderT(SeekableWriter m Word8) m Bool
untilEOF m = do
	-- Measure the end position, then return to where we started.
	start <- getWriterPosition
	seekWriterAtEnd
	eof <- getWriterPosition
	seekWriter start
	let atEnd = do
		here <- getWriterPosition
		return$!eof==here
	whileM_(liftM not atEnd) m
	atEnd

-- | Stream counterpart of 'untilEOF': repeat the action @m@ until the stream's
--   position equals the position found by seeking to the end (measured once,
--   before the first iteration).
untilEOF' :: (Monad m)
	=> ReaderT(SeekableStream m Word8) m t
	-> ReaderT(SeekableStream m Word8) m ()
untilEOF' m = do
	-- Measure the end position, then return to where we started.
	start <- getPosition
	seekAtEnd
	eof <- getPosition
	seek start
	let notAtEnd = do
		here <- getPosition
		return$!eof/=here
	whileM_ notAtEnd m

-- Run-length compression of the bit field defined in the mark.tmp file.
--
-- Reads the bit field behind 'tmpwriter' and writes, to a fresh temporary file
-- (prefix "compressed"), one triple of integers per run of set bits:
--
--   * the bit index where the run starts,
--   * the bit index just past the end of the run,
--   * the sum of the lengths of all earlier runs.
--
-- A final triple (final bit index, maxBound, total run length) is appended.
-- Returns the path of and writer over the compressed file, together with the
-- final bit index.
compress :: SeekableWriter IO Word8 -> IO((FilePath,SeekableWriter IO Word8),Word32)	
compress tmpwriter = do
	dir <- getTemporaryDirectory
	(tmppath2, tmphandle2) <- openBinaryTempFile dir "compressed"
	-- Only the file name is needed; the file is accessed through the table.
	hClose tmphandle2
	-- 'counter' is the current bit index; 'newSize' accumulates the run lengths.
	counter <- newIORef 0
	newSize <- newIORef 0
	ref <- newIORef$!(0,0)
	table <- newTable tmppath2
	let tmpwriter2 = makeIoWriter ref table undefined
	m <- runReaderT(do
		-- Determine the size of the bit-field file, then rewind to its start.
		lift$_seekAtEnd$stream tmpwriter
		newFileSize <- lift$_getPosition$stream tmpwriter
		lift$_seek(stream tmpwriter) 0
		-- Loop until the bit-field cursor (moved by 'getBitAt') reaches that size.
		whileM_
			(lift$liftM(/=newFileSize) (_getPosition$stream tmpwriter))
			(do
				m <- lift$readIORef counter
				sz <- lift$readIORef newSize
				-- Advance over a run of set bits.
				lift$forLoop counter(liftM(maybe False id).getBitAt tmpwriter)
				n <- lift$readIORef counter
				-- Emit the triple (run start, run end, total length of earlier runs).
				writeIntegral m
				writeIntegral n
				writeIntegral sz
				lift$writeIORef newSize$!sz+n-m
				-- Advance over the following run of clear bits; an unreadable bit stops the scan.
				lift$forLoop counter(liftM(maybe False not).getBitAt tmpwriter)
			)
		-- Stick on an extra piece that lets pointer fixups reconcile extra stuff.
		m <- lift$readIORef counter
		sz <- lift$readIORef newSize
		writeIntegral m
		writeIntegral(maxBound :: Word32)
		writeIntegral sz
		return m)
		tmpwriter2

	return$!((tmppath2,tmpwriter2),m)

-- | Copy the tokens at positions @fromStart .. fromEnd-1@ of @writer@ to the
--   positions starting at @to@, one token at a time in ascending order.
--
--   N.B. this only works if 1) the ranges are disjoint, or 2) to <= fromStart.
--   A source position that cannot be read yields the error "end of file".
copyRange :: (Monad m) => SeekableWriter m Word8 -> Word32 -> Word32 -> Word32 -> m()
copyRange writer to fromStart fromEnd = flip runReaderT writer $
	forM_ [fromStart..fromEnd-1] $ \src -> do
		seekWriter src
		tok <- consumeTokenW
		let byte = maybe(error"end of file") id tok
		seekWriter$!src-fromStart+to
		putToken byte

-- | Sweep phase. Reads every 12-byte triple of @tmpstream2@ except the last
--   one and, for each triple (start, end, destination), copies the range
--   @start .. end-1@ of @writer@ down to @destination@. Afterwards the table is
--   unmapped and @newref@ is set to @(0, n)@, where @n@ is the end of the last
--   copied range (0 if no triple was processed).
sweep :: SeekableWriter IO Word8 -> IORef(Word32,Word32) -> Table -> SeekableStream IO Word8 -> IO()
sweep writer newref newtable@(Table newpath _) tmpstream2 = do
	newSize <- newIORef 0
	flip runReaderT tmpstream2 $ do
		seekAtEnd
		compressSz <- getPosition
		seek 0
		-- Stop just before the trailing triple, which is not a range to copy.
		let moreTriples = do
			pos <- getPosition
			return(pos/=compressSz-12)
		whileM_ moreTriples $ do
			st <- readIntegral
			end <- readIntegral
			sz <- readIntegral
			lift(copyRange writer sz st end)
			lift(writeIORef newSize$!sz+end-st)
	unmapAll newtable
	-- Once the file is all swept we have to find the end of the file
	-- and truncate it. (The actual truncation is postponed until the end.)
	n <- readIORef newSize
	writeIORef newref$!(0,n)

-- | Binary search over the 12-byte triples (start, end, destination) of the
--   stream for the triple whose range @start <= addr2 < end@ holds, between
--   triple indices @start@ and @end@ of the search window. Returns
--   @destination + addr2 - start@ for the triple found.
--
--   NOTE(review): the search only returns once a containing triple is found;
--   callers are presumably expected to pass addresses covered by some triple.
_fixUpPointers :: (Monad m) => Word32 -> Word32 -> Word32 -> ReaderT(SeekableStream m Word8) m Word32
_fixUpPointers start end addr2 = do
	let mid = start+(end-start)`quot`2
	seek(12*mid)
	lo <- readIntegral
	hi <- readIntegral
	let narrow a b = _fixUpPointers a b addr2
	if addr2 >= lo && addr2 < hi
		then do
			-- The third integer of the triple is where this range moves to.
			newBase <- readIntegral
			return(newBase+addr2-lo)
		else if addr2 < lo
			then narrow start mid
			else narrow mid end

-- | Read the address stored at position @addr@ of the writer, look up its
--   fixed-up value in the compressed stream @tmpstream2@ (which holds @len@
--   triples) and write that value back at @addr@. Returns the old address so
--   that the caller can traverse further.
_fixUpPointers2 :: (Monad m) => SeekableStream m Word8 -> Word32 -> Word32 -> ReaderT(SeekableWriter m Word8) m Word32
_fixUpPointers2 tmpstream2 len addr = do
	seekWriter addr
	oldTarget <- local' stream readIntegral
	newTarget <- lift(runReaderT(_fixUpPointers 0 len oldTarget) tmpstream2)
	-- Go back to the field and overwrite it with the relocated address.
	seekWriter addr >> writeIntegral newTarget
	return oldTarget

-- | Recursive fix-up step. Starting from the record at @addr@, rewrites every
--   address field reachable from it to its relocated value, using the
--   compressed stream @tmpstream2@ (holding @len@ triples). The bit field
--   behind @tmpwriter@ records which records have been visited, so that no
--   address is followed twice. A zero address is skipped.
_fixUpPointers3 :: (Monad m) => SeekableWriter m Word8 -> SeekableStream m Word8 -> Word32 -> Word32 -> ReaderT(SeekableWriter m Word8) m ()
_fixUpPointers3 tmpwriter tmpstream2 len addr = do
	b <- lift$getBitAt' tmpwriter addr
	unless(b||addr==0)$do --Account for possible zero addresses.
		seekWriter addr
		hdr <- local' stream readHeader
		(nFields, nBytes) <- local' stream(nFieldsBytes hdr)
		-- Position of the first address field, directly after the header.
		n <- getWriterPosition
		-- Mark fields to prevent following addresses twice
		lift$mapM_(setBitAt tmpwriter) [addr..addr+fromIntegral nBytes-1]
		-- Adjust record's pointers; each call yields the old (pre-move) address.
		addrs <- mapM(_fixUpPointers2 tmpstream2 len) [n,n+4..n+4*fromIntegral nFields-4]
		-- Recurse on the structure of the data.
		mapM_(_fixUpPointers3 tmpwriter tmpstream2 len) addrs

-- | Replace the address held in each of the given references by its fixed-up
--   value, looked up in the compressed stream @tmpstream2@ (holding @len@
--   triples).
_fixUpPointers4 :: (MonadIO m) => SeekableStream m Word8 -> Word32 -> [IORef Word32] -> m()
_fixUpPointers4 tmpstream2 len moreAddresses =
	forM_ moreAddresses $ \addrRef -> do
		old <- liftIO(readIORef addrRef)
		new <- runReaderT(_fixUpPointers 0 len old) tmpstream2
		liftIO(writeIORef addrRef new)

-- | Run an action and discard its result.
void' :: (Monad m) => m a -> m ()
void' m = liftM(const()) m
-- Fix up the address fields of every record found between position 'addr' and
-- the end of the writer, using the compressed stream 'tmpstream2' (holding
-- 'len' triples). Records are located by skipping zero tokens; the record's
-- header then tells how many address fields it has and how many bytes it spans.
-- The 'tmpwriter' argument is not used.
_fixUpPointers5 :: (Monad m) => SeekableWriter m Word8 -> SeekableStream m Word8 -> Word32 -> Word32->ReaderT(SeekableWriter m Word8) m ()
_fixUpPointers5 tmpwriter tmpstream2 len addr = do
	-- Remember the end position so that running off the end can be told apart
	-- from finding a non-zero token.
	seekWriterAtEnd
	x <- getWriterPosition
	seekWriter addr
	-- Going to scan the differential segment and mop up its addresses.
	void'$untilEOF(do
		-- Scan over zeroes.
		whileM_
			(liftM(maybe False(==0)) consumeTokenW)
			(return())
		n <- getWriterPosition
		-- If the scan stopped at the end position there is nothing more to fix up.
		unless(x==n)$do
			-- Step back over the non-zero token that ended the scan: the record starts there.
			relSeekWriter(-1)
			n <- getWriterPosition
			hdr <- local' stream readHeader
			-- Position of the first address field, directly after the header.
			m <- getWriterPosition
			(nFields, nBytes) <- local' stream$nFieldsBytes hdr
			mapM_(_fixUpPointers2 tmpstream2 len) [m,m+4..m+4*fromIntegral nFields-4]
			-- Continue scanning just past this record.
			seekWriter$n+fromIntegral nBytes
		)

-- | Fix-up phase. Computes the number of 12-byte triples in the compressed
--   stream @tmpstream2@, rewrites every address reachable from the root
--   address stored at position 0 of @writer@ (using @tmpwriter@ as the
--   visited bit field), and finally rewrites the externally held addresses in
--   @moreAddresses@. Returns the number of triples.
fixUpPointers :: (MonadIO m) => SeekableWriter m Word8 -> SeekableWriter m Word8 -> SeekableStream m Word8 -> [IORef Word32] -> m Word32
fixUpPointers writer tmpwriter tmpstream2 moreAddresses = do
	-- Each entry of the compressed stream is three 4-byte integers.
	len <- liftM(`quot`12)$runReaderT
		(seekAtEnd>>getPosition)
		tmpstream2
	runReaderT
		(do
		seekWriter 0
		-- The root address is only read here; it is not rewritten by this step.
		addr <- local' stream$readIntegral
		_fixUpPointers3 tmpwriter tmpstream2 len addr
		)
		writer
	_fixUpPointers4 tmpstream2 len moreAddresses
	return len

-- | Garbage collection for files. It is a good idea when using garbage collection, never
--   to be holding addresses into a file when releasing that file's lock, as at that point the
--   garbage collector may move the data referenced. There is also a pattern, where only
--   one thread runs the garbage collector; in that case, the collector thread may hold addresses
--   and have these updated when the garbage collector moves data. These are currently the only
--   safe patterns for using addresses with the collector:
--
--   * Multi-thread GC with use of addresses protected by locks.
--
--   * Single-thread GC with use of addresses unprotected by locks /solely in the GC thread/
--     and protected use of addresses in other threads.
--
--  Also note that appropriate locks are applied automatically when you use the wrappers in
--  Data.Columbia.Utils, but must be applied manually in the same manner when not using those
--  patterns.
--
--  The file at @path@ is copied, the copy is collected (mark, compress, fixup, sweep), data
--  written to the original after the copy is reconciled into it, and the copy is then
--  renamed over @path@. The addresses held in @moreAddresses@ are treated as additional
--  roots and are updated to their new values.
markAndSweepGc :: FilePath -> [IORef Word32] -> IO()
markAndSweepGc path moreAddresses = do
	dir <- getAppUserDataDirectory "tmp"
	createDirectoryIfMissing True dir
	(newpath,newhandle) <- openBinaryTempFile dir "new"
	hClose newhandle
	l <- dualLockShared path
	withFileLock(path++".lock.writer") Exclusive$ \_->catch
		(
		-- A copy is made for robustness.
		copyFile path newpath)
		-- Release the shared dual lock before propagating the failure; otherwise
		-- it would stay held after a failed copy.
		(\(ex::SomeException)->unlockShared l>>throwIO ex)
	(newhandle,(tmppath,tmpwriter,_,_),((tmppath2,tmpwriter2),newFileSize),len,writer,ref,table) <- catch
		(do
		putStrLn"Mark phase"
		newhandle <- openBinaryFile newpath ReadWriteMode
		hClose newhandle
		table <- newTable newpath
		sz <- fileSizeShim table
		ref <- newIORef$!(0,sz)
		let writer = makeIoWriter ref table undefined
		tmp@(tmppath,tmpwriter,tmpref,tmptable) <- mark newpath(stream writer)
		-- Externally held addresses are additional roots.
		mapM_(\ref -> readIORef ref>>=(`runReaderT` stream writer)._mark tmpwriter) moreAddresses
		putStrLn"Compress phase"
		tmp2@((tmppath2,tmpwriter2),_) <- compress tmpwriter
		-- Empty the mark file so that the fixup phase can reuse it as its
		-- visited bit field.
		unmapAll tmptable
		bracket
			(openFile tmppath ReadWriteMode)
			hClose
			(`hSetFileSize` 0)
		writeIORef tmpref$!(0,0)
		putStrLn"Fixup phase"
		len <- fixUpPointers writer tmpwriter(stream tmpwriter2) moreAddresses
		putStrLn"Sweep phase"
		sweep writer ref table(stream tmpwriter2)
		return$!(newhandle, tmp, tmp2, len, writer, ref, table))
		(\(ex::SomeException)->unlockShared l>>throwIO ex)
	l2 <- switchLocks l
	withFileLock(path++".lock.writer") Exclusive$ \_->finally
		(do
		-- Reconcile the differential segment (the part that got written while the
		-- lion's share of the GC [in terms of time] was going on).
		putStrLn"Reconcile phase"
		oldtable <- newTable path
		oldsz <- fileSizeShim oldtable
		oldref <- newIORef$!(0,oldsz)
		let oldstream = makeIoStream oldref oldtable undefined
		n <- runReaderT
			(do
			-- 'n' is the end of the swept file, i.e. where the appended data starts.
			n <- lift$runReaderT(seekWriterAtEnd>>getWriterPosition) writer
			seek newFileSize
			-- Append everything the original holds from 'newFileSize' onwards.
			untilEOF'
				(lift$_consumeToken oldstream>>=maybe(return()) (_putToken writer))
			return n)
			oldstream
		putStrLn$"newFileSize: "++show newFileSize++"; n: "++show n
		runReaderT(_fixUpPointers5 tmpwriter(stream tmpwriter2) len n) writer
		-- Don't forget the root block pointer in case it changed.
		addr :: Word32 <- runReaderT(seek 0>>readIntegral) oldstream
		runReaderT(do
			seekWriter 0
			writeIntegral addr
			_fixUpPointers2(stream tmpwriter2) len 0) writer
		concludeFileWrite ref table
		-- Atomically swap the newly swept file in place of the old one.
		renameFile newpath path)
		(unlock l2)