{-- |
Module      : Database.TransferDB.DumpDB
Description : Database agnostic dump
Copyright   : (c) Mihai Giurgeanu, 2017
License     : GPL-3
Maintainer  : mihai.giurgeanu@gmail.com
Stability   : experimental
Portability : Portable
--}

{-# LANGUAGE FlexibleContexts, BangPatterns #-}
module Database.TransferDB.DumpDB where

import Prelude hiding (fail, log)

import System.IO (Handle, withBinaryFile, IOMode(ReadMode, WriteMode),
                  hSeek, SeekMode(AbsoluteSeek, SeekFromEnd), hFlush, hGetBuf, hPutBuf,
                  BufferMode(BlockBuffering), hSetBuffering, hSetBinaryMode)
import System.IO.Temp (withTempFile)

import System.Clock (Clock(Monotonic), TimeSpec(sec), getTime, diffTimeSpec, toNanoSecs)

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (TVar, newTVar, modifyTVar, readTVar, writeTVar,
                               TQueue, newTQueue, readTQueue, writeTQueue,
                               TMVar, newTMVar, newEmptyTMVar, takeTMVar, putTMVar,
                               STM, atomically, check, orElse, retry)
import Control.Monad(foldM, replicateM_, join)
import Control.Monad.Trans.Reader (ReaderT, runReaderT, asks, ask, withReaderT)
import Control.Monad.Trans.Maybe (MaybeT, runMaybeT)
import Control.Monad.Trans.StringError (runStringErrorT)

import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Fail (MonadFail, fail)
import Control.Monad.Trans.Control (MonadBaseControl)

import Control.Logging (log, debugS, withStderrLogging)

import Data.Time.Clock  (getCurrentTime)
import Data.String      (fromString)
import Data.Text (Text)
import Data.List (intercalate)

import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C

import qualified Data.Map.Strict as Map

import Foreign.Marshal.Alloc    (alloca, allocaBytes)
import Foreign.Ptr              (Ptr, castPtr)
import Foreign.Storable         (Storable(peek, poke))

import Database.TransferDB.DumpDB.Format
import Database.TransferDB.Commons (HasDBInfo(..), DBInfo(DBInfo), withConnection', connect', forAllTables, faillog, finally', finally)

import SQL.CLI (SQLHENV, SQLHDBC, SQLHSTMT, SQLINTEGER, SQLSMALLINT, SQLPOINTER, SQLHANDLE, SQLLEN, sql_handle_env, sql_handle_dbc, sql_handle_stmt, sql_no_nulls, sql_null_data)
import SQL.CLI.Utils (ColumnInfo(..), SQLConfig(..),
                      collectColumnsInfo, collectColumnsInfo', allocHandle, columns,
                      freeHandle, execDirect, forAllRecordsWithEndAndFail, forAllData, getData, disconnect)

import SQL.CLI.ODBC (odbcImplementation, setupEnv)
import SQL.ODBC     (sql_char,
                     sql_varchar,
                     sql_longvarchar,
                     sql_wchar,
                     sql_wvarchar,
                     sql_wlongvarchar,
                     sql_decimal,
                     sql_numeric,
                     sql_bit,
                     sql_tinyint,
                     sql_smallint,
                     sql_integer,
                     sql_bigint,
                     sql_real,
                     sql_float,
                     sql_double,
                     sql_binary,
                     sql_varbinary,
                     sql_longvarbinary,
                     sql_type_date,
                     sql_type_time,
                     sql_type_timestamp,
                     sql_interval_year,
                     sql_interval_month,
                     sql_interval_day,
                     sql_interval_hour,
                     sql_interval_minute,
                     sql_interval_second,
                     sql_interval_year_to_month,
                     sql_interval_day_to_hour,
                     sql_interval_day_to_minute,
                     sql_interval_day_to_second,
                     sql_interval_hour_to_minute,
                     sql_interval_hour_to_second,
                     sql_interval_minute_to_second,
                     sql_guid,
                     sql_c_char,
                     sql_c_wchar,
                     sql_c_bit,
                     sql_c_tinyint,
                     sql_c_short,
                     sql_c_long,
                     sql_c_sbigint,
                     sql_c_float,
                     sql_c_double,
                     sql_c_binary,
                     sql_c_type_date,
                     sql_c_type_time,
                     sql_c_type_timestamp,
                     sql_c_interval_year,
                     sql_c_interval_month,
                     sql_c_interval_day,
                     sql_c_interval_hour,
                     sql_c_interval_minute,
                     sql_c_interval_second,
                     sql_c_interval_year_to_month,
                     sql_c_interval_day_to_hour,
                     sql_c_interval_day_to_minute,
                     sql_c_interval_day_to_second,
                     sql_c_interval_hour_to_minute,
                     sql_c_interval_hour_to_second,
                     sql_c_interval_minute_to_second,
                     sql_c_guid,
                     odbcCTypeLen)


logsrc :: Text
logsrc = fromString "Database.TransferDB.DumpDB"

-- | the maximum size of a chunk of variable length value
maxChunkSize :: (Num a) => a
maxChunkSize = 2 * 1024 * 1024

-- | the buffer size used to copy files
bufSize :: Int
bufSize = fromIntegral maxChunkSize

-- | file buffering mode
fileBuffering :: BufferMode
fileBuffering = BlockBuffering (Just $ 10 * 1024 * 1024)

-- | keep statistics by statement handle; for each handle record the
-- number of records dumped so far and the total size dumped so far
type StatisticsMap = Map.Map C.ByteString (Int, Int)

-- | dump database options
data DumpConfig = DumpConfig {
  dump_DSN              :: String,        -- ^ ODBC data source name                 
  dump_UserName         :: String,        -- ^ user name                             
  dump_Password         :: String,        -- ^ password                              
  dump_Schema           :: String,        -- ^ schema to be dumped                   
  dump_Description      :: String,        -- ^ dump description supplied by the user 
  dump_FilePath         :: FilePath,      -- ^ the dump file name
  dump_ParallelThreads  :: Int,           -- ^ the number of threads to be run in parallel
  dump_StatisticsVar    :: TVar StatisticsMap, -- ^ the global statistics map
  dump_StartTime         :: TimeSpec       -- ^ the start time, used to compute the dump rate
  }


instance HasDBInfo DumpConfig where
  extractDBInfo cfg = DBInfo (dump_DSN cfg) (dump_UserName cfg) (dump_Password cfg) (dump_Schema cfg)

-- | restore database options
data RestoreConfig = RestoreConfig {
  restore_DSN              :: String,        -- ^ ODBC data source name                 
  restore_UserName         :: String,        -- ^ user name                             
  restore_Password         :: String,        -- ^ password                              
  restore_Schema           :: String,        -- ^ schema to be restored                   
  restore_FilePath         :: FilePath       -- ^ the dump file name                    
  }

instance HasDBInfo RestoreConfig where
  extractDBInfo cfg = DBInfo (restore_DSN cfg) (restore_UserName cfg) (restore_Password cfg) (restore_Schema cfg)

-- | dump a schema from an ODBC database to a binary file
dump :: (MonadIO m, MonadFail m, MonadBaseControl IO m) => ReaderT DumpConfig m ()
-- MonadBaseControl is required for logging


dump = withStderrLogging $ do
  filename <- asks dump_FilePath
  config <- ask
  liftIO $ withBinaryFile filename WriteMode (\handle -> do
                                           result <- runMaybeT $ runReaderT (hDump handle) config
                                           maybe (fail "Database dump failed") return result)

-- | dumps the database schema to the file represented by the given handle
hDump :: (MonadIO m, MonadFail m) => Handle -> ReaderT DumpConfig (MaybeT m) ()
hDump handle = do
  liftIO $ B.hPut handle $ writeVersion V1
  header <- makeHeader
  liftIO $ B.hPut handle $ writeHeader header
  dumpTables handle


-- | create the header of the dump file
makeHeader :: (MonadIO m) => ReaderT DumpConfig m HeaderV1
makeHeader = do
  timestamp   <- liftIO getCurrentTime
  description <- asks dump_Description
  return $ HeaderV1 maxChunkSize timestamp (C.pack description)

-- | global environment for dumping tables in parallel
data ThreadedDump = ThreadedDump {
  threads_TablesChan            :: TQueue String, -- ^ the channel to publish the name of tables
  threads_AllTablesPublishedVar :: TVar Bool,     -- ^ when this is true, no more tables will be published on the tables channel
  threads_WorkerThreadsVar      :: TVar Int,      -- ^ the number of worker threads that are running; worker threads are the threads that actually dump data
  threads_HandleVar             :: TMVar Handle,  -- ^ provides synchronized access to the dump file handle
  threads_Config                :: DumpConfig,    -- ^ the dump configuration
  threads_HEnv                  :: SQLHENV,       -- ^ shared environment allocated in the main thread; unixODBC has problem if the handle is allocated on another thread
  threads_AllocHandleChan       :: TQueue (SQLSMALLINT, SQLHANDLE, TMVar SQLHANDLE) -- ^ channel to call allocHandle on the main thread
  }

instance HasDBInfo ThreadedDump where
  extractDBInfo = extractDBInfo . threads_Config

-- | dumps all tables in a schema, on a given connection
dumpTables :: (MonadIO m, MonadFail m) => Handle -> ReaderT DumpConfig (MaybeT m) ()
dumpTables handle = do
  threads <- asks dump_ParallelThreads
  case threads of
    0 -> do liftIO $ log $ fromString "Dumping tables without using threads"
            _ <- withConnection' (\ _ hdbc -> withReaderT SingleThreaded $ forAllTables hdbc (0, 0) (dumpTable handle hdbc))
            return ()
    _ -> do liftIO $ log $ fromString $ "Dumping tables using " ++ (show threads) ++ " threads"
            tablesChan              <- liftIO $ atomically newTQueue
            allTablesPublished      <- liftIO $ atomically $ newTVar False
            workerThreads           <- liftIO $ atomically $ newTVar 0
            dumpFileHandle          <- liftIO $ atomically $ newTMVar handle
            henv                    <- setupEnv
            allocHandleChan         <- liftIO $ atomically $ newTQueue
            finally (liftIO $ log (fromString $ "free environment handle " ++ (show henv)) >> freeHandle sql_handle_env henv) $ do
              withReaderT (\ cfg -> ThreadedDump tablesChan allTablesPublished workerThreads dumpFileHandle cfg henv allocHandleChan) $ do
                startWorkerThreads
                publishTables
                waitForWorkToEnd
              
-- | publish the tables to the chanel read by the worker threads
publishTables :: (MonadIO m, MonadFail m) => ReaderT ThreadedDump m ()
publishTables =  do
  env    <- ask
  henv   <- asks threads_HEnv
  publishEndVar <- asks threads_AllTablesPublishedVar
  hdbc   <- connect' henv
  let freehdbc = liftIO $ freeHandle sql_handle_dbc hdbc
  liftIO $ do
    result <- runMaybeT $ runReaderT (finally freehdbc $ forAllTables hdbc 0 publishTable) env
    maybe (log $ fromString $ "publishTables failed") (\ n -> log $ fromString $ "all tables have been published: " ++ (show n)) result
    atomically $ writeTVar publishEndVar True
  return ()

-- | monadic action to publish the table name on the tables channel
publishTable :: (MonadIO m, MonadFail m) => Int -> String -> ReaderT ThreadedDump m Int
publishTable crt tableName = do
  liftIO $ log $ fromString $ "publish table: " ++ tableName
  tablesChan <- asks threads_TablesChan
  liftIO $ atomically $ writeTQueue tablesChan tableName
  return (crt + 1)
  

-- | start the worker threads; each worker thread will dump data in a temporary file,
-- then will append the contents of the temporary file to the contents of the dump file; the
-- append is done synchronized with the other worker threads, so only one worker thread will
-- append to the main dump file
startWorkerThreads :: (MonadIO m, MonadFail m) => ReaderT ThreadedDump m ()
startWorkerThreads = do
  count <- asks (dump_ParallelThreads . threads_Config)
  replicateM_ count startWorkerThread 


-- | start one worker thread
startWorkerThread :: (MonadIO m, MonadFail m) => ReaderT ThreadedDump m ()
startWorkerThread = do
  env    <- ask
  henv   <- asks threads_HEnv
  hdbc   <- connect' henv
  let freehdbc = disconnect hdbc >> (liftIO $ freeHandle sql_handle_dbc hdbc) 
  threadsCountVar <- asks threads_WorkerThreadsVar
  t <- liftIO $ atomically $ do crt <- readTVar threadsCountVar
                                writeTVar threadsCountVar (crt + 1)
                                return (crt + 1)
  liftIO $ log $ fromString $ "Starting thread " ++ (show t)
  _ <- liftIO $ forkIO $ withTempFile "." "transfer-db.dmp"
       (\ path handle -> do
           log $ fromString $ "started worker thread: " ++ path
           hSetBuffering handle fileBuffering
           hSetBinaryMode handle True
           result <- runMaybeT $ runReaderT (finally freehdbc $ dumpThread handle henv hdbc) env
           log $ fromString $ "worker thread ended " ++ (maybe "with failure: " (\ _ -> "with success: ") result) ++ path
           atomically $ modifyTVar threadsCountVar (subtract 1) 
       )
  return ()

-- | the worker thread; gets a table name from the tables channel and dumps the data in the
-- temporary file; in the end, appends the temporary file contents to the end of the dump file contents
dumpThread :: (MonadIO m, MonadFail m) => Handle -> SQLHENV -> SQLHDBC -> ReaderT ThreadedDump m ()
dumpThread htmpfile _ hdbc = do
  tablesChan            <- asks threads_TablesChan
  allTablesPublishedVar <- asks threads_AllTablesPublishedVar
  handleVar             <- asks threads_HandleVar
  env                   <- ask
  
  liftIO $ log $ fromString "entered dumpThread"
  let dumpNextTables  :: STM (IO ())
      dumpNextTables  = orElse dumpNextTables' waitTableOrEnd
      dumpNextTables' :: STM (IO ())
      dumpNextTables' = do
        tableName <- readTQueue tablesChan
        return $ do
          log $ fromString $ "start dumping table: " ++ tableName
          result <- runStringErrorT $ runReaderT (dumpTable htmpfile hdbc (0, 0) tableName) (MultiThreaded env)
          either (\s -> log $ fromString $ "dumping table " ++ tableName ++ " failed: " ++ s) (\ _ -> return ()) result
          join $ atomically $ dumpNextTables
      waitTableOrEnd :: STM (IO ())
      waitTableOrEnd = do
        allTablesPublished <- readTVar allTablesPublishedVar
        if allTablesPublished then return $ (log $ fromString $ "allTablesPublished = " ++ (show allTablesPublished)) >> finalizeDump else retry
      finalizeDump :: IO ()
      finalizeDump = do
        log $ fromString $ "finalizing dump thread"
        hdumpfile <- atomically $ takeTMVar handleVar
        copyTmpToDumpFile htmpfile hdumpfile
        atomically $ putTMVar handleVar hdumpfile
        

  liftIO $ join $ atomically dumpNextTables

-- | finalizes the dump by copying the temporary file back into
-- the dump file
copyTmpToDumpFile :: Handle -> Handle -> IO ()
copyTmpToDumpFile htmp hdmp = do
  hFlush htmp
  hSeek htmp AbsoluteSeek 0
  hSeek hdmp SeekFromEnd 0
  allocaBytes bufSize
    (\ buf -> let copyFile = do
                    sz <- hGetBuf htmp buf bufSize
                    if sz > 0
                      then do hPutBuf hdmp buf sz
                              if sz >= bufSize then copyFile else return ()
                      else return ()
              in copyFile )

-- | wait for worker threads to complete the work
waitForWorkToEnd :: (MonadIO m) => ReaderT ThreadedDump m ()
waitForWorkToEnd = do
  threadsCountVar <- asks threads_WorkerThreadsVar
  allocHandleChan <- asks threads_AllocHandleChan

  let waitIO = join $ atomically $ orElse (readTVar threadsCountVar >>= check . (<= 0) >> (return $ return ())) (allocHandleT allocHandleChan >>= \ io -> return (io >> waitIO))
  liftIO $ waitIO
  liftIO $ log $ fromString $ "all worker threads have finished"

-- | creates an IO action inside a STM monad to allocate a new handler in the current thread
allocHandleT :: (MonadIO m, MonadFail m) => TQueue (SQLSMALLINT, SQLHANDLE, TMVar SQLHANDLE) -> STM (m ())
allocHandleT chan = do
  (hType, hParent, retVar) <- readTQueue chan
  return $ allocHandle hType hParent >>= liftIO . atomically . (putTMVar retVar) 

-- | make a handle alloc request to the main thread and wait for result
allocHandleReq :: (MonadIO m, MonadFail m) => SQLSMALLINT -> SQLHANDLE -> ReaderT ThreadedDump m SQLHANDLE
allocHandleReq htype hparent = do
  allocHandleChan <- asks threads_AllocHandleChan
  resultVar       <- liftIO $ atomically $ newEmptyTMVar
  liftIO $ atomically $ writeTQueue allocHandleChan (htype, hparent, resultVar)
  liftIO $ atomically $ takeTMVar resultVar

-- | environment for either single threaded or multi threaded dump
data SingleOrMulti = SingleThreaded DumpConfig | MultiThreaded ThreadedDump
  
-- | extract dumpConfig from a 'SingleOrMulti' structure
dumpConfig :: SingleOrMulti -> DumpConfig
dumpConfig (SingleThreaded  x) = x
dumpConfig (MultiThreaded x)   = threads_Config x

instance HasDBInfo SingleOrMulti where
  extractDBInfo = extractDBInfo . dumpConfig

-- | either directly alloc handle or call 'allocHandleReq' to alloc a handle deppending
-- on it is run on a threaded or non threaded environment
allocHandleSM :: (MonadIO m, MonadFail m) => SQLSMALLINT -> SQLHANDLE -> ReaderT SingleOrMulti m SQLHANDLE
allocHandleSM htype hparent = do
  env <- ask
  case env of
    SingleThreaded _          -> allocHandle htype hparent
    MultiThreaded threadeEnv  -> withReaderT (const threadeEnv) $ allocHandleReq htype hparent

-- | workarround for unixODBC bug that requires that all handles should be allocated
-- on the main thread
collectColumnsInfoSM :: (MonadIO m, MonadFail m) => SQLHDBC      -- ^ connection handle
  -> String                                                      -- ^ schema name
  -> String                                                      -- ^ table name
  -> ReaderT SingleOrMulti m [ColumnInfo]
collectColumnsInfoSM hdbc schemaName tableName = do
  env <- ask
  case env of
    SingleThreaded _ -> withReaderT (const odbcImplementation) $ collectColumnsInfo hdbc schemaName tableName
    MultiThreaded  x -> withReaderT (const x) $ do
      hstmt <- allocHandleReq sql_handle_stmt hdbc
      columns hstmt Nothing (Just schemaName) (Just tableName) Nothing
      withReaderT (const odbcImplementation) $ collectColumnsInfo' hstmt

-- | dumps a single table
dumpTable :: (MonadIO m, MonadFail m) => Handle -> SQLHDBC -> (Int, Int) -> String -> ReaderT SingleOrMulti m (Int, Int)
dumpTable handle hdbc _ tableName = do
  schema <- extractSchema hdbc tableName
  liftIO $ debugS logsrc $ fromString $ "Schema " ++ (C.unpack $ schema_DBSchemaName schema) ++ "." ++ (C.unpack $ schema_TableName schema)
  liftIO $ sequence_ $ map debugFieldInfo $ schema_Fields schema 
  liftIO $ B.hPut handle $ writeSchema schema
  (!recs, !bytes) <- dumpTableData handle hdbc schema
  liftIO $ B.hPut handle writeEOT
  return (recs, bytes)
  
-- | logs the content of a 'FieldInfo' structure
debugFieldInfo :: FieldInfoV1 -> IO ()
debugFieldInfo f = do
  debugS logsrc $ fromString $ "\tfi_ColumnName:    " ++ (C.unpack $ fi_ColumnName f)
  debugS logsrc $ fromString $ "\tfi_DataType:      " ++ (show $ fi_DataType f)
  debugS logsrc $ fromString $ "\tfi_ColumnSize:    " ++ (maybe "(null)" show $ fi_ColumnSize f)
  debugS logsrc $ fromString $ "\tfi_BufferLength:  " ++ (maybe "(null)" show $ fi_BufferLength f)
  debugS logsrc $ fromString $ "\tfi_DecimalDigits: " ++ (maybe "(null)" show $ fi_DecimalDigits f)
  debugS logsrc $ fromString $ "\tfi_NumPrecRadix:  " ++ (maybe "(null)" show $ fi_NumPrecRadix f)
  debugS logsrc $ fromString $ "\tfi_Nullabe:       " ++ (show $ fi_Nullable f)
  
-- | extract schema infromation from the database, using an existing db connection
extractSchema :: (MonadIO m, MonadFail m) => SQLHDBC -> String -> ReaderT SingleOrMulti m SchemaV1
extractSchema hdbc tableName = do
  schemaName <- asks (dump_Schema.dumpConfig)
  fields     <- extractSchemaFields hdbc tableName
  return $ SchemaV1 (C.pack schemaName) (C.pack tableName) fields

-- | extract the fields information from the database
extractSchemaFields :: (MonadIO m, MonadFail m) => SQLHDBC -> String -> ReaderT SingleOrMulti m [FieldInfoV1]
extractSchemaFields hdbc tableName = do
  schemaName  <- asks (dump_Schema.dumpConfig)
  cols        <- collectColumnsInfoSM hdbc schemaName tableName
  return $ map makeFieldInfo cols

-- | transforms a 'ColumnInfo' structure into a 'FieldInfoV1' structure
makeFieldInfo :: ColumnInfo -> FieldInfoV1
makeFieldInfo ci = FieldInfoV1 { fi_ColumnName      = C.pack $ ci_ColumnName ci,
                                 fi_DataType        = ci_DataType ci,
                                 fi_ColumnSize      = ci_ColumnSize ci,
                                 fi_BufferLength    = ci_BufferLength ci,
                                 fi_DecimalDigits   = ci_DecimalDigits ci,
                                 fi_NumPrecRadix    = ci_NumPrecRadix ci,
                                 fi_Nullable        = ci_Nullable ci,
                                 fi_OrdinalPosition = ci_OrdinalPosition ci}

-- | dump the table records to the file, one by one; returns the (number of records,
-- size in bytes) of dumped data
dumpTableData :: (MonadIO m, MonadFail m) => Handle -> SQLHDBC -> SchemaV1 -> ReaderT SingleOrMulti m (Int, Int)
dumpTableData handle hdbc schema = do
  let tableName' = schema_QualifiedTableName schema
      tableName = C.unpack tableName'
  liftIO $ log $ fromString $ "dumping table: " ++ tableName
  select <- makeSelectSql schema
  hstmt  <- allocHandleSM sql_handle_stmt hdbc
  env    <- ask
  finally' ((log $ fromString $ "freeHandle for " ++ tableName) >> freeHandle sql_handle_stmt hstmt) $ do
    execDirect hstmt select (faillog $ "param data requested for select '" ++ select ++ "'")
    result <- liftIO $ allocaBytes (fromIntegral maxChunkSize)
              (\ p_transferBuf -> alloca
                (\ p_transferLenOrInd -> do
                    let dumpAction           = dumpRecord handle hstmt schema p_transferBuf maxChunkSize p_transferLenOrInd
                        endAction  hstmt x   = logstats tableName' x >> return x
                        failAction hstmt x s = logstats tableName' x >> fail s
                    runStringErrorT $ runReaderT (forAllRecordsWithEndAndFail hstmt dumpAction (endAction hstmt) (failAction hstmt)  (0,0)) env))
    (cnt, size) <- either (\ s -> faillog $ "transfer table " ++ (C.unpack $ schema_TableName schema) ++ " failed: " ++ s) return result
    liftIO $ log $ fromString $ "Finished dupming table " ++ (C.unpack $ schema_DBSchemaName schema)
      ++ "." ++ (C.unpack $ schema_TableName schema) ++ "; dumped " ++ (show cnt)
      ++ " records of " ++ (show size) ++ " bytes"
    size `seq` cnt `seq` return (cnt, size)

-- | dumps the data of one record into the file; it returns the (number of records, total bytes dumped)
dumpRecord :: (MonadIO m, MonadFail m) => Handle -> SQLHSTMT -> SchemaV1 -> SQLPOINTER -> SQLLEN -> Ptr SQLLEN -> (Int, Int) -> ReaderT SingleOrMulti m (Int, Int)
dumpRecord handle hstmt schema p_buf bufLen p_lenOrInd (cnt, sz) = do
  liftIO $ B.hPut handle writeRI
  -- TODO: if an error occurs writing any of the fields and one or more fields are not
  -- properly written in the file, the dump file will be corrupted
  let fields  = zip (map fromIntegral [1 .. length fields']) fields'
      fields' = schema_Fields schema
  sz' <- foldM (dumpField handle hstmt p_buf bufLen p_lenOrInd) sz fields
  let cnt' = cnt + 1
  if cnt' `mod` 100000 == 0
    then do let tableName = schema_QualifiedTableName schema
            logStatistics tableName cnt' sz'
    else return ()
  cnt' `seq` sz' `seq` return (cnt', sz')

-- | monadic action that logs the data dumped until now; it uses a map that records the count of records
-- and the size dumped for each statement handle
logStatistics :: (MonadIO m, MonadFail m) => C.ByteString -> Int -> Int -> ReaderT SingleOrMulti m ()
logStatistics logkey cnt sz = do
  sizesVar <- asks readSizesVar
  (totalCnt, totalSz) <- liftIO $ atomically $ do
    sizesMap <- readTVar sizesVar
    let updatedSizesMap = Map.insert logkey (cnt, sz) sizesMap
    writeTVar sizesVar updatedSizesMap
    return $ Map.foldr' (\ (c1, s1) (c2, s2) -> (c1 + c2, s1 + s2)) (0, 0) updatedSizesMap
  startTime <- asks readStartTime
  crtTime   <- liftIO $ getTime Monotonic
  let duration = diffTimeSpec crtTime startTime
      szRate   = (fromIntegral totalSz) * 1000000000 `div` (toNanoSecs duration)
  liftIO $ log $ fromString $ ">>>>>> Running for " ++ (show $ sec duration) ++ " seconds"
  liftIO $ log $ fromString $ ">>>>>> (" ++ (C.unpack logkey) ++ ") " ++ (show totalCnt) ++ " records / " ++ (show $ totalSz) ++ " bytes" ++ ", " ++ (show szRate) ++ " bytes/sec"

-- | uncurried form of ('lotStatistics' hstmt)
logstats :: (MonadIO m, MonadFail m) => C.ByteString -> (Int, Int) -> ReaderT SingleOrMulti m ()
logstats hstmt = uncurry $ logStatistics hstmt

-- | get the 'TVar' 'StatiscsMap' from the environment
readSizesVar :: SingleOrMulti -> TVar StatisticsMap
readSizesVar (SingleThreaded x) = dump_StatisticsVar x
readSizesVar (MultiThreaded  x) = (dump_StatisticsVar.threads_Config) x

-- | get the start time from the environment
readStartTime :: SingleOrMulti -> TimeSpec
readStartTime (SingleThreaded x) = dump_StartTime x
readStartTime (MultiThreaded  x) = (dump_StartTime.threads_Config) x

-- | dump data of a single field; adds the size of dumped data to the total size
-- received as parameter
dumpField :: (MonadIO m, MonadFail m) => Handle -> SQLHSTMT -> SQLPOINTER -> SQLLEN -> Ptr SQLLEN -> Int -> (SQLSMALLINT, FieldInfoV1) -> m Int
dumpField handle hstmt p_buf buflen p_lenOrInd sz (crt, fld) =
  runReaderT dumpField' $ DumpFieldSpec handle hstmt p_buf buflen p_lenOrInd sz crt fld

-- | dump field data parameters
data DumpFieldSpec = DumpFieldSpec {
  dmpfld_Handle   :: Handle,            -- ^ dump file handle
  dmpfld_HStmt    :: SQLHSTMT,          -- ^ table select statement
  dmpfld_Buf      :: SQLPOINTER,        -- ^ data transfer buffer
  dmpfld_BufLen   :: SQLLEN,            -- ^ the size of data transfer buffer
  dmpfld_LenOrInd :: Ptr SQLLEN,        -- ^ buffer to get null indicator or the actual size of transferred data
  dmpfld_Size     :: Int,               -- ^ the total data transferred prior to this field
  dmpfld_Crt      :: SQLSMALLINT,       -- ^ the number in the statement of the field to be dumped
  dmpfld_Field    :: FieldInfoV1        -- ^ the description of the field
  }

-- | ReaderT monadic action that dumps a field into the dump file; returns the total dump size,
-- adding to the prior size the size in bytes of dumped data
dumpField' :: (MonadIO m, MonadFail m) => ReaderT DumpFieldSpec m Int
dumpField' = do
  dataType <- asks $ fi_DataType . dmpfld_Field
  case dataType of
    _ | dataType == sql_char                      -> dumpVarLenField sql_c_char
      | dataType == sql_varchar                   -> dumpVarLenField sql_c_char
      | dataType == sql_longvarchar               -> dumpVarLenField sql_c_char
      | dataType == sql_wchar                     -> dumpVarLenField sql_c_wchar
      | dataType == sql_wvarchar                  -> dumpVarLenField sql_c_wchar
      | dataType == sql_wlongvarchar              -> dumpVarLenField sql_c_wchar
   
      | dataType == sql_decimal                   -> dumpVarLenField sql_c_char
      | dataType == sql_numeric                   -> dumpVarLenField sql_c_char

      | dataType == sql_bit                       -> dumpFixedField  sql_c_bit
      | dataType == sql_tinyint                   -> dumpFixedField  sql_c_tinyint
      | dataType == sql_smallint                  -> dumpFixedField  sql_c_short
      | dataType == sql_integer                   -> dumpFixedField  sql_c_long
      | dataType == sql_bigint                    -> dumpFixedField  sql_c_sbigint

      | dataType == sql_real                      -> dumpFixedField  sql_c_float
      | dataType == sql_float                     -> dumpFixedField  sql_c_double
      | dataType == sql_double                    -> dumpFixedField  sql_c_double

      | dataType == sql_binary                    -> dumpVarLenField sql_c_binary
      | dataType == sql_varbinary                 -> dumpVarLenField sql_c_binary
      | dataType == sql_longvarbinary             -> dumpVarLenField sql_c_binary
    
      | dataType == sql_type_date                 -> dumpFixedField  sql_c_type_date
      | dataType == sql_type_time                 -> dumpFixedField  sql_c_type_time
      | dataType == sql_type_timestamp            -> dumpFixedField  sql_c_type_timestamp

      | dataType == sql_interval_year             -> dumpFixedField  sql_c_interval_year
      | dataType == sql_interval_month            -> dumpFixedField  sql_c_interval_month
      | dataType == sql_interval_day              -> dumpFixedField  sql_c_interval_day
      | dataType == sql_interval_hour             -> dumpFixedField  sql_c_interval_hour
      | dataType == sql_interval_minute           -> dumpFixedField  sql_c_interval_minute
      | dataType == sql_interval_second           -> dumpFixedField  sql_c_interval_second
      | dataType == sql_interval_year_to_month    -> dumpFixedField  sql_c_interval_year_to_month
      | dataType == sql_interval_day_to_hour      -> dumpFixedField  sql_c_interval_day_to_hour
      | dataType == sql_interval_day_to_minute    -> dumpFixedField  sql_c_interval_day_to_minute
      | dataType == sql_interval_day_to_second    -> dumpFixedField  sql_c_interval_day_to_second
      | dataType == sql_interval_hour_to_minute   -> dumpFixedField  sql_c_interval_hour_to_minute
      | dataType == sql_interval_hour_to_second   -> dumpFixedField  sql_c_interval_hour_to_second
      | dataType == sql_interval_minute_to_second -> dumpFixedField  sql_c_interval_minute_to_second
      | dataType == sql_guid                      -> dumpFixedField  sql_c_guid
      | otherwise                                 -> dumpUnknownFieldType

-- | dumps a variable field length; the variable field length will be dumped in chunks
-- each chunk having two fields: a length of the chunk and the actual data of the chunk. The
-- length of the chunk will be encoded on one, two or four bytes, depending on the maximum
-- chunk length and the maximum length of the field taken from the table's schema.
dumpVarLenField :: (MonadIO m, MonadFail m) => SQLSMALLINT -> ReaderT DumpFieldSpec m Int
dumpVarLenField bufferType = do
  bufferSize <- asks dmpfld_BufLen
  hstmt      <- asks dmpfld_HStmt
  colnum     <- asks dmpfld_Crt
  p_buffer   <- asks dmpfld_Buf
  p_LenOrInd <- asks dmpfld_LenOrInd
  size       <- asks dmpfld_Size
  lenlen     <- octetLengthOfChunkSize
  
  (_, size') <- forAllData hstmt colnum bufferType p_buffer bufferSize p_LenOrInd (dumpChunk lenlen) (0, size)
  size' `seq` return size'

-- | the octet length of the chunk size; it is calculated based on the transfer buffer size
-- and on the maxumum size of the field.
octetLengthOfChunkSize :: (Num a, Monad m) => ReaderT DumpFieldSpec m a
octetLengthOfChunkSize = do
  bufferSize <- asks dmpfld_BufLen
  fieldSize  <- asks $ fi_BufferLength . dmpfld_Field
  let
    chunkSize = (fromIntegral bufferSize) - 1 -- for \0 character
    lenlen = maybe lenlen' (\sz -> if sz < chunkSize
                                   then if sz <= 256
                                        then 1
                                        else if sz <= 65536
                                             then 2
                                             else 4
                                   else lenlen') fieldSize
    lenlen' = if chunkSize <= 256
              then 1
              else if chunkSize <= 65536
                   then 2
                   else 4
  return lenlen

-- | dumps a chunk of data; increments the current size with the number of octets
-- dumped and returns this value
dumpChunk :: (MonadIO m, MonadFail m) => Int -> (Int, Int) -> ReaderT DumpFieldSpec m (Int, Int)
dumpChunk lenlen (chunkNo, size) = do
  nullable <- asks $ fi_Nullable.dmpfld_Field
  size' <- if chunkNo > 0 || nullable == sql_no_nulls 
           then dumpChunk' lenlen size
           else do p_lenOrInd    <- asks dmpfld_LenOrInd
                   lenOrInd      <- liftIO $ peek p_lenOrInd
                   if lenOrInd == sql_null_data
                     then dumpNullIndicator Null size
                     else do size'' <- dumpNullIndicator NotNull size
                             dumpChunk' lenlen size''
  size' `seq` return (chunkNo+1, size')

dumpNullIndicator :: (MonadIO m) => NullIndicator -> Int -> ReaderT DumpFieldSpec m Int
dumpNullIndicator indicator size = do
  let bs        = writeNullIndicator indicator
  handle        <- asks dmpfld_Handle
  liftIO $ B.hPut handle bs
  let size' = size + (B.length bs)
  size' `seq` return size'

-- | dumps a chunk of data for a non null field; increments the current size with the number of octets
-- dumped and returns this value
dumpChunk' :: (MonadIO m, MonadFail m) => Int -> Int -> ReaderT DumpFieldSpec m Int
dumpChunk' lenlen size = do
  columnName    <- asks $ fi_ColumnName.dmpfld_Field
  p_buf         <- asks dmpfld_Buf
  p_lenOrInd    <- asks dmpfld_LenOrInd
  handle        <- asks dmpfld_Handle
  lenOrInd      <- liftIO $ peek p_lenOrInd
  buffersize    <- asks dmpfld_BufLen
  bs            <- if lenOrInd > -1
                   then let datalen     = fromIntegral lenlen
                            buffersize' = (fromIntegral buffersize) - 1 -- reserve space for null terminator
                            chunklen    = if datalen > buffersize' then buffersize' else datalen
                        in return $ writeChunk (fromIntegral lenlen) chunklen (castPtr p_buf)
                   else fail $ "dumpChunk' received unexpected null field (" ++ (show lenOrInd) ++ "); column " ++ (C.unpack columnName) 
  liftIO $ B.hPut handle bs
  let size' = size + (B.length bs)
  size' `seq` return size'
  
-- | dumps a fixed length field; the buffer length of the field will be taken from the description
-- of the field in the table's schema; the dumped data will only contain the data of the field;
-- the first parameter represents the ODBC C data type of the data to be read from the database
dumpFixedField :: (MonadIO m, MonadFail m) => SQLSMALLINT -> ReaderT DumpFieldSpec m Int
dumpFixedField bufferType = do
  bufferSize <- asks dmpfld_BufLen
  hstmt      <- asks dmpfld_HStmt
  colnum     <- asks dmpfld_Crt
  p_buffer   <- asks dmpfld_Buf
  p_LenOrInd <- asks dmpfld_LenOrInd
  size       <- asks dmpfld_Size

  liftIO $ poke p_LenOrInd 0
  getData hstmt colnum bufferType p_buffer bufferSize p_LenOrInd

  lenOrInd   <- liftIO $ peek p_LenOrInd
  nullable   <- asks $ fi_Nullable.dmpfld_Field
  
  columnName <- asks $ fi_ColumnName.dmpfld_Field

  if nullable == sql_no_nulls
    then if lenOrInd == sql_null_data
         then fail $ "null value read from not nullable field " ++ (C.unpack columnName)
         else dumpFixedField' bufferType size lenOrInd
    else if lenOrInd == sql_null_data
         then dumpNullIndicator Null size
         else do size' <- dumpNullIndicator NotNull size
                 dumpFixedField' bufferType size' lenOrInd

-- | dumps the data of a not null field
dumpFixedField' :: (MonadIO m, MonadFail m) => SQLSMALLINT -> Int -> SQLLEN -> ReaderT DumpFieldSpec m Int
dumpFixedField' bufferType size lenOrInd = do
  let wellKnownSize = odbcCTypeLen bufferType
  columnName <- asks $ fi_ColumnName.dmpfld_Field
  handle     <- asks $ dmpfld_Handle
  p_buffer   <- asks $ dmpfld_Buf
  bs <- if maybe False (lenOrInd /=) wellKnownSize
        then fail $ "the actual length of the field (" ++ (show lenOrInd) ++ ") is different from the schema size of the field (" ++ (show wellKnownSize) ++ "): " ++ (C.unpack columnName)
        else return $ writePlainBuf (castPtr p_buffer) (fromIntegral lenOrInd)
  liftIO $ B.hPut handle bs
  let size' = size + (B.length bs)
  size' `seq` return size'


-- | dumps a field of an unknown type; the field will be converted to SQL_C_CHAR and dumped as
-- a string of characters
dumpUnknownFieldType :: (MonadIO m, MonadFail m) => ReaderT DumpFieldSpec m Int
dumpUnknownFieldType = fail "dumpUnknownFieldType not implemented"

-- | create the select for dumping table data
makeSelectSql :: (MonadIO m, MonadFail m) => SchemaV1 -> m String
makeSelectSql schema =
  let tableName  = C.unpack $ schema_TableName schema
      schemaName = C.unpack $ schema_DBSchemaName schema
      qualifiedTableName = case schemaName of
                             [] -> tableName
                             s  -> s ++ "." ++ tableName
      fieldsList = intercalate ", " $ map (C.unpack.fi_ColumnName) fields
      fields     = schema_Fields schema
  in
    return $ "select " ++ fieldsList ++ " from " ++ qualifiedTableName
    

-- | read the binary dump of a database and restores it in a destination ODBC data source
restore :: (MonadIO m, MonadFail m, MonadBaseControl IO m) => ReaderT RestoreConfig m () -- MonadBaseControl is required for logging
restore = withStderrLogging $ return ()