{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeApplications #-}
module Parquet.Reader where
import Data.Foldable (traverse_)
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Control.Monad.Logger (MonadLogger, runNoLoggingT)
import Control.Monad.Logger.CallStack (logWarn)
import Data.Functor ((<$))
import Parquet.Stream.Reader
(Value(..), readColumnChunk, ColumnValue(..), decodeConduit)
import Control.Lens
import qualified Data.Map as M
import Control.Arrow ((&&&))
import qualified Data.Text as T
import qualified Data.HashMap.Strict as HM
import Control.Monad.Except
import qualified Data.List.NonEmpty as NE
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BS8
import qualified Conduit as C
import System.IO
(IOMode(ReadMode), openFile, SeekMode(AbsoluteSeek, SeekFromEnd), hSeek)
import Network.HTTP.Types.Status (statusIsSuccessful)
import Network.HTTP.Simple
(getResponseBody, getResponseStatus, httpSource, parseRequest, Header)
import Network.HTTP.Client (Request(requestHeaders))
import qualified Parquet.ThriftTypes as TT
import Parquet.Utils (failOnExcept)
import qualified Data.Binary.Get as BG
import Parquet.ParquetObject
-- | A random-access byte source for a Parquet file.
--
-- The wrapped function takes a starting byte position and streams the
-- file's bytes from that position. A non-negative position is an absolute
-- offset; a negative position is an offset from the end of the file
-- (e.g. @-8@ streams the final 8 bytes).
newtype ParquetSource m = ParquetSource (Integer -> C.ConduitT () BS.ByteString m ())

-- | Location of a remote Parquet file, as passed to 'parseRequest'.
type Url = String
-- | Read and decode the Thrift-encoded footer ('TT.FileMetadata') of a
-- Parquet file.
--
-- A Parquet file ends with the encoded metadata, then its length as a
-- 4-byte little-endian integer, then the 4 magic bytes @PAR1@. This first
-- streams the trailing 8 bytes to learn the metadata length and validate the
-- magic. It then re-streams from @8 + length@ bytes before the end and
-- decodes the metadata from there.
--
-- Every failure is reported through 'throwError'. Using 'fail' instead
-- would escape an 'ExceptT' stack as an exception, so a caller using
-- 'runExceptT' (as 'sourceParquet' does) would never observe a 'Left'.
readMetadata
  :: (MonadError T.Text m, MonadIO m) => ParquetSource m -> m TT.FileMetadata
readMetadata (ParquetSource source) = do
  footer <- C.runConduit (source (-8) C..| CB.take 8)
  case BG.runGetOrFail getFooter footer of
    Left err ->
      throwError $ "Could not fetch metadata size: " <> T.pack (show err)
    Right (_, _, (_, magic)) | magic /= "PAR1" ->
      throwError
        $ "Invalid Parquet footer: expected magic bytes PAR1, got "
        <> T.pack (show magic)
    Right (_, _, (metadataSize, _)) ->
      fmap (snd . fst)
        $ C.runConduit
        $ source (-(8 + fromIntegral metadataSize))
        C..| decodeConduit metadataSize
        `C.fuseBoth` pure ()
  where
    -- Trailing 8 bytes: little-endian metadata length, then the magic.
    getFooter = (,) <$> BG.getWord32le <*> BG.getByteString 4
-- | A 'ParquetSource' backed by a file on the local filesystem.
--
-- The file is opened anew for every stream. A non-negative position is an
-- absolute byte offset from the start of the file. A negative position is
-- relative to the end of the file (e.g. @-8@ streams the last 8 bytes).
localParquetFile :: C.MonadResource m => FilePath -> ParquetSource m
localParquetFile fp = ParquetSource $ \pos -> C.sourceIOHandle $ do
  h <- openFile fp ReadMode
  -- Offset 0 must mean "start of file". Seeking 0 from the end would
  -- produce an empty stream.
  if pos >= 0 then hSeek h AbsoluteSeek pos else hSeek h SeekFromEnd pos
  pure h
-- | A 'ParquetSource' that fetches byte ranges of a remote file over HTTP.
--
-- Every stream issues a fresh request for @url@ with an HTTP @Range@ header:
--
-- * a non-negative position @p@ requests @bytes=p-@ (from @p@ to the end);
-- * a negative position @-n@ requests @bytes=-n@ (the last @n@ bytes).
--
-- A response whose status is not 2xx aborts the stream by throwing an
-- 'IOError' built with 'userError'.
remoteParquetFile
  :: (C.MonadResource m, C.MonadThrow m, C.MonadIO m) => Url -> ParquetSource m
remoteParquetFile url = ParquetSource $ \pos -> do
  req <- parseRequest url
  let rangedReq =
        req { requestHeaders = mkRangeHeader pos : requestHeaders req }
  httpSource rangedReq streamBody
  where
    mkRangeHeader :: Integer -> Header
    mkRangeHeader pos =
      -- 'show' of a negative number already carries the leading '-', which
      -- is exactly the suffix-range syntax. Position 0 must become "0-"
      -- (a bare "bytes=0" is not a valid range).
      let rangeVal = if pos >= 0 then show pos <> "-" else show pos
      in ("Range", "bytes=" <> BS8.pack rangeVal)
    -- Stream the body of a successful response; fail on any other status.
    -- 'C.throwM' relies only on the declared 'MonadThrow' constraint,
    -- unlike 'fail', which would need an undeclared 'MonadFail'.
    streamBody res =
      let status = getResponseStatus res
      in if statusIsSuccessful status
           then getResponseBody res
           else
             C.throwM
               $ userError
               $ "Non-success response code from remoteParquetFile call: "
               ++ show status
-- | Read an entire local Parquet file into memory.
--
-- The footer metadata is read first. Every row group listed in it is then
-- streamed in order, and all resulting 'ParquetObject's are collected into
-- a single list.
readWholeParquetFile
  :: ( C.MonadThrow m
     , MonadIO m
     , MonadError T.Text m
     , C.MonadResource m
     , MonadLogger m
     )
  => String
  -> m [ParquetObject]
readWholeParquetFile inputFp = do
  let parquetFile = localParquetFile inputFp
  metadata <- readMetadata parquetFile
  let rowGroups = metadata ^. TT.pinchField @"row_groups"
  C.runConduit
    $ traverse_ (sourceRowGroup parquetFile metadata) rowGroups
    C..| CL.consume
-- | One row as assembled by 'sourceRowGroup'. Each element holds the next
-- 'ColumnValue' of one column, paired with that column's
-- @path_in_schema@ segments.
type Record = [(ColumnValue, [T.Text])]
-- | Stream every record of a local Parquet file as a 'ParquetObject'.
--
-- The footer metadata is read first. If that yields an error, the stream is
-- aborted with 'fail'. Otherwise each row group is streamed in order.
-- 'sourceRowGroup' is run under 'runNoLoggingT', so its log output is
-- discarded.
sourceParquet :: FilePath -> C.ConduitT () ParquetObject (C.ResourceT IO) ()
sourceParquet fp = do
  metadataOrErr <- runExceptT (readMetadata (localParquetFile fp))
  case metadataOrErr of
    Left err -> fail $ "Could not read metadata: " <> show err
    Right metadata ->
      let rowGroups = metadata ^. TT.pinchField @"row_groups"
          streamRowGroup = sourceRowGroup (localParquetFile fp) metadata
      in C.transPipe runNoLoggingT (traverse_ streamRowGroup rowGroups)
-- | Stream one row group of a Parquet file served over HTTP.
--
-- This is 'sourceRowGroup' applied to a 'remoteParquetFile' source for the
-- given URL. The caller supplies the file metadata and the row group to
-- read.
sourceRowGroupFromRemoteFile
  :: (C.MonadResource m, C.MonadIO m, C.MonadThrow m, MonadLogger m)
  => String
  -> TT.FileMetadata
  -> TT.RowGroup
  -> C.ConduitT () ParquetObject m ()
sourceRowGroupFromRemoteFile = sourceRowGroup . remoteParquetFile
-- | Stream the records of one row group as 'ParquetObject's.
--
-- Each column chunk listed in the row group is streamed with
-- 'sourceColumnChunk', and every value is paired with that chunk's
-- @path_in_schema@. A chunk without @meta_data@ has no path, so
-- 'CL.mapMaybe' drops all of its values.
--
-- 'C.sequenceSources' combines the per-column streams into one list per
-- step (a 'Record'). @parse_record@ then turns each record into an object,
-- and records for which it returns 'Nothing' are skipped by 'CL.mapMaybeM'.
--
-- NOTE(review): in conduit, 'C.sequenceSources' is built on @ZipSource@, so
-- it presumably stops at the shortest column stream. A chunk without
-- @meta_data@ (which yields nothing) would then end the whole row group.
-- Confirm this is intended.
sourceRowGroup
  :: forall m
   . (C.MonadResource m, C.MonadIO m, C.MonadThrow m, MonadLogger m)
  => ParquetSource m
  -> TT.FileMetadata
  -> TT.RowGroup
  -> C.ConduitT () ParquetObject m ()
sourceRowGroup source metadata rg =
  C.sequenceSources
      (map
        (\cc -> sourceColumnChunk source metadata cc
          C..| CL.mapMaybe ((<$> mb_path cc) . (,))
        )
        (rg ^. TT.pinchField @"column_chunks")
      )
    C..| CL.mapMaybeM parse_record
  where
    -- The chunk's path_in_schema segments, or Nothing when the chunk
    -- carries no meta_data.
    mb_path :: TT.ColumnChunk -> Maybe [T.Text]
    mb_path cc =
      TT.unField
        . TT._ColumnMetaData_path_in_schema
        <$> (cc ^. TT.pinchField @"meta_data")

    -- Build one object from a record.
    -- Each (value, path) pair becomes a possibly nested single-field object
    -- via parse_column. A pair with an empty path is logged and yields
    -- Nothing.
    -- The per-column results are combined with mconcat over Maybe: Nothing
    -- results are skipped, and the remaining objects are combined with
    -- ParquetObject's (<>). The result is Nothing only when no column
    -- produced an object.
    parse_record :: Record -> m (Maybe ParquetObject)
    parse_record = fmap (fmap mconcat) $ traverse $ \(column, paths) ->
      case NE.nonEmpty paths of
        Nothing -> Nothing <$ logWarn
          ( "parse_record: Record with value "
          <> T.pack (show (_cvValue column))
          <> " does not have any paths. Record data is corrupted."
          )
        Just ne_paths -> parse_column (column, ne_paths)

    -- Turn one column value into an object nested along its path.
    -- The second ColumnValue field (presumably the definition level; TODO
    -- confirm) is decremented once per path segment. It must be exactly 1
    -- at the last segment, i.e. it must equal the path length. Otherwise a
    -- warning is logged and the value is dropped.
    -- NOTE(review): the first field (presumably the repetition level) is
    -- passed through but never inspected, so repeated fields are not
    -- assembled into lists.
    -- NOTE(review): values whose level does not match the path depth are
    -- discarded with a warning. Examples are nulls of optional fields, or
    -- columns with required ancestors, which do not contribute definition
    -- levels. Verify against the Parquet definition-level rules.
    parse_column :: (ColumnValue, NE.NonEmpty T.Text) -> m (Maybe ParquetObject)
    parse_column (ColumnValue _ 1 _ v, path NE.:| []) =
      pure $ Just $ MkParquetObject $ HM.fromList
        [(path, value_to_parquet_value v)]
    parse_column (ColumnValue _ _ _ _, _ NE.:| []) = do
      logWarn
        "parse_column: No more paths exist but we still have definition levels. Column data is corrupted."
      pure Nothing
    parse_column (ColumnValue r d md v, path NE.:| (p : px)) = do
      -- Descend one level: build the inner object for the remaining path,
      -- then wrap it under the current segment.
      mb_obj <- parse_column (ColumnValue r (d - 1) md v, p NE.:| px)
      pure $ mb_obj <&> \obj ->
        MkParquetObject $ HM.fromList [(path, ParquetObject obj)]

    -- Map a decoded Value to its ParquetValue counterpart.
    -- NOTE(review): only the constructors matched here are handled; if
    -- Value has other constructors, this function is partial.
    value_to_parquet_value :: Value -> ParquetValue
    value_to_parquet_value Null = ParquetNull
    value_to_parquet_value (ValueInt64 v ) = ParquetInt v
    value_to_parquet_value (ValueByteString bs) = ParquetString bs
-- | Stream the decoded 'ColumnValue's of a single column chunk.
--
-- The file's schema elements are indexed by their @name@ and passed, along
-- with the chunk, to 'readColumnChunk'. That function decodes the bytes
-- streamed from the chunk's first page, and its error handling is adapted
-- by 'failOnExcept'.
sourceColumnChunk
  :: (C.MonadIO m, C.MonadResource m, C.MonadThrow m, MonadLogger m)
  => ParquetSource m
  -> TT.FileMetadata
  -> TT.ColumnChunk
  -> C.ConduitT () ColumnValue m ()
sourceColumnChunk (ParquetSource source) metadata cc = do
  let
    schema_mapping =
      M.fromList
        $ map ((^. TT.pinchField @"name") &&& id)
        $ metadata
        ^. TT.pinchField @"schema"
    -- A column chunk starts with its dictionary page when it has one,
    -- otherwise with its first data page.
    -- ColumnChunk.file_offset is deprecated in the Parquet format and has
    -- been populated inconsistently by writers; it can skip the dictionary
    -- page or not point at a page at all. It is therefore only a fallback
    -- for chunks that carry no ColumnMetaData.
    offset = case cc ^. TT.pinchField @"meta_data" of
      Nothing -> cc ^. TT.pinchField @"file_offset"
      Just md ->
        let data_offset = md ^. TT.pinchField @"data_page_offset"
        in case md ^. TT.pinchField @"dictionary_page_offset" of
             -- Only trust a dictionary offset that is a plausible position
             -- preceding the first data page.
             Just dict_offset
               | dict_offset > 0 && dict_offset < data_offset -> dict_offset
             _ -> data_offset
  source (fromIntegral offset)
    C..| C.transPipe failOnExcept (readColumnChunk schema_mapping cc)