{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeApplications #-}

module Parquet.Reader where

import Data.Foldable (traverse_)
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Control.Monad.Logger (MonadLogger, runNoLoggingT)
import Control.Monad.Logger.CallStack (logWarn)
import Data.Functor ((<$))
import Parquet.Stream.Reader
  (Value(..), readColumnChunk, ColumnValue(..), decodeConduit)
import Control.Lens
import qualified Data.Map as M
import Control.Arrow ((&&&))
import qualified Data.Text as T
import qualified Data.HashMap.Strict as HM
import Control.Monad.Except
import qualified Data.List.NonEmpty as NE

import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BS8
import qualified Conduit as C
import System.IO
  (IOMode(ReadMode), openFile, SeekMode(AbsoluteSeek, SeekFromEnd), hSeek)
import Network.HTTP.Types.Status (statusIsSuccessful)
import Network.HTTP.Simple
  (getResponseBody, getResponseStatus, httpSource, parseRequest, Header)
import Network.HTTP.Client (Request(requestHeaders))
import qualified Parquet.ThriftTypes as TT
import Parquet.Utils (failOnExcept)
import qualified Data.Binary.Get as BG
import Parquet.ParquetObject

-- | A re-openable byte source for a parquet file.
--
-- The wrapped function receives a position and returns a conduit that streams
-- the file's bytes starting from that position. The sources defined in this
-- module treat a positive position as an offset from the start of the file
-- and a negative position as an offset relative to the end of the file.
newtype ParquetSource m = ParquetSource (Integer -> C.ConduitT () BS.ByteString m ())

-- | Location of a remote parquet file, in the form accepted by 'parseRequest'.
type Url = String

-- | Reads the file-level metadata stored at the end of a parquet file.
--
-- The last 8 bytes of the source are fetched first; their leading 4 bytes are
-- decoded as a little-endian @Word32@ giving the size of the serialised
-- metadata. The source is then re-opened @8 + size@ bytes before the end of
-- the file and the metadata is decoded from that stream.
--
-- A failure to decode the size is reported through 'throwError', so callers
-- running this in 'ExceptT' observe it as a @Left@ value rather than as an
-- exception of the underlying monad.
readMetadata
  :: (MonadError T.Text m, MonadIO m) => ParquetSource m -> m TT.FileMetadata
readMetadata (ParquetSource source) = do
  footer <- C.runConduit (source (-8) C..| CB.take 8)
  case BG.runGetOrFail BG.getWord32le footer of
    Left err ->
      -- Use the declared error channel instead of 'fail': the signature
      -- offers 'MonadError' and does not guarantee 'MonadFail'.
      throwError $ "Could not fetch metadata size: " <> T.pack (show err)
    Right (_, _, metadataSize) ->
      fmap (snd . fst)
        $            C.runConduit
        $            source (-(8 + fromIntegral metadataSize))
        C..|         decodeConduit metadataSize
        `C.fuseBoth` pure ()

-- | A 'ParquetSource' backed by a file on the local file system.
--
-- Each time a position is requested the file is opened for reading and the
-- handle is moved before streaming starts:
--
-- * a non-negative position is an absolute offset from the start of the file
--   (so @0@ streams the whole file);
-- * a negative position is an offset relative to the end of the file.
localParquetFile :: C.MonadResource m => FilePath -> ParquetSource m
localParquetFile fp = ParquetSource $ \pos -> C.sourceIOHandle $ do
  h <- openFile fp ReadMode
  -- Zero must be treated as an absolute offset: seeking 0 bytes from the end
  -- would position the handle at end-of-file and yield an empty stream.
  let seekMode = if pos >= 0 then AbsoluteSeek else SeekFromEnd
  hSeek h seekMode pos
  pure h

-- | A 'ParquetSource' that fetches a parquet file over HTTP using ranged
-- requests.
--
-- A non-negative position @p@ is sent as @Range: bytes=p-@ (everything from
-- offset @p@ onwards); a negative position @-n@ is sent as @Range: bytes=-n@
-- (the final @n@ bytes). The stream fails when the response status is not a
-- successful one.
remoteParquetFile
  :: (C.MonadResource m, C.MonadThrow m, C.MonadIO m) => Url -> ParquetSource m
remoteParquetFile url = ParquetSource $ \pos -> do
  req <- parseRequest url
  let
    rangedReq = req { requestHeaders = mkRangeHeader pos : requestHeaders req }
  httpSource rangedReq call
 where
  -- Builds the @Range@ header for the requested position. Zero is handled as
  -- an offset from the start: @bytes=0@ without the trailing dash is not a
  -- valid byte range.
  mkRangeHeader :: Integer -> Header
  mkRangeHeader pos =
    let rangeVal = if pos >= 0 then show pos <> "-" else show pos
    in ("Range", "bytes=" <> BS8.pack rangeVal)

  -- Streams the body of a successful response and fails otherwise.
  call response =
    let status = getResponseStatus response
    in
      if statusIsSuccessful status
        then getResponseBody response
        else
          fail
          $  "Non-success response code from remoteParquetFile call: "
          ++ show status

-- | Reads a local parquet file and returns all of its records as a list.
--
-- The file metadata is read first; every row group listed in it is then
-- streamed in order and the resulting objects are accumulated in memory.
readWholeParquetFile
  :: ( C.MonadThrow m
     , MonadIO m
     , MonadError T.Text m
     , C.MonadResource m
     , MonadLogger m
     )
  => String
  -> m [ParquetObject]
readWholeParquetFile inputFp = do
  let parquetFile = localParquetFile inputFp
  metadata <- readMetadata parquetFile
  let
    rowGroups    = metadata ^. TT.pinchField @"row_groups"
    allRowGroups = traverse_ (sourceRowGroup parquetFile metadata) rowGroups
  C.runConduit (allRowGroups C..| CL.consume)

-- | One row of a row group: a value per column, each paired with the path
-- taken from the column chunk's @path_in_schema@ metadata.
type Record = [(ColumnValue, [T.Text])]

-- | Streams every record of a local parquet file, row group by row group.
--
-- The metadata is read up front; if that yields an error the conduit fails
-- with a message that includes it. Log messages produced while reading the
-- row groups are discarded.
sourceParquet :: FilePath -> C.ConduitT () ParquetObject (C.ResourceT IO) ()
sourceParquet fp = do
  metadataOrError <- runExceptT (readMetadata (localParquetFile fp))
  case metadataOrError of
    Left err -> fail $ "Could not read metadata: " <> show err
    Right metadata ->
      let rowGroups = metadata ^. TT.pinchField @"row_groups"
      in  C.transPipe runNoLoggingT
            (traverse_ (sourceRowGroup (localParquetFile fp) metadata) rowGroups)

-- | Streams the records of one row group of a parquet file served over HTTP.
--
-- This is 'sourceRowGroup' applied to the 'remoteParquetFile' source for the
-- given URL.
sourceRowGroupFromRemoteFile
  :: (C.MonadResource m, C.MonadIO m, C.MonadThrow m, MonadLogger m)
  => String
  -> TT.FileMetadata
  -> TT.RowGroup
  -> C.ConduitT () ParquetObject m ()
sourceRowGroupFromRemoteFile = sourceRowGroup . remoteParquetFile

-- | Streams one row group as a sequence of records, one 'ParquetObject' per
-- row.
--
-- Every column chunk of the row group is read as its own stream. The streams
-- are combined with 'C.sequenceSources', and the values found at the same
-- position in each column are merged into a single object.
--
-- For example, given the row group
--
-- > | col1 | col2 | col3 |
-- > |------|------|------|
-- > |  1   |  a   |  x   |
-- > |  2   |  b   |  y   |
-- > |  3   |  c   |  z   |
--
-- @sourceRowGroup@ yields the following values, in order:
--
-- > (1, a, x)
-- > (2, b, y)
-- > (3, c, z)
sourceRowGroup
  :: forall m
   . (C.MonadResource m, C.MonadIO m, C.MonadThrow m, MonadLogger m)
  => ParquetSource m
  -> TT.FileMetadata
  -> TT.RowGroup
  -> C.ConduitT () ParquetObject m ()
sourceRowGroup source metadata rg =
  C.sequenceSources
      (map column_with_path (rg ^. TT.pinchField @"column_chunks"))
    C..| CL.mapMaybeM parse_record
 where
  -- Streams the values of one column chunk, each paired with the column's
  -- schema path. When the chunk carries no metadata there is no path to pair
  -- with, so its values are dropped.
  column_with_path
    :: TT.ColumnChunk -> C.ConduitT () (ColumnValue, [T.Text]) m ()
  column_with_path cc =
    sourceColumnChunk source metadata cc
      C..| CL.mapMaybe (\value -> (,) value <$> mb_path cc)

  -- The column's @path_in_schema@, if the chunk has metadata attached.
  mb_path :: TT.ColumnChunk -> Maybe [T.Text]
  mb_path cc =
    fmap
      (TT.unField . TT._ColumnMetaData_path_in_schema)
      (cc ^. TT.pinchField @"meta_data")

  -- | Converts a parquet record (one value per column) into a single
  -- JSON-like object.
  --
  -- Each column is converted on its own and the results are combined through
  -- the 'Monoid' instance of @Maybe ParquetObject@, so columns that could not
  -- be converted are left out of the combined object.
  parse_record :: Record -> m (Maybe ParquetObject)
  parse_record record = mconcat <$> traverse parse_entry record

  -- Converts one column of a record, warning when the column has no path.
  parse_entry :: (ColumnValue, [T.Text]) -> m (Maybe ParquetObject)
  parse_entry (column, paths) = maybe
    (Nothing <$ logWarn missing_path_warning)
    (\ne_paths -> parse_column (column, ne_paths))
    (NE.nonEmpty paths)
   where
    missing_path_warning :: T.Text
    missing_path_warning =
      "parse_record: Record with value "
        <> T.pack (show (_cvValue column))
        <> " does not have any paths. Record data is corrupted."

  -- | Converts a single parquet column into a JSON-like object.
  --
  -- For a column such as
  --
  -- > { value = "something", definition_level = 3, path = ["field1", "field2", "field3"] }
  --
  -- the result is
  --
  -- > {
  -- >   "field1": {
  -- >     "field2": {
  -- >       "field3": "something"
  -- >     }
  -- >   }
  -- > }
  --
  -- One path element is consumed per step and the definition level is
  -- decremented alongside it. The value is only emitted when the last path
  -- element is reached with a definition level of exactly 1.
  parse_column :: (ColumnValue, NE.NonEmpty T.Text) -> m (Maybe ParquetObject)
  parse_column (ColumnValue r d md v, path NE.:| rest) = case rest of
    []
      | d == 1 -> pure $ Just $ MkParquetObject $ HM.singleton
        path
        (value_to_parquet_value v)
      | otherwise -> do
        -- We are at the last path element but the definition level is not 1.
        -- For example, starting from
        --
        --   { value = "something", definition_level = 5, path = ["field1", "field2", "field3"] }
        --
        -- we arrive at
        --
        --   { value = "something", definition_level = 3, path = ["field3"] }
        --
        -- Building {"field3": "something"} here would require a definition
        -- level of 1, hence the column is considered corrupted.
        logWarn
          "parse_column: No more paths exist but we still have definition levels. Column data is corrupted."
        pure Nothing
    p : px ->
      fmap nest <$> parse_column (ColumnValue r (d - 1) md v, p NE.:| px)
   where
    -- Wraps an already built object under the current path element.
    nest obj = MkParquetObject $ HM.singleton path (ParquetObject obj)

  -- Maps a decoded column value onto the corresponding 'ParquetValue'.
  value_to_parquet_value :: Value -> ParquetValue
  value_to_parquet_value = \case
    Null               -> ParquetNull
    ValueInt64 n       -> ParquetInt n
    ValueByteString bs -> ParquetString bs

-- | Streams the values of a single column chunk.
--
-- The source is opened at the chunk's @file_offset@ and its bytes are fed to
-- 'readColumnChunk', which also receives the file's schema elements indexed
-- by their @name@ field. The reader is run through 'failOnExcept'.
sourceColumnChunk
  :: (C.MonadIO m, C.MonadResource m, C.MonadThrow m, MonadLogger m)
  => ParquetSource m
  -> TT.FileMetadata
  -> TT.ColumnChunk
  -> C.ConduitT () ColumnValue m ()
sourceColumnChunk (ParquetSource source) metadata cc =
  source chunk_offset
    C..| C.transPipe failOnExcept (readColumnChunk schema_by_name cc)
 where
  -- Position in the file at which reading of this chunk starts.
  chunk_offset = fromIntegral (cc ^. TT.pinchField @"file_offset")

  -- Schema elements keyed by name.
  schema_by_name = M.fromList
    (map ((^. TT.pinchField @"name") &&& id) schema_elements)

  schema_elements = metadata ^. TT.pinchField @"schema"