{-# LANGUAGE OverloadedStrings,RankNTypes #-}

module Database.Alteryx.StreamingYxdb
       (
        blocksToDecompressedBytes,
        blocksToRecords,
        sinkRecords,
        getMetadata,
        recordsToBlocks,
        sourceFileBlocks,
        sourceFileRecords
       )where

import Conduit
import Control.Applicative
import Control.Lens hiding (from, to)
import Control.Monad as M
import Control.Monad.Primitive as M
import Control.Monad.Trans.Resource
import qualified Control.Monad.Trans.State.Lazy as State
import qualified Control.Newtype as NT
import Data.Array.Unboxed as A
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put
import Data.ByteString as BS
import Data.ByteString.Builder as BSB
import Data.ByteString.Builder.Extra as BSB
import Data.ByteString.Lazy as BSL
import Data.Conduit
import Data.Conduit.Binary
import Data.Conduit.Combinators as CC
import Data.Conduit.Serialization.Binary
import Data.Monoid
import Data.Text as T
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Vector as V (toList)
import System.IO

import Database.Alteryx.Serialization
import Database.Alteryx.Types

-- | Read a slice of a file into a single strict 'BS.ByteString'.
--
-- The second argument is the starting byte offset and the third is the number
-- of bytes to read. The third argument is a length, not an end offset:
-- 'sourceBlock' passes @end - start@. Callers pass 'Nothing' for the offset to
-- read from the beginning (the header), and 'Nothing' for the count to read
-- through to the end of the file (the block index).
readRange :: (MonadResource m) => FilePath -> Maybe Int -> Maybe Int -> m BS.ByteString
readRange filepath startOffset byteCount =
  sourceFileRange filepath (fmap fromIntegral startOffset) (fmap fromIntegral byteCount)
    $$ fold

-- | Read the three metadata sections of a YXDB file:
--
--   * the fixed-size header page at the start of the file,
--   * the 'RecordInfo' section that immediately follows it (its length comes
--     from 'numMetadataBytesHeader'),
--   * the 'BlockIndex', which starts at the header's 'recordBlockIndexPos'
--     and runs to end of file.
--
-- Each section is decoded with 'decode'; there is no explicit error handling
-- for malformed input here.
getMetadata :: FilePath -> IO YxdbMetadata
getMetadata filepath = runResourceT $ do
  headerBytes <- readRange filepath Nothing (Just headerPageSize)
  let header = decodeStrict headerBytes :: Header

  recordInfoBytes <- readRange filepath (Just headerPageSize) (Just $ numMetadataBytesHeader header)
  blockIndexBytes <- readRange filepath (Just . fromIntegral $ header ^. recordBlockIndexPos) Nothing

  return YxdbMetadata
    { _metadataHeader     = header
    , _metadataRecordInfo = decodeStrict recordInfoBytes :: RecordInfo
    , _metadataBlockIndex = decodeStrict blockIndexBytes :: BlockIndex
    }
  where
    -- 'decode' works on lazy bytestrings; 'readRange' yields strict ones.
    decodeStrict :: Binary a => BS.ByteString -> a
    decodeStrict = decode . BSL.fromStrict

-- | Byte range of one record block as (start offset, end offset).
-- The end is exclusive: 'sourceBlock' reads @end - start@ bytes beginning at
-- @start@.
type BlockRange = (Int, Int)
-- | One 'BlockRange' per record block, in block-index order.
type BlockRanges = [BlockRange]

-- | Stream every record of a YXDB file.
--
-- The file's metadata is read up front with 'getMetadata'. The block layout
-- drives 'sourceFileBlocks', and the 'RecordInfo' schema drives
-- 'blocksToRecords'.
sourceFileRecords :: (MonadResource m, MonadIO m) => FilePath -> Source m Record
sourceFileRecords filename =
  liftIO (getMetadata filename) >>= \metadata ->
    sourceFileBlocks filename metadata $= blocksToRecords (metadata ^. metadataRecordInfo)

-- | Compute the byte range of every record block.
--
-- Block start offsets come from the block index. Each block ends where the
-- next one starts, and the last block ends at the header's
-- 'recordBlockIndexPos' (the block index itself follows the blocks).
blockRanges :: YxdbMetadata -> BlockRanges
blockRanges metadata = Prelude.zip starts ends
  where
    starts = Prelude.map fromIntegral . A.elems . NT.unpack $ metadata ^. metadataBlockIndex
    -- Shift the starts left by one and close the list with the index position.
    -- When there are no blocks, 'zip' with an empty 'starts' yields [].
    ends   = Prelude.drop 1 starts
               ++ [fromIntegral $ metadata ^. metadataHeader . recordBlockIndexPos]

-- | Read one block's bytes from the file and yield it, decoded with the
-- 'Block' 'Binary' instance.
--
-- Decoding is lazy: the yielded value is a thunk, so a parse failure surfaces
-- only when a downstream stage forces the block. The 'label' puts the block's
-- offset and size into any 'Get' error message.
sourceBlock :: (MonadResource m) => FilePath -> BlockRange -> Source m Block
sourceBlock filepath (blockStart, blockEnd) =
  readRange filepath (Just blockStart) (Just byteCount) >>= yield . parseBlock
  where
    byteCount = blockEnd - blockStart

    parseBlock :: BS.ByteString -> Block
    parseBlock raw =
      runGet (label ("yieldBlock: " ++ show (blockStart, byteCount)) get) (BSL.fromStrict raw)

-- | Yield the blocks for each range, one after another, in list order.
sourceBlocks :: (MonadResource m) => FilePath -> BlockRanges -> Source m Block
sourceBlocks filepath = Prelude.mapM_ (sourceBlock filepath)

-- | Yield every record block of the file described by the given metadata.
sourceFileBlocks :: (MonadResource m) => FilePath -> YxdbMetadata -> Source m Block
sourceFileBlocks filepath metadata = sourceBlocks filepath (blockRanges metadata)

-- | Turn a stream of blocks into records.
--
-- The blocks are flattened into one byte stream, which is then parsed
-- repeatedly with 'getRecord' for the given schema.
blocksToRecords :: (MonadThrow m) => RecordInfo -> Conduit Block m Record
blocksToRecords recordInfo = blocksToDecompressedBytes =$= recordParser
  where
    recordParser = conduitGet (getRecord recordInfo)

-- | Unwrap each 'Block' (via its 'NT.Newtype' instance) to its lazy payload and
-- emit that payload's strict chunks in order.
--
-- NOTE(review): the name implies the payload is already decompressed;
-- presumably the 'Block' 'Binary' instance handles (de)compression. Confirm.
blocksToDecompressedBytes :: (MonadThrow m) => Conduit Block m BS.ByteString
blocksToDecompressedBytes = CC.concatMap chunksOfBlock
  where
    chunksOfBlock :: Block -> [BS.ByteString]
    chunksOfBlock = BSL.toChunks . NT.unpack

-- | A conduit from @a@ to @b@ whose base monad is a (lazy) 'State.StateT'
-- carrying 'StreamingCSVStatistics' over @m@. Writer stages use this state to
-- accumulate the record count, metadata length and block lengths needed to
-- fill in the header and block index afterwards.
type StatefulConduit a m b = Conduit a (State.StateT StreamingCSVStatistics m) b

-- | Group incoming records into blocks of at most 'recordsPerBlock' records.
--
-- Each record is serialized with 'putRecord', and the serialized records are
-- concatenated into one 'Block', which is yielded. The statistics' record
-- count is increased by the number of records placed in each block. The
-- conduit stops when upstream is exhausted; no empty trailing block is
-- emitted.
--
-- At most one block's worth of records is held in memory at a time.
recordsToBlocks :: (MonadThrow m) => RecordInfo -> StatefulConduit Record m Block
recordsToBlocks recordInfo = do
  -- Bound the read with 'CC.take'. A bare 'CC.sinkList' would drain the whole
  -- upstream here, so the recursive call below would see no input and every
  -- record past the first block would be silently lost.
  records <- CC.take recordsPerBlock =$= CC.sinkList
  unless (Prelude.null records) $ do
    let numRecords = Prelude.length records
    lift $ State.modify (& statisticsNumRecords %~ (+numRecords))
    let buildOneRecord :: Record -> BSB.Builder
        buildOneRecord record =
            let recordBSL = runPut $ putRecord recordInfo record
            in lazyByteStringThreshold miniblockThreshold recordBSL
    yield $ Block $ toLazyByteString $ mconcat $ Prelude.map buildOneRecord records
    recordsToBlocks recordInfo


-- | Emit the byte layout of a YXDB file body, in order:
--
--   1. a header page of 'headerPageSize' zero bytes (placeholder; the real
--      header depends on statistics not known until all blocks are written),
--   2. the serialized 'RecordInfo' metadata,
--   3. each incoming block, serialized with its 'Binary' instance.
--
-- Along the way it records the metadata length, and it prepends each block's
-- serialized length to 'statisticsBlockLengths', so that list ends up in
-- reverse order.
blocksToYxdbBytes :: (MonadThrow m) => RecordInfo -> StatefulConduit Block m BS.ByteString
blocksToYxdbBytes recordInfo = do
  yield $ BS.replicate headerPageSize 0

  let metadataBS = BSL.toStrict $ runPut (put recordInfo)
  lift . State.modify $ statisticsMetadataLength .~ BS.length metadataBS
  yield metadataBS

  awaitForever $ \block -> do
    let encoded = BSL.toStrict $ runPut $ put block
    lift . State.modify $ statisticsBlockLengths %~ (BS.length encoded :)
    yield encoded

-- | Build the file 'Header' from the statistics gathered while streaming.
--
-- The creation date is the current time from 'getCurrentTime'. The record
-- count, metadata length and block-index position are derived from the
-- statistics; the remaining fields are fixed constants.
--
-- NOTE(review): '_metaInfoLength' is the metadata byte length halved, which
-- presumably counts 2-byte units of the serialized 'RecordInfo'. Confirm
-- against the format definition.
computeHeaderFromStatistics :: StreamingCSVStatistics -> IO Header
computeHeaderFromStatistics statistics = buildHeader <$> getCurrentTime
  where
    buildHeader now = Header
      { _description         = "Alteryx database file generated by Michael Burge's 'yxdb-utils'"
      , _fileId              = dbFileId WrigleyDb
      , _creationDate        = now
      , _flags1              = 0
      , _flags2              = 0
      , _metaInfoLength      = fromIntegral $ (statistics ^. statisticsMetadataLength) `div` 2
      , _mystery             = 0
      , _spatialIndexPos     = 0
      , _recordBlockIndexPos = fromIntegral $ computeBlockIndexPositionFromStatistics statistics
      , _numRecords          = fromIntegral $ statistics ^. statisticsNumRecords
      , _compressionVersion  = 1
      , _reservedSpace       = BS.empty
      }

-- | Absolute file offset of the first record block. Blocks come right after
-- the header page and the metadata section.
computeStartOfBlocksFromStatistics :: StreamingCSVStatistics -> Int
computeStartOfBlocksFromStatistics statistics = metadataLength + headerPageSize
  where
    metadataLength = statistics ^. statisticsMetadataLength

-- | Absolute file offset of the block index, which follows the last record
-- block: start of blocks plus the total length of every block written.
computeBlockIndexPositionFromStatistics :: StreamingCSVStatistics -> Integer
computeBlockIndexPositionFromStatistics statistics =
    toInteger $ computeStartOfBlocksFromStatistics statistics
              + Prelude.sum (statistics ^. statisticsBlockLengths)

-- | Build the block index: the absolute start offset of every record block,
-- indexed from 0.
--
-- 'statisticsBlockLengths' is accumulated newest-first, so it is reversed
-- before computing running offsets. With no blocks the index is the empty
-- array with bounds (0, -1).
computeBlockIndexFromStatistics :: StreamingCSVStatistics -> BlockIndex
computeBlockIndexFromStatistics statistics =
    BlockIndex $ listArray (0, numBlocks - 1) $ Prelude.map fromIntegral offsets
  where
    lengths   = Prelude.reverse $ statistics ^. statisticsBlockLengths
    numBlocks = Prelude.length lengths
    -- Running sums seeded at the first block's offset. Taking numBlocks of them
    -- drops the trailing sum, which is the end of the last block.
    offsets   = Prelude.take numBlocks $
                Prelude.scanl (+) (computeStartOfBlocksFromStatistics statistics) lengths

-- | Serialize a value with its 'Binary' instance into a strict 'BS.ByteString'.
toBS :: Binary a => a -> BS.ByteString
toBS value = BSL.toStrict (runPut (put value))

-- | Write the incoming byte stream to the handle, then patch in the parts that
-- depend on the final statistics:
--
--   * seek to offset 0 and overwrite the placeholder header page with the
--     real 'Header',
--   * seek to end of file and append the 'BlockIndex'.
--
-- NOTE(review): assumes the handle is seekable, that the stream was written
-- starting at offset 0, and that the serialized 'Header' is exactly
-- 'headerPageSize' bytes (otherwise it would overwrite metadata). Confirm.
sinkYxdbBytes :: (MonadThrow m, MonadIO m) => Handle -> Sink BS.ByteString (State.StateT StreamingCSVStatistics m) ()
sinkYxdbBytes handle = do
  CC.sinkHandle handle
  statistics <- lift State.get
  header <- liftIO (computeHeaderFromStatistics statistics)
  liftIO $ do
    hSeek handle AbsoluteSeek 0
    BS.hPut handle (toBS header)
    hSeek handle SeekFromEnd 0
    BS.hPut handle (toBS (computeBlockIndexFromStatistics statistics))

-- | Write a complete YXDB file to the handle from a stream of records.
--
-- Records are grouped into blocks, laid out after a placeholder header and the
-- metadata, and finally patched with the real header and block index. The
-- statistics state starts at 'defaultStatistics' and is discarded at the end.
sinkRecords :: (MonadThrow m, MonadIO m) => Handle -> RecordInfo -> Sink Record m ()
sinkRecords handle recordInfo = evalStateLC defaultStatistics pipeline
  where
    pipeline =
      recordsToBlocks recordInfo =$=
      blocksToYxdbBytes recordInfo =$
      sinkYxdbBytes handle