--
-- MinIO Haskell SDK, (C) 2017-2019 MinIO, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

module Network.Minio.SelectAPI
  ( -- | The `selectObjectContent` allows querying CSV, JSON or Parquet
    -- format objects in AWS S3 and in MinIO using SQL Select
    -- statements. This allows significant reduction of data transfer
    -- from object storage for computation-intensive tasks, as relevant
    -- data is filtered close to the storage.
    selectObjectContent,
    SelectRequest,
    selectRequest,

    -- *** Input Serialization
    InputSerialization,
    defaultCsvInput,
    linesJsonInput,
    documentJsonInput,
    defaultParquetInput,
    setInputCSVProps,
    CompressionType (..),
    setInputCompressionType,

    -- *** CSV Format details

    -- | CSV format options such as delimiters and quote characters are
    -- specified using the functions below. Options are combined
    -- monoidally.
    CSVProp,
    recordDelimiter,
    fieldDelimiter,
    quoteCharacter,
    quoteEscapeCharacter,
    commentCharacter,
    allowQuotedRecordDelimiter,
    FileHeaderInfo (..),
    fileHeaderInfo,
    QuoteFields (..),
    quoteFields,

    -- *** Output Serialization
    OutputSerialization,
    defaultCsvOutput,
    defaultJsonOutput,
    outputCSVFromProps,
    outputJSONFromRecordDelimiter,

    -- *** Progress messages
    setRequestProgressEnabled,

    -- *** Interpreting Select output

    -- | The conduit returned by `selectObjectContent` returns values of
    -- the `EventMessage` data type. This returns the query output
    -- messages formatted according to the chosen output serialization,
    -- interleaved with progress messages (if enabled by
    -- `setRequestProgressEnabled`), and at the end a statistics
    -- message.
    --
    -- If the application is interested in only the payload, then
    -- `getPayloadBytes` can be used. For example to simply print the
    -- payload to stdout:
    --
    -- > resultConduit <- selectObjectContent bucket object mySelectRequest
    -- > runConduit $ resultConduit .| getPayloadBytes .| stdoutC
    --
    -- Note that runConduit, the connect operator (.|) and stdoutC are
    -- all from the "conduit" package.
    getPayloadBytes,
    EventMessage (..),
    Progress (..),
    Stats,
  )
where

import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Binary as Bin
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Digest.CRC32 (crc32, crc32Update)
import Lib.Prelude
import qualified Network.HTTP.Conduit as NC
import qualified Network.HTTP.Types as HT
import Network.Minio.API
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.Utils
import Network.Minio.XmlGenerator
import Network.Minio.XmlParser
import UnliftIO (MonadUnliftIO)

-- Errors raised while decoding the binary event stream returned by a
-- select request.
data EventStreamException
  = -- | The CRC of the first 8 bytes of a message did not match the
    -- prelude checksum carried in the message.
    ESEPreludeCRCFailed
  | -- | The CRC accumulated over a whole message did not match the
    -- trailing message checksum.
    ESEMessageCRCFailed
  | -- | The stream ended before the requested number of bytes could be
    -- read.
    ESEUnexpectedEndOfStream
  | -- | A field could not be decoded; carries a description of the
    -- failure.
    ESEDecodeFail [Char]
  | -- | A header name was not one of the recognised names.
    ESEInvalidHeaderType
  | -- | A header value had an unsupported value type.
    ESEInvalidHeaderValueType
  | -- | The message type or event type was missing or not recognised.
    ESEInvalidMessageType
  deriving stock (Eq, Show)

instance Exception EventStreamException

-- chunkSize is the maximum number of bytes read from the stream and
-- yielded downstream in a single step: 32 KiB.
chunkSize :: Int
chunkSize = 32 * 1024

-- parseBinary decodes a value from a strict ByteString using its
-- 'Bin.Binary' instance. Any input left over after the value is
-- ignored. Throws 'ESEDecodeFail' with the decoder's message when
-- decoding fails.
parseBinary :: (Bin.Binary a) => ByteString -> IO a
parseBinary b =
  case Bin.decodeOrFail $ LB.fromStrict b of
    Left (_, _, msg) -> throwIO $ ESEDecodeFail msg
    Right (_, _, r) -> return r

-- bytesToHeaderName maps the textual name of an event-stream header to
-- its 'MsgHeaderName' constructor. Throws 'ESEInvalidHeaderType' for
-- any name that is not recognised.
bytesToHeaderName :: Text -> IO MsgHeaderName
bytesToHeaderName t = case t of
  ":message-type" -> return MessageType
  ":event-type" -> return EventType
  ":content-type" -> return ContentType
  ":error-code" -> return ErrorCode
  ":error-message" -> return ErrorMessage
  _ -> throwIO ESEInvalidHeaderType

-- parseHeaders reads the header section of one event-stream message
-- from the conduit. The argument is the number of header bytes that
-- remain to be consumed; parsing stops once it reaches zero.
--
-- Each header is read as:
--
--   * 1 byte: length of the header name
--   * that many bytes: the header name (decoded leniently as UTF-8)
--   * 1 byte: the value type - only 7 is accepted (the string type in
--     the AWS event-stream encoding)
--   * 2 bytes: length of the header value
--   * that many bytes: the header value (decoded leniently as UTF-8)
--
-- Throws 'ESEInvalidHeaderType' for an unrecognised header name,
-- 'ESEInvalidHeaderValueType' for a value type other than 7,
-- 'ESEDecodeFail' when a header is longer than the remaining header
-- section, and 'ESEUnexpectedEndOfStream' when the stream runs out of
-- bytes.
parseHeaders ::
  (MonadUnliftIO m) =>
  Word32 ->
  C.ConduitM ByteString a m [MessageHeader]
parseHeaders 0 = return []
parseHeaders hdrLen = do
  bs1 <- readNBytes 1
  n :: Word8 <- liftIO $ parseBinary bs1

  headerKeyBytes <- readNBytes $ fromIntegral n
  let headerKey = decodeUtf8Lenient headerKeyBytes
  headerName <- liftIO $ bytesToHeaderName headerKey

  bs2 <- readNBytes 1
  headerValueType :: Word8 <- liftIO $ parseBinary bs2
  when (headerValueType /= 7) $ throwIO ESEInvalidHeaderValueType

  bs3 <- readNBytes 2
  vLen :: Word16 <- liftIO $ parseBinary bs3

  -- k is the total size of this header: name length byte + name +
  -- value type byte + 2 value length bytes + value.
  let k :: Word32
      k = 1 + fromIntegral n + 1 + 2 + fromIntegral vLen

  -- hdrLen and k are unsigned, so (hdrLen - k) would wrap around to a
  -- huge value if this header overran the declared header section and
  -- parsing would continue into the payload. Fail explicitly instead.
  when (k > hdrLen) $
    throwIO $
      ESEDecodeFail "header extends past the declared headers length"

  headerValueBytes <- readNBytes $ fromIntegral vLen
  let headerValue = decodeUtf8Lenient headerValueBytes

  ms <- parseHeaders (hdrLen - k)
  return ((headerName, headerValue) : ms)

-- readNBytes consumes exactly N bytes from the upstream byte stream
-- and returns them as a single strict ByteString. Throws
-- 'ESEUnexpectedEndOfStream' if the number of bytes obtained differs
-- from N (for example because the stream ended early).
readNBytes :: (MonadUnliftIO m) => Int -> C.ConduitM ByteString a m ByteString
readNBytes n = do
  b <- LB.toStrict <$> (C.takeCE n .| C.sinkLazy)
  if B.length b /= n
    then throwIO ESEUnexpectedEndOfStream
    else return b

-- crcCheck verifies the checksums of every message on the incoming
-- byte stream and re-yields the message bytes with the two checksum
-- fields removed.
--
-- For each message it reads a 12 byte prelude: a 4 byte total message
-- length, 4 further bytes that are passed through untouched, and a 4
-- byte CRC of those first 8 bytes. The first 8 bytes are yielded, then
-- the rest of the message (total length minus the 12 prelude bytes and
-- the 4 trailing CRC bytes) is yielded in pieces of at most 'chunkSize'
-- bytes while a running CRC over the whole message is accumulated.
--
-- Throws 'ESEPreludeCRCFailed' or 'ESEMessageCRCFailed' on a checksum
-- mismatch, and whatever 'readNBytes' / 'parseBinary' throw on a short
-- or undecodable stream.
crcCheck ::
  (MonadUnliftIO m) =>
  C.ConduitM ByteString ByteString m ()
crcCheck = do
  b <- readNBytes 12
  n :: Word32 <- liftIO $ parseBinary $ B.take 4 b
  preludeCRC :: Word32 <- liftIO $ parseBinary $ B.drop 8 b
  when (crc32 (B.take 8 b) /= preludeCRC) $
    throwIO ESEPreludeCRCFailed

  -- we do not yield the checksum
  C.yield $ B.take 8 b

  -- 12 bytes have been read off the current message. Now read the
  -- next (n-12)-4 bytes and accumulate the checksum, and yield it.
  -- The message CRC covers the prelude too, so start from crc32 of all
  -- 12 prelude bytes.
  let startCrc = crc32 b
  finalCrc <- accumulateYield (fromIntegral n - 16) startCrc

  bs <- readNBytes 4
  expectedCrc :: Word32 <- liftIO $ parseBinary bs

  when (finalCrc /= expectedCrc) $
    throwIO ESEMessageCRCFailed

  -- we unconditionally recurse - downstream figures out when to
  -- quit reading the stream
  crcCheck
  where
    -- Reads `remaining` bytes in pieces of at most chunkSize, yields
    -- each piece downstream, and returns the CRC updated with every
    -- piece read.
    accumulateYield remaining checkSum = do
      let toRead = min remaining chunkSize
      chunk <- readNBytes toRead
      let checkSum' = crc32Update checkSum chunk
          remaining' = remaining - B.length chunk
      C.yield chunk
      if remaining' > 0
        then accumulateYield remaining' checkSum'
        else return checkSum'

-- handleMessage decodes messages from a byte stream whose checksum
-- fields have already been stripped, and yields the corresponding
-- 'EventMessage' values.
--
-- Each message is read as a 4 byte total length, a 4 byte headers
-- length, the headers, and then the payload. The payload length is the
-- total length minus the headers length minus 16 bytes of framing.
--
-- Dispatch is on the message-type header:
--
--   * "event": handled according to the event-type header. "Records"
--     payloads are yielded as 'RecordPayloadEventMessage' in pieces of
--     at most 'chunkSize' bytes; "Progress" and "Stats" payloads are
--     parsed with 'parseSelectProgress' and yielded; "Cont" yields
--     nothing; "End" yields nothing and stops. Every event other than
--     "End" is followed by processing of the next message.
--   * "error": yields a 'RequestLevelErrorMessage' built from the
--     error-code and error-message headers, then stops.
--
-- Throws 'ESEInvalidMessageType' for an unknown message or event type,
-- or for an error message lacking the code or message header, and
-- 'ESEDecodeFail' when the declared lengths are inconsistent.
handleMessage :: (MonadUnliftIO m) => C.ConduitT ByteString EventMessage m ()
handleMessage = do
  b1 <- readNBytes 4
  msgLen :: Word32 <- liftIO $ parseBinary b1

  b2 <- readNBytes 4
  hdrLen :: Word32 <- liftIO $ parseBinary b2

  -- The lengths are unsigned, so msgLen - hdrLen - 16 would wrap around
  -- to a huge payload length if they were inconsistent. Reject such a
  -- message up front; the check is written so that it cannot itself
  -- overflow.
  when (msgLen < 16 || hdrLen > msgLen - 16) $
    throwIO $
      ESEDecodeFail "message length is smaller than its headers and framing"

  hs <- parseHeaders hdrLen

  let payloadLen = msgLen - hdrLen - 16
      -- value of the first header with the given name, if any
      getHdrVal h = fmap snd . find ((h ==) . fst)
      eventHdrValue = getHdrVal EventType hs
      msgHdrValue = getHdrVal MessageType hs
      errCode = getHdrVal ErrorCode hs
      errMsg = getHdrVal ErrorMessage hs

  case msgHdrValue of
    Just "event" -> do
      case eventHdrValue of
        Just "Records" -> passThrough $ fromIntegral payloadLen
        Just "Cont" -> return ()
        Just "Progress" -> do
          bs <- readNBytes $ fromIntegral payloadLen
          progress <- parseSelectProgress bs
          C.yield $ ProgressEventMessage progress
        Just "Stats" -> do
          bs <- readNBytes $ fromIntegral payloadLen
          stats <- parseSelectProgress bs
          C.yield $ StatsEventMessage stats
        Just "End" -> return ()
        _ -> throwIO ESEInvalidMessageType
      -- keep going until the End event has been seen
      when (eventHdrValue /= Just "End") handleMessage
    Just "error" -> do
      let reqMsgMay = RequestLevelErrorMessage <$> errCode <*> errMsg
      maybe (throwIO ESEInvalidMessageType) C.yield reqMsgMay
    _ -> throwIO ESEInvalidMessageType
  where
    -- Streams a record payload of the given size downstream in pieces
    -- of at most chunkSize bytes.
    passThrough 0 = return ()
    passThrough remaining = do
      let c = min remaining chunkSize
      chunk <- readNBytes c
      C.yield $ RecordPayloadEventMessage chunk
      passThrough $ remaining - B.length chunk

-- selectProtoConduit turns the raw response bytes of a select request
-- into event messages: 'crcCheck' validates and strips the checksums,
-- and 'handleMessage' decodes what remains.
selectProtoConduit ::
  (MonadUnliftIO m) =>
  C.ConduitT ByteString EventMessage m ()
selectProtoConduit = crcCheck .| handleMessage

-- | selectObjectContent runs the given 'SelectRequest' against the
-- given object. It returns a conduit of event messages that can be
-- consumed by the client.
--
-- The request is a POST on the object with the query parameters
-- @select@ and @select-type=2@ and the serialized select request as
-- its body. The response body is streamed and decoded lazily as the
-- returned conduit is consumed.
selectObjectContent ::
  Bucket ->
  Object ->
  SelectRequest ->
  Minio (C.ConduitT () EventMessage Minio ())
selectObjectContent b o r = do
  let reqInfo =
        defaultS3ReqInfo
          { riMethod = HT.methodPost,
            riBucket = Just b,
            riObject = Just o,
            riPayload = PayloadBS $ mkSelectRequest r,
            riNeedsLocation = False,
            riQueryParams = [("select", Nothing), ("select-type", Just "2")]
          }
  resp <- mkStreamRequest reqInfo
  return $ NC.responseBody resp .| selectProtoConduit

-- | A helper conduit that returns only the record payload bytes.
--
-- Every 'RecordPayloadEventMessage' is unwrapped and its bytes are
-- yielded. A 'RequestLevelErrorMessage' is thrown as a 'SelectErr'
-- exception carrying the error code and message. All other event
-- messages are discarded. The conduit finishes when upstream is
-- exhausted.
getPayloadBytes :: (MonadIO m) => C.ConduitT EventMessage ByteString m ()
getPayloadBytes = C.awaitForever $ \ev ->
  case ev of
    RecordPayloadEventMessage b -> C.yield b
    RequestLevelErrorMessage c m -> throwIO $ SelectErr c m
    _ -> return ()