{-# LANGUAGE ScopedTypeVariables, Safe #-}

-- | A brief note on the locking strategy:
--
--   A Columbia data file is covered by three locks: a dual lock (comprising two locks) and a writer lock;
--   the lock hierarchy goes in that order. The dual lock is taken in a shared mode during
--   all reading. The only circumstance in which the dual lock is taken
--   in exclusive mode is to execute the write phase of a garbage collection.
--
--   The writer lock protects the action of writing to the end of the file and also
--   to the root node offset; readers take the writer lock only during reading the root node
--   offset; the writer lock is always taken in exclusive mode.
--
--   The high level view is: you can always run the collector in parallel with any other
--   operation you are doing; you can read and write freely while using the collector.
module Data.Columbia.Gc (markAndSweepGc) where

import Control.Monad.Reader
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import System.IO
import System.IO.Error	
import System.Directory
import System.FileLock
import Data.Char
import Data.Word
import Data.Bits
import Data.IORef
import Data.Columbia.Types
import Data.Columbia.Internal.RWInstances
import Data.Columbia.Internal.IntegralTypes
import Data.Columbia.Internal.Headers
import Data.Columbia.Internal.DualLock
import Data.Columbia.Internal.SeekableStream
import Data.Columbia.Internal.SeekableWriter
import Data.Columbia.Internal.Mapper

{-guardedGetChar handle =
	catchError(getWord8 handle)
		(\ex -> if isEOFError ex then
				return 0
			else throwError ex)-}

-- Set bit number 'n' of the bit field stored behind 'writer'. Bit n lives in
-- byte (n `div` 8), at bit position (n `mod` 8) of that byte. When the byte
-- cannot be read (consumeTokenW yields Nothing) it is taken to be zero.
setBitAt :: (Monad m) => SeekableWriter m Word8 -> Pointer -> m()
setBitAt writer n = flip runReaderT writer$do
	let (byteIx, bitIx) = n`divMod`8
	seekWriter byteIx
	current <- liftM(maybe 0 id) consumeTokenW
	-- Seek to the same byte again so the write replaces the byte just read.
	seekWriter byteIx
	putToken$setBit current(fromIntegral bitIx)
-- Read bit number 'n' of the bit field stored behind 'writer'. The result is
-- Nothing when the byte holding that bit cannot be read (consumeTokenW
-- yields Nothing), otherwise Just the state of the bit.
getBitAt :: (Monad m) => SeekableWriter m Word8 -> Pointer -> m(Maybe Bool)
getBitAt writer n = flip runReaderT writer$do
	let (byteIx, bitIx) = n`divMod`8
	seekWriter byteIx
	byte <- consumeTokenW
	return(liftM(`testBit` fromIntegral bitIx) byte)
-- Like 'getBitAt', but a bit whose byte cannot be read counts as cleared.
getBitAt' :: (Monad m) => SeekableWriter m Word8 -> Pointer -> m Bool
getBitAt' writer n = getBitAt writer n>>=return.maybe False id

-- Recursive mark step. If the bit for 'addr' is not yet set in 'tmpwriter',
-- read the header at 'addr' from the data stream, set one bit per byte of the
-- record, then read the record's field addresses and mark each of them.
_mark :: (Monad m) => SeekableWriter m Word8 -> Pointer -> ReaderT(SeekableStream m Word8) m ()
_mark tmpwriter addr = do
	alreadyMarked <- lift$getBitAt' tmpwriter addr
	unless alreadyMarked$do
		seek addr
		header <- readHeader
		-- The header decides how many bytes to mark and how many fields to
		-- recurse on. For primitive types there are zero fields.
		(fieldCount, byteCount) <- nFieldsBytes header
		-- Mark every byte of the record.
		let lastByte = addr+fromIntegral byteCount-1
		lift$mapM_(setBitAt tmpwriter) [addr..lastByte]
		-- The field addresses follow the header; read all of them first,
		-- then recurse on each.
		children <- replicateM fieldCount readIntegral
		mapM_(_mark tmpwriter) children

-- Mark phase. Creates a fresh temporary file (template "mark") to hold the
-- mark bit field, sets the bits of the first four bytes (the root node
-- offset), then reads the root address from 'stream' and marks everything
-- reachable from it. Returns the bit-field writer, the other two results of
-- 'makeIoWriter'' and the temporary file's path.
-- NOTE(review): the 'path' argument is not used by this function.
mark :: FilePath -> SeekableStream IO Word8 -> IO(SeekableWriter IO Word8,IO(),IORef(Pointer,Pointer),FilePath)
mark path stream = do
	tmpdir <- getTemporaryDirectory
	(tmppath, h) <- openBinaryTempFile tmpdir "mark"
	-- Only the name is needed; the file is reopened through makeIoWriter'.
	hClose h
	(bits,closeBits,bitsRef) <- makeIoWriter' tmppath undefined False
	-- Mark root node.
	mapM_(setBitAt bits) [0..3]
	-- Read the root node address and recurse on the structure of the data.
	flip runReaderT stream$readIntegral>>=_mark bits
	return$!(bits,closeBits,bitsRef,tmppath)

{-# INLINE forLoop #-}
-- Repeatedly apply 'f' to the value held in 'ref'; while it answers True,
-- replace the value by its successor and try again. Once 'f' answers False
-- the value then held in 'ref' is returned.
forLoop :: (MonadIO m) => IORef Word32 -> (Word32 -> m Bool) -> m Word32
forLoop ref f = step
	where
	step = do
		current <- liftIO(readIORef ref)
		keepGoing <- f current
		if keepGoing
			then liftIO(modifyIORef' ref succ)>>step
			else liftIO(readIORef ref)

-- Run 'm' repeatedly for as long as the writer position is below the file
-- size. The size is measured once, before the first run, and the starting
-- position is restored before looping. The result tells whether the final
-- position is exactly that size.
untilEOF :: (Monad m)
	=> ReaderT(SeekableWriter m Word8) m t
	-> ReaderT(SeekableWriter m Word8) m Bool
untilEOF m = do
	-- Remember where we are, measure the file, and come back.
	start <- getWriterPosition
	seekWriterAtEnd
	size <- getWriterPosition
	seekWriter start
	whileM_(liftM(<size) getWriterPosition) m
	final <- getWriterPosition
	return$!size==final

-- Stream counterpart of 'untilEOF': run 'm' repeatedly for as long as the
-- stream position is below the stream size. The size is measured once, before
-- the first run, and the starting position is restored before looping.
untilEOF' :: (MonadIO m)
	=> ReaderT(SeekableStream m Word8) m t
	-> ReaderT(SeekableStream m Word8) m ()
untilEOF' m = do
	start <- getPosition
	seekAtEnd
	size <- getPosition
	seek start
	whileM_(liftM(<size) getPosition) m

-- Run-length compression of the bit field defined in the mark.tmp file.
--
-- Scans the mark bit field behind 'tmpwriter' and writes a table of triples
-- (start, end, newStart) to a fresh temporary file (template "compressed"):
-- 'start' and 'end' delimit a run of set bits, and 'newStart' is the sum of
-- the lengths of all earlier runs. The table ends with one extra triple
-- (counter, maxBound, newSize). Returns the table's writer, the second
-- component of 'makeIoWriter''s result (bound as 'tmpconclude2'), and the
-- final counter value.
-- NOTE(review): consumers of the table step through it 12 bytes at a time,
-- so each written integral is presumably 4 bytes wide -- confirm against
-- writeIntegral.
-- NOTE(review): 'tmppath2' is not returned, so callers cannot remove the
-- temporary file by name.
compress :: SeekableWriter IO Word8 -> IO(SeekableWriter IO Word8,IO(),Pointer)
compress tmpwriter = do
	dir <- getTemporaryDirectory
	(tmppath2, tmphandle2) <- openBinaryTempFile dir "compressed"
	-- Only the name is needed; the file is reopened through makeIoWriter'.
	hClose tmphandle2
	-- 'counter' is the bit index being examined; 'newSize' accumulates the
	-- total length of the runs recorded so far.
	counter <- newIORef 0
	newSize <- newIORef 0
	(tmpwriter2,tmpconclude2,_) <- makeIoWriter' tmppath2 undefined False
	m <- runReaderT(do
		-- Determine the size of the bit field, then rewind to its start.
		-- ('newFileSize' holds the size of the bit-field stream.)
		lift$_seekAtEnd$stream tmpwriter
		newFileSize <- lift$_getPosition$stream tmpwriter
		lift$_seek(stream tmpwriter) 0
		-- Continue until the bit-field stream position equals its size.
		-- NOTE(review): the position is never moved directly in the loop body;
		-- presumably the getBitAt calls advance it -- confirm.
		whileM_
			(lift$liftM(/=newFileSize) (_getPosition$stream tmpwriter))
			(do
				m <- lift$readIORef counter
				sz <- lift$readIORef newSize
				-- Advance 'counter' across set bits; stops at a cleared bit or
				-- when getBitAt yields Nothing.
				lift$forLoop counter(liftM(maybe False id).getBitAt tmpwriter)
				n <- lift$readIORef counter
				-- Record the run [m, n) together with the offset it maps to.
				writeIntegral m
				writeIntegral n
				writeIntegral sz
				lift$writeIORef newSize$!sz+n-m
				-- Advance 'counter' across cleared bits; stops at a set bit or
				-- when getBitAt yields Nothing.
				lift$forLoop counter(liftM(maybe False not).getBitAt tmpwriter)
			)
		-- Stick on an extra piece that lets pointer fixups reconcile extra stuff:
		-- a final triple starting at the current counter with maxBound as its end.
		m <- lift$readIORef counter
		sz <- lift$readIORef newSize
		writeIntegral m
		writeIntegral(maxBound :: Word32)
		writeIntegral sz
		return m)
		tmpwriter2

	return$!(tmpwriter2,tmpconclude2,m)

-- Copy the bytes at offsets [fromStart, fromEnd) of 'writer', one at a time
-- and in ascending order, so that the byte at 'fromStart' lands at offset
-- 'to'. Fails with error "end of file" if a source byte cannot be read.
-- An empty or inverted range (fromEnd <= fromStart) copies nothing.
--
-- N.B. That this only works if 1) ranges are disjoint, or 2) to <= fromStart.
copyRange :: (Monad m) => SeekableWriter m Word8 -> Word32 -> Word32 -> Word32 -> m()
copyRange writer to fromStart fromEnd
	-- Without this guard, fromEnd == 0 makes 'fromEnd-1' wrap around to
	-- maxBound, turning an empty range into a copy of the whole address space.
	| fromEnd <= fromStart = return()
	| otherwise = flip runReaderT writer$
		mapM_
			(\ii -> do
				seekWriter ii
				ch <- liftM(maybe(error"end of file") id) consumeTokenW
				-- Same distance from 'to' as the source byte is from 'fromStart'.
				seekWriter$!ii-fromStart+to
				putToken ch)
			[fromStart..fromEnd-1]

-- Sweep phase. Walks the table of (start, end, newStart) triples in
-- 'tmpstream2', skipping the final 12-byte entry, and for each triple copies
-- bytes [start, end) of 'writer' down to offset newStart. Afterwards stores
-- (0, size) in 'ref', where size is where the last copied range ends (0 when
-- no triple was processed).
-- NOTE(review): the 'newpath' argument is not used by this function.
sweep :: FilePath -> SeekableWriter IO Word8 -> IORef(Pointer,Pointer) -> SeekableStream IO Word8 -> IO()
sweep newpath writer ref tmpstream2 = do
	sizeRef <- newIORef 0
	flip runReaderT tmpstream2$do
		seekAtEnd
		tableEnd <- getPosition
		seek 0
		-- The final table entry is never swept.
		let lastEntry = tableEnd-12
		whileM_(liftM(/=lastEntry) getPosition)$do
			from <- readIntegral
			upto <- readIntegral
			dest <- readIntegral
			lift$copyRange writer dest from upto
			lift$writeIORef sizeRef$!dest+upto-from
	-- Once the file is all swept, record where the live data ends.
	-- NOTE(review): presumably the writer's concluding action uses 'ref' to
	-- truncate the file -- confirm against makeIoWriter'.
	finalSize <- readIORef sizeRef
	writeIORef ref$!(0,finalSize)

-- Binary search of the table of 12-byte (start, end, newStart) entries in the
-- stream for the entry whose range [start, end) contains 'addr2', looking only
-- at entry indices in [start, end) of the arguments. Returns the relocated
-- address, newStart + (addr2 - entry start).
--
-- Fails with 'error' when no entry in the interval covers 'addr2'. Previously
-- such a lookup never terminated, because the upper half was searched again
-- from 'half' itself and an empty interval was never detected.
_fixUpPointers :: (Monad m) => Word32 -> Word32 -> Word32 -> ReaderT(SeekableStream m Word8) m Word32
_fixUpPointers start end addr2
	| start >= end = error("_fixUpPointers: address "++show addr2++" is not covered by any live range")
	| otherwise = do
		let half = (end-start)`quot`2+start
		seek$12*half
		st <- readIntegral
		en <- readIntegral
		if addr2 < st then
				_fixUpPointers start half addr2
			else if addr2 >= en then
				-- Entry 'half' has been ruled out, so continue strictly above it.
				_fixUpPointers(half+1) end addr2
			else
				liftM(\m -> m+addr2-st) readIntegral

-- Rewrite the address stored at offset 'addr' of the writer: read it, look up
-- its relocated value in the table behind 'tmpstream2' ('len' entries, the
-- information from compressed.tmp), and write the relocated value back to the
-- same offset. Returns the old address so the caller can traverse further.
_fixUpPointers2 :: (Monad m) => SeekableStream m Word8 -> Word32 -> Word32 -> ReaderT(SeekableWriter m Word8) m Word32
_fixUpPointers2 tmpstream2 len addr = do
	seekWriter addr
	old <- local' stream readIntegral
	new <- lift(runReaderT(_fixUpPointers 0 len old) tmpstream2)
	-- Seek to the pointer's own offset again before overwriting it.
	seekWriter addr
	writeIntegral new
	return old

-- Recursive pointer fix-up over the record at 'addr'. 'tmpwriter' is a bit
-- field of records already handled; a record whose bit is set, or a zero
-- address, is skipped. Otherwise the record's bytes are marked, each of its
-- pointer fields is rewritten through '_fixUpPointers2', and the traversal
-- continues at the old (pre-fix-up) addresses those fields held.
_fixUpPointers3 tmpwriter tmpstream2 len addr = do
	visited <- lift$getBitAt' tmpwriter addr
	-- Account for possible zero addresses.
	when(not visited&&addr/=0)$do
		seekWriter addr
		header <- local' stream readHeader
		(fieldCount, byteCount) <- local' stream(nFieldsBytes header)
		firstField <- getWriterPosition
		-- Mark fields to prevent following addresses twice
		lift$mapM_(setBitAt tmpwriter) [addr..addr+fromIntegral byteCount-1]
		-- Adjust record's pointers; they are 4 bytes apart.
		let fieldOffsets = [firstField,firstField+4..firstField+4*fromIntegral fieldCount-4]
		children <- mapM(_fixUpPointers2 tmpstream2 len) fieldOffsets
		-- Recurse on the structure of the data.
		mapM_(_fixUpPointers3 tmpwriter tmpstream2 len) children

-- Relocate addresses held outside the file: each reference in
-- 'moreAddresses' is replaced by the relocated value of the address it
-- holds, looked up in the table behind 'tmpstream2' ('len' entries).
_fixUpPointers4 :: (MonadIO m) => SeekableStream m Word8 -> Word32 -> [IORef Word32] -> m()
_fixUpPointers4 tmpstream2 len moreAddresses =
	forM_ moreAddresses$ \addrRef -> do
		old <- liftIO$readIORef addrRef
		new <- runReaderT(_fixUpPointers 0 len old) tmpstream2
		liftIO$writeIORef addrRef new

-- Run an action and throw its result away.
void' :: (Monad m) => m a -> m ()
void' m = m>>= \_ -> return()
-- Fix up the pointers of every record found between offset 'addr' and the end
-- of the writer's file (the differential segment appended by
-- 'copyDifferentialRange'). Zero bytes between records are skipped; for each
-- record the header is read and each of its pointer fields is rewritten via
-- '_fixUpPointers2', using the table behind 'tmpstream2' ('len' entries).
-- NOTE(review): the 'tmpwriter' argument is not used by this function.
_fixUpPointers5 :: (Monad m) => SeekableWriter m Word8 -> SeekableStream m Word8 -> Word32 -> Word32->ReaderT(SeekableWriter m Word8) m ()
_fixUpPointers5 tmpwriter tmpstream2 len addr = do
	-- 'x' is the position at the end of the file.
	seekWriterAtEnd
	x <- getWriterPosition
	seekWriter addr
	-- Going to scan the differential segment and mop up its addresses.
	void'$untilEOF(do
		-- Scan over zeroes. The loop ends once a non-zero token has been read
		-- or consumeTokenW yields Nothing.
		whileM_
			(liftM(maybe False(==0)) consumeTokenW)
			(return())
		n <- getWriterPosition
		-- Nothing to do when the scan finished at the end of the file.
		unless(x==n)$do
			-- Step back one token so the header is read from its first byte.
			relSeekWriter(-1)
			-- From here on 'n' (shadowing the outer one) is the record's start.
			n <- getWriterPosition
			hdr <- local' stream readHeader
			-- 'm' is the position right after the header, where the pointer
			-- fields start.
			m <- getWriterPosition
			(nFields, nBytes) <- local' stream$nFieldsBytes hdr
			-- Rewrite each pointer field; they are 4 bytes apart.
			mapM_(_fixUpPointers2 tmpstream2 len) [m,m+4..m+4*fromIntegral nFields-4]
			-- Continue scanning right after this record.
			seekWriter$n+fromIntegral nBytes
		)

-- Fix phase. Computes the number of 12-byte entries in the table behind
-- 'tmpstream2', rewrites every pointer reachable from the root address stored
-- at offset 0 of 'writer' (using 'tmpwriter' as the visited bit field), then
-- relocates the externally held addresses in 'moreAddresses'. Returns the
-- number of table entries.
fixUpPointers writer tmpwriter tmpstream2 moreAddresses = do
	tableBytes <- runReaderT(seekAtEnd>>getPosition) tmpstream2
	let len = tableBytes`quot`12
	flip runReaderT writer$do
		seekWriter 0
		root <- local' stream readIntegral
		_fixUpPointers3 tmpwriter tmpstream2 len root
	_fixUpPointers4 tmpstream2 len moreAddresses
	return len

{-# INLINE copyDifferentialRange #-}
-- Append to 'writer' every token of 'oldstream' from offset 'newFileSize' up
-- to the end of 'oldstream'. Returns the position at the end of 'writer' as
-- it was before anything was appended, i.e. where the copied data starts.
copyDifferentialRange :: Pointer -> SeekableWriter IO Word8 -> SeekableStream IO Word8 -> IO Word32
copyDifferentialRange newFileSize writer oldstream = do
	-- Position the writer at its end; subsequent tokens are put from there.
	appendedAt <- runReaderT(seekWriterAtEnd>>getWriterPosition) writer
	flip runReaderT oldstream$do
		seek newFileSize
		untilEOF'
			(lift$_consumeToken oldstream>>=maybe(return()) (_putToken writer))
	return appendedAt

-- First section of a collection: runs mark, compress, fix and sweep on the
-- file at 'newpath'. 'moreAddresses' are extra addresses that are marked as
-- roots and relocated along with the file's own pointers.
--
-- Returns, in order:
--   * the visited bit field used during fix-up (writer, concluding action,
--     path of its temporary file);
--   * the result of 'compress' (table writer, concluding action, final
--     counter value);
--   * the number of entries in the compressed table;
--   * the writer on 'newpath' and its concluding action.
-- None of the writers is concluded here except the original mark bit field.
-- NOTE(review): if any step throws, the writers opened so far are not
-- concluded and the temporary files are left behind.
gcSection1 :: FilePath
	-> [IORef Pointer]
	-> IO((SeekableWriter IO Word8,IO(),FilePath),
		(SeekableWriter IO Word8,IO(),Pointer),
		Word32,
		SeekableWriter IO Word8,
		IO())
gcSection1 newpath moreAddresses = do
	-- Mark
	-- Opening in ReadWriteMode and closing at once leaves the file in place
	-- for makeIoWriter'.
	newhandle <- openBinaryFile newpath ReadWriteMode
	hClose newhandle
	(writer,conclude,ref) <- makeIoWriter' newpath undefined False
	(tmpwriter,tmpconclude,tmpref,tmppath) <- mark newpath(stream writer)
	-- Also mark everything reachable from the externally held addresses.
	-- (The lambda's 'ref' shadows the one bound above.)
	mapM_(\ref -> readIORef ref>>=(`runReaderT` stream writer)._mark tmpwriter) moreAddresses

	-- Compress
	tmp2@(tmpwriter2,tmpconclude2,_) <- compress tmpwriter
	-- The mark bit field has been consumed: conclude it and empty its file so
	-- the same path can serve as the visited bit field of the fix phase.
	tmpconclude
	bracket
		(openFile tmppath ReadWriteMode)
		hClose
		(`hSetFileSize` 0)

	-- Fix
	-- Identifier 'tmpconclude' is rebound here.
	(tmpwriter,tmpconclude,_) <- makeIoWriter' tmppath undefined False
	len <- fixUpPointers writer tmpwriter(stream tmpwriter2) moreAddresses

	-- Sweep
	sweep newpath writer ref(stream tmpwriter2)
	return$!((tmpwriter,tmpconclude,tmppath), tmp2, len, writer, conclude)

-- Second section of a collection (the reconcile phase). Takes the live file
-- 'path', the swept copy 'newpath' and the tuple produced by 'gcSection1'.
-- Appends to the copy whatever the live file contains from offset
-- 'newFileSize' onwards, fixes up the pointers of that appended data, carries
-- over and relocates the live file's root pointer, concludes all writers and
-- finally renames the copy over the live file.
-- NOTE(review): the concluding actions are not run if an earlier step throws.
gcSection2 :: FilePath
	-> FilePath
	-> ((SeekableWriter IO Word8,IO(),FilePath),
		(SeekableWriter IO Word8,IO(),Pointer),
		Word32,
		SeekableWriter IO Word8,
		IO())
	-> IO()
gcSection2 path newpath ((tmpwriter,tmpconclude,tmppath),(tmpwriter2,tmpconclude2,newFileSize),len,writer,conclude)
	= do
	-- Reconcile the differential segment.
	(oldwriter,oldconclude,_) <- makeIoWriter' path undefined True
	let oldstream = stream oldwriter
	-- 'n' is the offset in the copy at which the appended data starts.
	n <- copyDifferentialRange newFileSize writer oldstream
	-- NOTE(review): this prints to stdout on every collection; looks like
	-- leftover diagnostics -- confirm whether callers rely on it.
	putStrLn$"newFileSize: "++show newFileSize++"; n: "++show n
	runReaderT(_fixUpPointers5 tmpwriter(stream tmpwriter2) len n) writer
	-- Update root block pointer: copy the live file's current root address to
	-- offset 0 of the swept file, then relocate it like any other pointer.
	addr :: Word32 <- runReaderT(seek 0>>readIntegral) oldstream
	runReaderT(do
		seekWriter 0
		writeIntegral addr
		_fixUpPointers2(stream tmpwriter2) len 0) writer
	-- Close files using the thunks constructed by the respective calls to 'makeIoWriter''.
	oldconclude
	conclude
	tmpconclude
	tmpconclude2
	-- Atomically swap the newly swept file in place of the old one.
	renameFile newpath path

-- | Garbage collection for files. It is a good idea when using garbage collection, never
--   to be holding addresses into a file when releasing that file's lock, as at that point the
--   garbage collector may move the data referenced. There is also a pattern, where only
--   one thread runs the garbage collector; in that case, the collector thread may hold addresses
--   and have these updated when the garbage collector moves data.
--
--   The first argument is the path of the data file to collect. The second is a list of
--   addresses held by the caller; they are treated as additional roots and are updated in
--   place when the data they refer to moves.
--
--   Any exception raised during the collection is propagated to the caller. The shared
--   dual lock taken at the start is released on failure of the copy or of the
--   mark\/compress\/fix\/sweep cycle.
markAndSweepGc :: FilePath -> [IORef Pointer] -> IO()
markAndSweepGc path moreAddresses = do
	dir <- getAppUserDataDirectory "tmp"
	createDirectoryIfMissing True dir
	(newpath,newhandle) <- openBinaryTempFile dir "new"
	-- Only the name is needed; the file is overwritten by the copy below.
	hClose newhandle
	l <- dualLockShared path

	-- Copy file into a temporary, under the writer lock. A copy is made for robustness.
	-- Should taking the writer lock or the copy itself fail, the shared dual lock
	-- is released before the exception is rethrown, so that it does not stay held.
	withFileLock(path++".lock.writer") Exclusive(\_->copyFile path newpath)
		`onException` unlockShared l

	-- Carry out mark/compress/fix/sweep cycle on everything in the copy.
	-- Writers can write while this is going on (but can't change the root node address).
	tuple <- gcSection1 newpath moreAddresses
		`onException` unlockShared l

	-- NOTE(review): the state of the dual lock after a failing 'switchLocks'
	-- cannot be determined from this module, so no cleanup is attempted for it.
	l2 <- switchLocks l

	-- Carry out reconcile phase. This requires readers and writers to stop for a short time.
	withFileLock(path++".lock.writer") Exclusive$ \_->finally
		(gcSection2 path newpath tuple)
		(unlock l2)
-- This garbage collector includes a /reconcile phase/. The idea is to allow other processes
-- to continue writing data to a file while mark-and-sweep is in progress. It is called reconciling because
-- the contents of the live file have to be reconciled with the contents of the copy. This has to be done
-- without undue delay.
--
-- Reconciliation is accomplished by appending to the copy the data that was written to the live file
-- while the garbage collector was running. This relies on the fact that data is always appended to the end of the file.
-- Efficient operation is ensured by skipping the mark-and-sweep phase for this data, and using the
-- old, out-of-date liveness graph information to adjust the pointers of the new data.