{-
Copyright 2010-2012 Cognimeta Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under the License.
-}

{-# LANGUAGE TemplateHaskell, TypeFamilies, Rank2Types, GADTs, TupleSections, DeriveDataTypeable, GeneralizedNewtypeDeriving, ScopedTypeVariables, FlexibleContexts #-}

module Database.Perdure.LocalStoreFile (
    RawStoreFile(..),
    storeFileWriteWords,
    storeFileReadWords,
    LocalStoreFile,
    withFileStoreFile,
    withRawDeviceStoreFile,
    withRawDeviceStoreFiles,
    module Database.Perdure.StoreFile,
    narrowBufsLen,
    storeFileWrite1,
    storeFileRead1
) where


import Prelude ()
import Cgm.Prelude
import Data.Typeable
import qualified Data.ByteString as B
import Control.Concurrent
import Data.Word
import qualified Data.Map as Map
import Cgm.Data.Super
import Cgm.Data.Len
import Cgm.Data.Monoid
import Cgm.Data.NEList
import Cgm.Data.Either
import Cgm.System.Endian
import Cgm.Control.Concurrent.TThread
import Cgm.Control.Concurrent.Await
import Cgm.System.Mem.Alloc
import Database.Perdure.Validator
import System.IO
import System.Posix.Files
import System.Posix.IO
import System.Posix.Types
import Data.Bits
import Control.Monad.Error hiding (sequence_)
import Database.Perdure.StoreFile(SyncableStoreFile(..))
--import System.Posix.Fsync -- not needed with raw devices

-- | A store file offering raw, byte-addressed reads and writes whose outcome is delivered
-- asynchronously: each operation takes a continuation receiving a 'Bool'.
-- In the 'LocalStoreFile' instance that 'Bool' is True on success and False on failure.
class SyncableStoreFile f => RawStoreFile f where
  -- | Requests that the given byte ranges be written starting at the given byte address.
  storeFileWriteRaw :: f -> Len Word8 Word64 -> [ArrayRange (PrimArray Pinned Word8)] -> Async Bool ()
  -- | Requests that the given mutable byte range be filled with the bytes found at the given byte address.
  storeFileReadRaw :: f -> Len Word8 Word64 -> ArrayRange (STPrimArray RealWorld Pinned Word8) -> Async Bool ()

-- | Writes arrays of words at the given byte address.
-- The passed endianness is the one wanted for the words on the media. When it differs from the
-- platform endianness the words are byte-swapped into a copy before being written; that path is
-- not very optimized since we anticipate always writing in the platform endianness.
storeFileWriteWords :: (Endian w, RawStoreFile f) => f -> Len Word8 Word64 -> Endianness -> [PrimArray Pinned w] -> Async Bool ()
storeFileWriteWords f addr e bufs = storeFileWriteRaw f addr (fullArrayRange <$> byteBufs) where
  -- The word arrays reinterpreted as byte arrays, swapped first when the media endianness is foreign.
  byteBufs =
    if e == platformWordEndianness
    then unsafePrimArrayCast <$> bufs
    else unsafePrimArrayCast . mapImmArray swapBytes <$> bufs

-- | Reads the requested number of words from the given byte address.
-- The passed endianness must be the endianness of the words on the media.
-- The continuation receives Nothing when the raw read reports failure, otherwise the words read.
storeFileReadWords :: (LgMultiple w Word8, Endian w, RawStoreFile f) => 
                      f -> Len Word8 Word64 -> Endianness -> Len w Word32 -> Async (Maybe (PrimArray Pinned w)) ()
storeFileReadWords f addr e l k = do
    buf <- stToIO $ mkArray $ refineLen $ apply super <$> l
    let finish ok = if ok then Just <$> freezeWords buf else return Nothing
    mapAsync finish (storeFileReadRaw f addr $ fullArrayRange buf) k
  where
    -- TODO optimize by swapping bytes in-place then freezing
    freezeWords buf = stToIO (decode <$> unsafeFreezeSTPrimArray buf)
    -- Reinterprets the bytes as words, un-swapping them when the media endianness is foreign.
    decode =
      if e == platformWordEndianness
      then unsafePrimArrayCast
      else mapImmArray unswapBytes . unsafePrimArrayCast
  
---------------------------------------------------------------------------

-- | A file or raw device where we can persist bytes.
-- It wraps the 'MVar' through which 'queue' hands operations over to 'process'.
newtype LocalStoreFile = LocalStoreFile (MVar DevOp) 
-- TODO: review to see if this queue is acceptable
-- | A byte offset, or a number of bytes, within a store file.
type ByteAddr = Len Word8 Word64

-- | A read or write operation together with the byte address at which it applies.
data PosOp = PosOp ByteAddr RWOp deriving Show
-- | Operations that carry no address. Both cause the pending operations to be performed and the
-- file to be flushed; 'Sync' additionally carries an action that is run afterwards.
data NonPosOp = Sync (IO ()) | FullBarrier

-- | Only the constructor name is shown; the action held by 'Sync' is not.
instance Show NonPosOp where
  show op = case op of
    Sync _ -> "Sync"
    FullBarrier -> "FullBarrier"
  
-- | Anything that can be queued for the file: a positioned read/write (Left) or a sync/barrier (Right).
type DevOp = Either PosOp NonPosOp

-- | The payload of a positioned operation. Each constructor carries the buffer(s) involved and a
-- continuation which 'perform' calls with True when the operation succeeded and False otherwise.
data RWOp = 
  WriteOp [ArrayRange (PrimArray Pinned Word8)] (Bool -> IO ()) |
  ReadOp (ArrayRange (STPrimArray RealWorld Pinned Word8)) (Bool -> IO ())

-- | Shows the constructor name followed by the number of bytes involved; the buffers' contents and
-- the continuation are not shown.
instance Show RWOp where  
  -- The label and the length must be concatenated explicitly: juxtaposing them would apply the
  -- string literal as a function.
  show (WriteOp as _) = "WriteOp " ++ show (sum $ fmap arrayLen as)
  show (ReadOp a _) = "ReadOp " ++ show (arrayLen a)

-- TODO investigate why, if we do not at least force the ByteAddr in PosOp, testStatesDag ends with an STM error.
-- Forcing the rest is not necessary for this bug but makes sense here: we do not want the file thread to do work
-- that should have been done by the client threads. NFData is tricky to use here so we put together this
-- custom method.
-- | Evaluates the parts of an operation that the file thread will need: the address and the
-- buffer lengths of a positioned operation, or just the constructor of a non-positioned one.
forceDevOp :: DevOp -> ()
forceDevOp (Left (PosOp b r)) = b `seq` forceRW r where
  forceRW (WriteOp as _) = sum (fmap arrayLen as) `seq` ()
  forceRW (ReadOp a _) = arrayLen a `seq` ()
forceDevOp (Right (Sync _)) = ()
forceDevOp (Right FullBarrier) = ()

-- | Raw reads and writes are not performed by the caller: they are wrapped as positioned
-- operations, together with their continuation, and queued.
instance RawStoreFile LocalStoreFile where
  storeFileWriteRaw f seek bufs k = queue f (Left (PosOp seek (WriteOp bufs k)))
  storeFileReadRaw f seek buf k = queue f (Left (PosOp seek (ReadOp buf k)))

-- | Syncs and barriers are queued like any other operation, as non-positioned operations.
instance SyncableStoreFile LocalStoreFile where
  storeFileSync f k = queue f (Right (Sync k))
  storeFileFullBarrier f = queue f (Right FullBarrier)

-- | Hands an operation over to the file's 'MVar', after forcing it (see 'forceDevOp') in the
-- calling thread. Blocks while the 'MVar' is still full.
queue :: LocalStoreFile -> DevOp -> IO () 
queue (LocalStoreFile c) op = do
  evaluate (forceDevOp op)
  putMVar c op

-- TODO rework sync mechanism. It was fine when we assumed that all completed write tasks completed successfully, but if some are
-- to remote files that time out, we want to know when all writes have completed successfully.


-- | Total length, in words, of the given arrays, expressed as a 'Word32'.
-- Calls 'error' when the total cannot be expressed as a 'Word32'.
narrowBufsLen :: Endian w => [PrimArray Pinned w] -> Len w Word32
narrowBufsLen bufs = toWord32 <$> totalLen where
  totalLen = sum (arrayLen <$> bufs)
  toWord32 = fromMaybe (error "Array length cannot be expressed as a Word32") . unapply super

-- | Writes arrays of words at an address expressed in 'Word64' units.
-- A reported failure is turned into a call to 'error'.
storeFileWrite1 :: (RawStoreFile f, Endian w) => f -> Len Word64 Word64 -> Endianness -> [PrimArray Pinned w] -> IO ()
storeFileWrite1 f addr e bufs = storeFileWriteWords f (refineLen addr) e bufs onDone where
  -- TODO pass a meaningful k
  onDone ok = if ok then return () else error "storeFileWrite failed"

-- | Reads the given number of words at an address expressed in 'Word64' units and runs the
-- validator on them. The continuation receives Nothing when either the read or the validation fails.
storeFileRead1 :: (RawStoreFile f, Validator v, ValidatedElem v ~ w, Endian w, LgMultiple w Word8) => 
                   f -> Len Word64 Word64 -> Len w Word32 -> Endianness -> v -> Async (Maybe (ArrayRange (PrimArray Pinned w))) ()
storeFileRead1 f addr size e v k = mapAsync checked (storeFileReadWords f (refineLen addr) e size) k where
  checked readResult = return (readResult >>= validate v . fullArrayRange)

-- After inspection of GHC.IO.FD, it seems that hFlush only flushes to the OS
-- Call chain:  
-- hFlush calls flushWriteBuffer (See http://www.haskell.org/ghc//docs/6.12.2/html/libraries/base-4.2.0.1/src/GHC-IO-Handle.html)
-- which calls writeBuf (See http://darcs.haskell.org/packages/base/GHC/IO/FD.hs)
-- which calls RawIO's write (See http://www.haskell.org/ghc/docs/6.12.3/html/libraries/base-4.2.0.2/src/GHC-IO-BufferedIO.html#flushWriteBuffer0)

-- Some links on the subject
-- http://www.scribd.com/doc/19537350/A-Wander-Through-GHCs-New-IO-Library  
-- http://lwn.net/Articles/270891/
-- http://ldn.linuxfoundation.org/article/filesystems-data-preservation-fsync-and-benchmarks-pt-1
-- http://www.linux-archive.org/device-mapper-development/422441-block-fs-replace-hardbarrier-flush-fua-take-2-a.html
-- FSync FFI is available here: http://hackage.haskell.org/packages/archive/cautious-file/0.1.5/doc/html/src/System-Posix-Fsync.html
  

--

-- | A collection of pending positioned operations which decides the order in which they are performed.
class Show a => Schedule a where
  -- | The schedule used when starting, and again after the pending operations have all been performed.
  emptySchedule :: a
  -- | Adds a pending operation.
  insertOp :: PosOp -> a -> a
  -- | Picks the operation to perform next and returns it with the remaining schedule.
  -- 'process' and 'performAll' treat Nothing as "no pending operation".
  nextOp :: a -> Maybe (PosOp, a)
  
-- Multimap through a Map of NEList
-- | Removes a single value stored under the smallest key. When other values remain under that key
-- they are put back, so the key only disappears once its list has been exhausted.
minViewWithKeyNE :: Ord k => Map.Map k (NEList v) -> Maybe ((k, v), Map.Map k (NEList v))
minViewWithKeyNE m = popOne <$> Map.minViewWithKey m where
  popOne ((k, ne), rest) = onNEList (\x more -> ((k, x), maybe rest (\ne' -> Map.insert k ne' rest) more)) ne

-- | Scheduler state: the address of the operation most recently handed out, followed by the pending
-- operations keyed by address and split in two maps. 'insertOp' files addresses below the current
-- one in the first ("low") map and all others in the second ("high") map.
data CLook = CLook ByteAddr (Map.Map ByteAddr (NEList PosOp)) (Map.Map ByteAddr (NEList PosOp)) deriving Show

-- | Applies a function to the low map only.
updateLow :: (Map.Map ByteAddr (NEList PosOp) -> Map.Map ByteAddr (NEList PosOp)) -> CLook -> CLook
updateLow f (CLook cur lo hi) = CLook cur (f lo) hi

-- | Applies a function to the high map only.
updateHigh :: (Map.Map ByteAddr (NEList PosOp) -> Map.Map ByteAddr (NEList PosOp)) -> CLook -> CLook
updateHigh f (CLook cur lo hi) = CLook cur lo (f hi)
-- | Circular sweep over addresses: operations at or above the current address (high map) are served
-- in ascending order; once none remain the sweep restarts from the lowest pending address.
-- Invariant maintained by both methods: every key of the low map is below the current address and
-- every key of the high map is at or above it.
instance Schedule CLook where
  emptySchedule = CLook 0 Map.empty Map.empty
  -- Files the operation in the high map when its address is at or above the current one, in the
  -- low map otherwise. Operations sharing an address are kept together in one NEList.
  insertOp op@(PosOp seek _) s@(CLook c _ _) = side (Map.insertWith neAppend seek (neSingleton op)) s where
    side = if seek >= c then updateHigh else updateLow
  nextOp (CLook _ low high) = useHighMin `firstJust` useLowMin where
    -- Continue the current sweep: smallest address at or above the current one.
    useHighMin = (\((pos, op), high') -> (op, CLook pos low high')) <$> minViewWithKeyNE high
    -- Wrap around. This alternative is only selected when the high map is empty. Everything left in
    -- low' is at or above the new current address pos, so it becomes the new high map. Leaving it
    -- in the low map would break the invariant: operations inserted later at those addresses would
    -- go to the high map and be served first, sending the sweep backwards and letting a newer
    -- operation overtake an older one at the same address.
    useLowMin = (\((pos, op), low') -> (op, CLook pos Map.empty low')) <$> minViewWithKeyNE low

--
    
-- | A local file-like object on which 'perform' executes positioned operations synchronously.
-- Reads and writes report failure through 'ErrorT' 'String'.
class Show f => RawFile f where
  -- | Writes the given byte ranges starting at the given byte address.
  fileWriteRaw :: f -> Len Word8 Word64 -> [ArrayRange (PrimArray Pinned Word8)] -> ErrorT String IO ()
  -- | Fills the given mutable byte range with the bytes found at the given byte address.
  fileReadRaw :: f -> Len Word8 Word64 -> ArrayRange (STPrimArray RealWorld Pinned Word8) -> ErrorT String IO ()
  -- | Called by 'process' for every 'Sync' and 'FullBarrier', after the pending operations have been performed.
  fileFlush :: f -> IO ()
  
-- | Regular files accessed through a 'Handle'. Both operations first position the handle with
-- 'hSeekX', which grows the file when the address lies at or beyond its end.
instance RawFile Handle where
  -- TODO fix error handling
  fileWriteRaw h seek bufs = lift $ do
    hSeekX h seek
    sequence_ (withArrayPtr (hPutBufLen h) <$> bufs)
  -- Fails (with an empty message) when the length obtained differs from the buffer's length.
  fileReadRaw h seek buf = ErrorT $ do
    hSeekX h seek
    got <- withArrayPtr (hGetBufLen h) buf
    return $ boolEither "" () (got == arrayLen buf)
  -- todo fsync or fdatasync on FD so we flush to the drive (and the platter?, see the hFlush notes in this file)
  fileFlush = hFlush

-- | Performs every operation of the schedule, in the order given by 'nextOp', until none is left.
performAll :: (Schedule s, RawFile f) => f -> s -> IO ()
performAll f s = case nextOp s of
  Nothing -> return ()
  Just (op, s') -> perform f op >> performAll f s'

-- | Executes one positioned operation on the file, then calls the operation's continuation with
-- True when the operation succeeded and with False when it failed (the error message is dropped).
perform :: RawFile f => f -> PosOp -> IO ()
perform f (PosOp seek rw) = case rw of
    WriteOp bufs k -> runErrorT (fileWriteRaw f seek bufs) >>= report k
    ReadOp buf k -> runErrorT (fileReadRaw f seek buf) >>= report k
  where
    -- Maps the Either produced by runErrorT onto the continuation's success flag.
    report k = either (const $ k False) (const $ k True)

-- | Endless service loop for one file.
-- With nothing scheduled it blocks until an operation is queued. Otherwise it first checks, without
-- blocking, whether an operation has been queued: if so that one is taken in, if not the next
-- scheduled operation is performed.
process :: (Schedule s, RawFile f) => MVar DevOp -> f -> s -> IO ()
process c f s = case nextOp s of
    Nothing -> takeMVar c >>= incoming
    Just (op, s') -> do
      arrived <- tryTakeMVar c
      case arrived of
        Nothing -> perform f op >> process c f s'
        Just devOp -> incoming devOp
  where
    -- Takes a queued operation into account, then loops with the resulting schedule.
    incoming :: DevOp -> IO ()
    incoming devOp = absorb devOp >>= process c f
    -- A positioned operation just joins the schedule s (nothing has been removed from s yet).
    absorb (Left posOp) = return (insertOp posOp s)
    -- A sync or barrier first drains everything scheduled so far, then flushes the file.
    absorb (Right nonPosOp) = do
      performAll f s
      case nonPosOp of
        Sync k -> fileFlush f >> k
        FullBarrier -> fileFlush f
      return emptySchedule
  
-- | Seeks to the given position, expanding the file as necessary: when the position is at or
-- beyond the end of the file, zeros are appended until the file reaches that position.
hSeekX :: Handle -> ByteAddr -> IO ()
hSeekX h seek = do
    fileLen <- fmap fromIntegral (hFileSize h)
    when (seek >= fileLen) $ do
      hSeek h AbsoluteSeek (fromIntegral fileLen)
      hWriteZeros h (seek - fileLen)
    hSeek h AbsoluteSeek (fromIntegral seek)

-- | Writes the given number of zero bytes at the handle's current position, in batches of at
-- most 16 MiB so that no huge buffer is ever allocated.
hWriteZeros :: Handle -> ByteAddr -> IO ()
hWriteZeros h numZeros = when (numZeros > 0) $
    B.hPut h (B.replicate (fromIntegral batch) 0) >> hWriteZeros h (numZeros - batch)
  where
    batch = min numZeros (16*1024*1024)

-- | Wraps a 'SomeException' in a distinct type that is itself an 'Exception'.
-- NOTE(review): presumably meant to report the failure of a child thread; no use is visible in this file — confirm.
newtype ChildException = ChildException SomeException deriving (Typeable, Show)
instance Exception ChildException

-- | Opens the specified file as a 'LocalStoreFile', runs the provided function and closes the file.
-- The file is opened in binary read-write mode with buffering turned off.
-- Do not make concurrent calls on the same file, place concurrency in the passed function.
withFileStoreFile :: FilePath -> (LocalStoreFile -> IO a) -> ErrorT String IO a
withFileStoreFile path user = lift $ withBinaryFile path ReadWriteMode serve where
  serve h = do
    hSetBuffering h NoBuffering
    withRawFile h user

-- | Opens the specified raw device as a 'LocalStoreFile', runs the provided function and closes the device.
-- Fails with "Not a raw device" when the path does not designate a character device.
-- The device is used with blocks of 2^9 bytes.
-- Do not make concurrent calls on the same device, place concurrency in the passed function.
withRawDeviceStoreFile :: FilePath -> (LocalStoreFile -> IO a) -> ErrorT String IO a
withRawDeviceStoreFile path user = ErrorT $ bracket acquire closeFd run where
  acquire = openFd path ReadWrite Nothing $ defaultFileFlags {exclusive = True, append = True}
  run fd = runErrorT $ do
    fs <- lift $ getFdStatus fd
    if isCharacterDevice fs
      then lift $ withRawFile (RawDevice fd fs 9) user
      else throwError "Not a raw device"

-- | Like nesting multiple calls to 'withRawDeviceStoreFile'.
-- The devices are opened in the order of the given paths and the passed function receives the
-- store files in that same order. Opening stops at the first failure, which is reported through 'ErrorT'.
withRawDeviceStoreFiles :: [FilePath] -> ([LocalStoreFile] -> IO a) -> ErrorT String IO a
withRawDeviceStoreFiles ps user = foldr openNext (lift . user) ps [] where
  -- Opens device p, then continues with u, passing the files opened so far plus this one.
  -- The files must be joined with an explicit (++); juxtaposing them would apply the list as a function.
  -- The continuation is run down to IO inside the bracket, and its Either is re-raised afterwards.
  openNext p u fs = withRawDeviceStoreFile p (\f -> runErrorT $ u $ fs ++ [f]) >>= ErrorT . pure

-- | Converts a byte length to the offset type used by the POSIX calls.
toFileOffset :: Integral n => Len Word8 n -> FileOffset
toFileOffset len = fromIntegral (getLen len)

-- | Converts a byte length to the count type used by the POSIX calls.
toByteCount :: Integral n => Len Word8 n -> ByteCount
toByteCount len = fromIntegral (getLen len)

-- | Positions the descriptor at the given absolute byte address; the resulting offset is discarded.
fdSeekLen :: Fd -> ByteAddr -> IO ()
fdSeekLen fd seek = fdSeek fd AbsoluteSeek (toFileOffset seek) >> return ()

-- TODO: consider adding support for a 'STPrimArray RealWorld Pinned Block', and a matching address type, that would enforce the requirements below.
-- However we would have to cast/view it as an array of Word8 later on.
-- | Fills the array range from the descriptor, starting at the given address.
-- Array's size and start must be aligned on the block size, and the ByteAddr too.
-- Fails (with an empty message) when the number of bytes read differs from the range's length.
fdReadArray :: Fd -> ByteAddr -> ArrayRange (STPrimArray RealWorld Pinned Word8) -> ErrorT String IO ()
fdReadArray fd start a = ErrorT $ do
  fdSeekLen fd start
  got <- withArrayPtr (\ptr len -> fdReadBuf fd ptr (toByteCount len)) a
  return $ boolEither "" () (toByteCount (arrayLen a) == got)

-- | Writes the array range to the descriptor, starting at the given address.
-- Fails (with an empty message) when the number of bytes written differs from the range's length.
fdWriteArray :: Fd -> ByteAddr -> ArrayRange (STPrimArray RealWorld Pinned Word8) -> ErrorT String IO ()
fdWriteArray fd start a = ErrorT $ do
  fdSeekLen fd start
  put <- withArrayPtr (\ptr len -> fdWriteBuf fd ptr (toByteCount len)) a
  return $ boolEither "" () (toByteCount (arrayLen a) == put)

-- A bit of info on raw devices that i did not find easily elsewhere: http://www.win.tue.nl/~aeb/linux/lk/lk-11.html#ss11.4

-- | An open raw device: its file descriptor, the file status obtained for that descriptor, and
-- the base-2 logarithm of the block size in bytes.
data RawDevice = RawDevice Fd FileStatus Int
-- | The device's block size in bytes, that is 2 raised to the stored logarithm.
rawDeviceBlockBytes :: RawDevice -> Len Word8 Word
rawDeviceBlockBytes (RawDevice _ _ lgBytes) = unsafeLen (shiftL 1 lgBytes)
-- | A raw device is shown as the special device ID found in its file status.
instance Show RawDevice where
  show (RawDevice _ status _) = show (specialDeviceID status)
-- TODO merge consecutive writes to improve performance (avoids many needless reads to preserve data that will be overwritten)
-- | Raw devices are accessed through a temporary buffer obtained from 'withBlockArray', which
-- covers the whole blocks surrounding the requested byte range. Writes are therefore
-- read-modify-write cycles, and reads fetch whole blocks before copying out the requested part.
instance RawFile RawDevice where                              
  fileWriteRaw r@(RawDevice fd _ _) start bufs =
    -- Total number of bytes to write, widened to the address type.
    let len = up $ sum $ arrayLen <$> bufs in
    -- tStart is the rounded-down start address; t is the full range of the temporary buffer.
    withBlockArray r start len $ ((. fullArrayRange) .) $ \tStart t -> 
    do
      let bb = rawDeviceBlockBytes r
      let tLen = arrayLen t
      let tEnd = tStart + up tLen
      -- The buffer starts before the data: fetch the first block so the bytes preceding the data are preserved.
      -- (assumes headArrayRange bb t is the first bb bytes of t — TODO confirm)
      when (tStart < start) $ fdReadArray fd tStart $ headArrayRange bb t
      -- The buffer ends after the data: fetch the last block so the bytes following the data are preserved.
      -- (assumes skipArrayRange n t is t without its first n bytes — TODO confirm)
      when (start + len < tEnd) $ fdReadArray fd (tEnd - up bb) $ skipArrayRange (tLen - bb)  t
      -- Position in the buffer that corresponds to the requested start address.
      let dest = skipArrayRange (fromJust $ unapply super $ start - tStart) t
      -- Copy the source buffers one after the other, advancing the destination by each buffer's length.
      _ <- lift $ stToIO $ foldlM (\d b -> skipArrayRange (arrayLen b) d <$ mapMArrayCopyImm return b d) dest bufs
      -- Write all the covered blocks back in a single call.
      fdWriteArray fd tStart t
  fileReadRaw r@(RawDevice fd _ _) start buf =
    withBlockArray r start (up $ arrayLen buf) $ ((. fullArrayRange) .) $ \tStart t -> 
     do 
       -- liftIO $ putStrLn $ "Before fdReadArray " ++ show start
       -- Read all the covered blocks, then copy out the part starting at the requested address.
       fdReadArray fd tStart t
       let rangeToCopy = skipArrayRange (fromJust $ unapply super $ start - tStart) t
       -- NOTE(review): presumably copies as many bytes as buf holds — confirm against mapMArrayCopy.
       lift $ stToIO (mapMArrayCopy return rangeToCopy buf)
  -- Nothing is done on flush for raw devices.
  fileFlush _ = return ()
  
-- | Takes a start address and a length, and passes to the given function the start address rounded
-- down to a block boundary together with a freshly allocated aligned buffer. The buffer extends
-- from that rounded start up to the end of the requested range rounded up to a block boundary.
withBlockArray :: MonadIO m => RawDevice -> ByteAddr -> ByteAddr -> (ByteAddr -> STPrimArray RealWorld Pinned Word8 -> m a) -> m a
withBlockArray r@(RawDevice _ _ lgBlockBytes) seek len f = do
    buf <- liftIO $ stToIO $ newAlignedPinnedWord8Array blockBytes bufLen
    f (unsafeLen start) buf
  where
    blockBytes = rawDeviceBlockBytes r
    -- Clears the low bits, i.e. rounds down to a multiple of the block size.
    alignDown x = (x `shiftR` lgBlockBytes) `shiftL` lgBlockBytes
    start = alignDown (getLen seek)
    -- Adding (block size - 1) before rounding down rounds the end of the range up.
    end = alignDown (getLen seek + getLen len + up (getLen blockBytes) - 1)
    bufLen = unsafeLen $ fromJust $ unapply super $ end - start
  
-- | Runs the user function on a 'LocalStoreFile' backed by the given raw file. The operations it
-- queues are served by 'process', started alongside it through 'runWithDeamon' with an empty
-- 'CLook' schedule.
withRawFile :: (RawFile f, Show f) => f -> (LocalStoreFile -> IO a) -> IO a
withRawFile f user = do
  chan <- newEmptyMVar
  let label = show f
      client = ("User of " ++ label, user $ LocalStoreFile chan)
      server = ("Server for " ++ label, bracket_ (return ()) (return ()) $ process chan f (emptySchedule :: CLook))
  runWithDeamon client server