--
-- MinIO Haskell SDK, (C) 2017-2019 MinIO, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

module Network.Minio.SelectAPI
  ( -- | The `selectObjectContent` allows querying CSV, JSON or Parquet
    -- format objects in AWS S3 and in MinIO using SQL Select
    -- statements. This allows significant reduction of data transfer
    -- from object storage for computation-intensive tasks, as relevant
    -- data is filtered close to the storage.
    selectObjectContent,
    SelectRequest,
    selectRequest,

    -- *** Input Serialization
    InputSerialization,
    defaultCsvInput,
    linesJsonInput,
    documentJsonInput,
    defaultParquetInput,
    setInputCSVProps,
    CompressionType (..),
    setInputCompressionType,

    -- *** CSV Format details

    -- | CSV format options such as delimiters and quote characters are
    -- specified using the functions below. Options are combined
    -- monoidally.
    CSVProp,
    recordDelimiter,
    fieldDelimiter,
    quoteCharacter,
    quoteEscapeCharacter,
    commentCharacter,
    allowQuotedRecordDelimiter,
    FileHeaderInfo (..),
    fileHeaderInfo,
    QuoteFields (..),
    quoteFields,

    -- *** Output Serialization
    OutputSerialization,
    defaultCsvOutput,
    defaultJsonOutput,
    outputCSVFromProps,
    outputJSONFromRecordDelimiter,

    -- *** Progress messages
    setRequestProgressEnabled,

    -- *** Interpreting Select output

    -- | The conduit returned by `selectObjectContent` returns values of
    -- the `EventMessage` data type. This returns the query output
    -- messages formatted according to the chosen output serialization,
    -- interleaved with progress messages (if enabled by
    -- `setRequestProgressEnabled`), and at the end a statistics
    -- message.
    --
    -- If the application is interested in only the payload, then
    -- `getPayloadBytes` can be used. For example to simply print the
    -- payload to stdout:
    --
    -- > resultConduit <- selectObjectContent bucket object mySelectRequest
    -- > runConduit $ resultConduit .| getPayloadBytes .| stdoutC
    --
    -- Note that runConduit, the connect operator (.|) and stdoutC are
    -- all from the "conduit" package.
    getPayloadBytes,
    EventMessage (..),
    Progress (..),
    Stats,
  )
where

import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Binary as Bin
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Digest.CRC32 (crc32, crc32Update)
import Lib.Prelude
import qualified Network.HTTP.Conduit as NC
import qualified Network.HTTP.Types as HT
import Network.Minio.API
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.Utils
import Network.Minio.XmlGenerator
import Network.Minio.XmlParser
import UnliftIO (MonadUnliftIO)

-- Errors raised while decoding the binary event stream returned by a
-- select request. Values of this type are thrown as exceptions (see
-- the 'Exception' instance below).
data EventStreamException
  -- The CRC computed over the first 8 bytes of a message does not match
  -- the prelude checksum that follows them (see crcCheck).
  = ESEPreludeCRCFailed
  -- The CRC accumulated over a whole message does not match the
  -- checksum found at the end of the message (see crcCheck).
  | ESEMessageCRCFailed
  -- Fewer bytes than requested were available on the stream (see
  -- readNBytes).
  | ESEUnexpectedEndOfStream
  -- A binary field could not be decoded; carries a description of the
  -- failure (see parseBinary).
  | ESEDecodeFail [Char]
  -- A header name is not one of the names recognised by
  -- bytesToHeaderName.
  | ESEInvalidHeaderType
  -- A header's value-type byte is not 7, the only value type accepted
  -- by parseHeaders.
  | ESEInvalidHeaderValueType
  -- The message type or event type is not one handled by
  -- handleMessage, or an error message lacks its error code or error
  -- message header.
  | ESEInvalidMessageType
  deriving (Eq, Show)

instance Exception EventStreamException

-- Upper bound, in bytes, on the size of a single piece read from the
-- stream: 32 KiB (32 * 1024).
chunkSize :: Int
chunkSize = 32768

-- Decode a value from a strict ByteString using its 'Bin.Binary'
-- instance. A decoding failure is thrown as 'ESEDecodeFail' carrying
-- the decoder's error message.
parseBinary :: Bin.Binary a => ByteString -> IO a
parseBinary =
  either failed decoded . Bin.decodeOrFail . LB.fromStrict
  where
    -- Unconsumed input and byte offset are not reported.
    failed (_, _, msg) = throwIO $ ESEDecodeFail msg
    decoded (_, _, value) = return value

-- Map the textual name of an event-stream header to its
-- 'MsgHeaderName' constructor. Any name other than the five listed
-- here is rejected with 'ESEInvalidHeaderType'.
bytesToHeaderName :: Text -> IO MsgHeaderName
bytesToHeaderName ":message-type" = pure MessageType
bytesToHeaderName ":event-type" = pure EventType
bytesToHeaderName ":content-type" = pure ContentType
bytesToHeaderName ":error-code" = pure ErrorCode
bytesToHeaderName ":error-message" = pure ErrorMessage
bytesToHeaderName _ = throwIO ESEInvalidHeaderType

-- Parse the headers section of an event-stream message.
--
-- The argument is the number of header bytes that remain to be
-- consumed. Each header is read as:
--
--   * 1 byte: length of the header name
--   * the header name (decoded leniently as UTF-8 and mapped with
--     bytesToHeaderName)
--   * 1 byte: value type, which must be 7
--   * 2 bytes: length of the header value
--   * the header value (decoded leniently as UTF-8)
--
-- Returns the headers in the order they appear on the stream.
--
-- Throws ESEInvalidHeaderType for an unrecognised header name,
-- ESEInvalidHeaderValueType for a value type other than 7,
-- ESEDecodeFail if a header is longer than the remaining headers
-- length, and whatever readNBytes / parseBinary throw on short or
-- undecodable input.
parseHeaders ::
  MonadUnliftIO m =>
  Word32 ->
  C.ConduitM ByteString a m [MessageHeader]
parseHeaders 0 = return []
parseHeaders hdrLen = do
  bs1 <- readNBytes 1
  n :: Word8 <- liftIO $ parseBinary bs1

  headerKeyBytes <- readNBytes $ fromIntegral n
  let headerKey = decodeUtf8Lenient headerKeyBytes
  headerName <- liftIO $ bytesToHeaderName headerKey

  bs2 <- readNBytes 1
  headerValueType :: Word8 <- liftIO $ parseBinary bs2
  when (headerValueType /= 7) $ throwIO ESEInvalidHeaderValueType

  bs3 <- readNBytes 2
  vLen :: Word16 <- liftIO $ parseBinary bs3
  headerValueBytes <- readNBytes $ fromIntegral vLen
  let headerValue = decodeUtf8Lenient headerValueBytes
      m = (headerName, headerValue)
      -- Total number of bytes this header occupied on the stream.
      k = 1 + fromIntegral n + 1 + 2 + fromIntegral vLen

  -- hdrLen and k are unsigned: without this check a header that
  -- overruns the declared headers length would make (hdrLen - k) wrap
  -- around to a huge value and payload bytes would be parsed as
  -- headers.
  when (k > hdrLen) $
    throwIO $
      ESEDecodeFail "header extends past the declared headers length"

  ms <- parseHeaders (hdrLen - k)
  return (m : ms)

-- readNBytes consumes exactly N bytes from the upstream byte stream
-- and returns them as one strict ByteString. If the stream cannot
-- supply N bytes, ESEUnexpectedEndOfStream is thrown.
readNBytes :: MonadUnliftIO m => Int -> C.ConduitM ByteString a m ByteString
readNBytes n = do
  bytes <- C.takeCE n .| fmap LB.toStrict C.sinkLazy
  when (B.length bytes /= n) $ throwIO ESEUnexpectedEndOfStream
  return bytes

-- Verify the checksums of each event-stream message and forward the
-- checked bytes downstream.
--
-- For every message this conduit:
--
--   1. reads the 12-byte prelude: a 4-byte total message length @n@,
--      4 further bytes, and a 4-byte CRC of those first 8 bytes;
--   2. throws ESEPreludeCRCFailed if the prelude CRC does not match;
--   3. yields the first 8 bytes of the prelude (without its CRC);
--   4. yields the next @n - 16@ bytes in pieces of at most chunkSize
--      bytes, accumulating the CRC of everything read so far;
--   5. reads the trailing 4-byte message CRC and throws
--      ESEMessageCRCFailed if it differs from the accumulated value.
--
-- It then starts over with the next message. It never finishes on its
-- own: the pipeline ends when downstream stops consuming, or when
-- readNBytes throws because the stream is exhausted.
crcCheck ::
  MonadUnliftIO m =>
  C.ConduitM ByteString ByteString m ()
crcCheck = do
  b <- readNBytes 12
  n :: Word32 <- liftIO $ parseBinary $ B.take 4 b
  preludeCRC :: Word32 <- liftIO $ parseBinary $ B.drop 8 b
  when (crc32 (B.take 8 b) /= preludeCRC) $
    throwIO ESEPreludeCRCFailed

  -- we do not yield the checksum
  C.yield $ B.take 8 b

  -- 12 bytes have been read off the current message. Now read the
  -- next (n-12)-4 bytes and accumulate the checksum, and yield it.
  let startCrc = crc32 b
  finalCrc <- accumulateYield (fromIntegral n - 16) startCrc

  bs <- readNBytes 4
  expectedCrc :: Word32 <- liftIO $ parseBinary bs

  when (finalCrc /= expectedCrc) $
    throwIO ESEMessageCRCFailed

  -- we unconditionally recurse - downstream figures out when to
  -- quit reading the stream
  crcCheck
  where
    -- Read and yield n bytes in pieces of at most chunkSize, returning
    -- the checksum updated with every piece read.
    accumulateYield n checkSum = do
      let toRead = min n chunkSize
      b <- readNBytes toRead
      let c' = crc32Update checkSum b
          n' = n - B.length b
      C.yield b
      -- Force the running checksum at every step. Left lazy, it would
      -- grow into a chain of thunks, each keeping its chunk alive until
      -- the final comparison.
      c' `seq`
        if n' > 0
          then accumulateYield n' c'
          else return c'

-- Decode event-stream messages (as forwarded by crcCheck, i.e. without
-- checksums) into 'EventMessage' values.
--
-- Each message starts with a 4-byte total message length and a 4-byte
-- headers length, followed by the headers and then the payload. The
-- payload length is the total length minus the headers length minus 16
-- bytes of framing.
--
-- Messages whose message-type header is "event" are handled according
-- to their event-type header:
--
--   * "Records": the payload is yielded as RecordPayloadEventMessage
--     values, in pieces of at most chunkSize bytes;
--   * "Progress" / "Stats": the payload is parsed with
--     parseSelectProgress and yielded as ProgressEventMessage /
--     StatsEventMessage;
--   * "Cont": nothing is yielded;
--   * "End": nothing is yielded and the conduit finishes.
--
-- A message of type "error" yields a RequestLevelErrorMessage built
-- from its error-code and error-message headers, after which the
-- conduit finishes.
--
-- Throws ESEInvalidMessageType for any other message or event type (or
-- an error message missing either header), and ESEDecodeFail when the
-- declared lengths are inconsistent.
handleMessage :: MonadUnliftIO m => C.ConduitT ByteString EventMessage m ()
handleMessage = do
  b1 <- readNBytes 4
  msgLen :: Word32 <- liftIO $ parseBinary b1

  b2 <- readNBytes 4
  hdrLen :: Word32 <- liftIO $ parseBinary b2

  -- The lengths are unsigned: if the headers plus the 16 bytes of
  -- framing do not fit inside the message, the payload length below
  -- would wrap around to a huge value. Reject such messages before
  -- reading any further. (Written without additions so that the check
  -- itself cannot overflow.)
  when (msgLen < 16 || hdrLen > msgLen - 16) $
    throwIO $
      ESEDecodeFail "message length is too small for the declared headers length"

  hs <- parseHeaders hdrLen

  let payloadLen = msgLen - hdrLen - 16
      -- Value of the first header with the given name, if any.
      getHdrVal h = fmap snd . headMay . filter ((h ==) . fst)
      eventHdrValue = getHdrVal EventType hs
      msgHdrValue = getHdrVal MessageType hs
      errCode = getHdrVal ErrorCode hs
      errMsg = getHdrVal ErrorMessage hs

  case msgHdrValue of
    Just "event" -> do
      case eventHdrValue of
        Just "Records" -> passThrough $ fromIntegral payloadLen
        Just "Cont" -> return ()
        Just "Progress" -> do
          bs <- readNBytes $ fromIntegral payloadLen
          progress <- parseSelectProgress bs
          C.yield $ ProgressEventMessage progress
        Just "Stats" -> do
          bs <- readNBytes $ fromIntegral payloadLen
          stats <- parseSelectProgress bs
          C.yield $ StatsEventMessage stats
        Just "End" -> return ()
        _ -> throwIO ESEInvalidMessageType
      -- Keep decoding until the End event has been seen.
      when (eventHdrValue /= Just "End") handleMessage
    Just "error" -> do
      -- Both the error code and the error message header are required.
      let reqMsgMay = RequestLevelErrorMessage <$> errCode <*> errMsg
      maybe (throwIO ESEInvalidMessageType) C.yield reqMsgMay
    _ -> throwIO ESEInvalidMessageType
  where
    -- Yield n payload bytes downstream in pieces of at most chunkSize.
    passThrough 0 = return ()
    passThrough n = do
      let c = min n chunkSize
      b <- readNBytes c
      C.yield $ RecordPayloadEventMessage b
      passThrough $ n - B.length b

-- Full decoder for a select response body: the raw bytes are first
-- checksum-verified by crcCheck, and the verified bytes are then
-- decoded into 'EventMessage' values by handleMessage.
selectProtoConduit ::
  MonadUnliftIO m =>
  C.ConduitT ByteString EventMessage m ()
selectProtoConduit = C.fuse crcCheck handleMessage

-- | selectObjectContent runs the given 'SelectRequest' against an
-- object. The request is sent as a POST with the @select@ and
-- @select-type=2@ query parameters, and the streaming response body is
-- returned as a Conduit of event messages for the client to consume.
selectObjectContent ::
  Bucket ->
  Object ->
  SelectRequest ->
  Minio (C.ConduitT () EventMessage Minio ())
selectObjectContent b o r =
  decodeBody <$> mkStreamRequest reqInfo
  where
    -- Attach the event-stream decoder to the raw response body.
    decodeBody resp = NC.responseBody resp .| selectProtoConduit
    reqInfo =
      defaultS3ReqInfo
        { riMethod = HT.methodPost,
          riBucket = Just b,
          riObject = Just o,
          riPayload = PayloadBS $ mkSelectRequest r,
          riNeedsLocation = False,
          riQueryParams = [("select", Nothing), ("select-type", Just "2")]
        }

-- | A helper conduit that keeps only the record payload bytes of the
-- incoming event messages. A request-level error message is rethrown
-- as a 'SelectErr' exception; every other kind of message is dropped.
getPayloadBytes :: MonadIO m => C.ConduitT EventMessage ByteString m ()
getPayloadBytes = C.awaitForever forward
  where
    forward (RecordPayloadEventMessage payload) = C.yield payload
    forward (RequestLevelErrorMessage code msg) =
      liftIO $ throwIO $ SelectErr code msg
    forward _ = return ()