--
-- MinIO Haskell SDK, (C) 2017-2019 MinIO, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

module Network.Minio.SelectAPI
  ( -- | The `selectObjectContent` function allows querying CSV, JSON or
    -- Parquet format objects in AWS S3 and in MinIO using SQL Select
    -- statements. This significantly reduces data transfer from object
    -- storage for computation-intensive tasks, as relevant data is
    -- filtered close to the storage.
    selectObjectContent
  , SelectRequest
  , selectRequest

    -- *** Input Serialization
  , InputSerialization
  , defaultCsvInput
  , linesJsonInput
  , documentJsonInput
  , defaultParquetInput
  , setInputCSVProps
  , CompressionType(..)
  , setInputCompressionType

    -- *** CSV Format details

    -- | CSV format options such as delimiters and quote characters are
    -- specified using the functions below. Options are combined
    -- monoidally.
  , CSVProp
  , recordDelimiter
  , fieldDelimiter
  , quoteCharacter
  , quoteEscapeCharacter
  , commentCharacter
  , allowQuotedRecordDelimiter
  , FileHeaderInfo(..)
  , fileHeaderInfo
  , QuoteFields(..)
  , quoteFields

    -- *** Output Serialization
  , OutputSerialization
  , defaultCsvOutput
  , defaultJsonOutput
  , outputCSVFromProps
  , outputJSONFromRecordDelimiter

    -- *** Progress messages
  , setRequestProgressEnabled

    -- *** Interpreting Select output

    -- | The conduit returned by `selectObjectContent` yields values of
    -- the `EventMessage` data type. These are the query output
    -- messages, formatted according to the chosen output serialization,
    -- interleaved with progress messages (if enabled by
    -- `setRequestProgressEnabled`), and at the end a statistics
    -- message.
    --
    -- If the application is interested only in the payload, then
    -- `getPayloadBytes` can be used. For example, to simply print the
    -- payload to stdout:
    --
    -- > resultConduit <- selectObjectContent bucket object mySelectRequest
    -- > runConduit $ resultConduit .| getPayloadBytes .| stdoutC
    --
    -- Note that runConduit, the connect operator (.|) and stdoutC are
    -- all from the "conduit" package.
  , getPayloadBytes
  , EventMessage(..)
  , Progress(..)
  , Stats
  ) where

import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Binary as Bin
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Digest.CRC32 (crc32, crc32Update)
import qualified Network.HTTP.Conduit as NC
import qualified Network.HTTP.Types as HT
import UnliftIO (MonadUnliftIO)

import Lib.Prelude

import Network.Minio.API
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.Utils
import Network.Minio.XmlGenerator
import Network.Minio.XmlParser

-- | Errors raised while decoding the binary event-stream framing of a
-- Select response.
data EventStreamException
  = ESEPreludeCRCFailed
    -- ^ The CRC32 of the first 8 prelude bytes did not match the
    -- prelude CRC that follows them.
  | ESEMessageCRCFailed
    -- ^ The CRC32 accumulated over the message did not match the
    -- trailing 4-byte message CRC.
  | ESEUnexpectedEndOfStream
    -- ^ Fewer bytes than requested were available on the stream.
  | ESEDecodeFail [Char]
    -- ^ A binary field could not be decoded, or a length field is
    -- inconsistent with the message framing.
  | ESEInvalidHeaderType
    -- ^ A header name is not one of the names this module recognises.
  | ESEInvalidHeaderValueType
    -- ^ A header value type code is not 7, the only type accepted.
  | ESEInvalidMessageType
    -- ^ Unexpected message type or event type, or an "error" message
    -- lacking its error code or error message header.
  deriving (Eq, Show)

instance Exception EventStreamException

-- | Maximum number of bytes read and yielded at once while streaming
-- message bodies and record payloads: 32KiB.
chunkSize :: Int
chunkSize = 32 * 1024

-- | Decode a value from a strict 'ByteString' using its 'Bin.Binary'
-- instance, throwing 'ESEDecodeFail' with the decoder's message on
-- failure. Any unconsumed trailing input is ignored.
parseBinary :: Bin.Binary a => ByteString -> IO a
parseBinary b =
  case Bin.decodeOrFail $ LB.fromStrict b of
    Left (_, _, msg) -> throwIO $ ESEDecodeFail msg
    Right (_, _, r) -> return r

-- | Map a header name as it appears on the wire to a 'MsgHeaderName',
-- throwing 'ESEInvalidHeaderType' for any unrecognised name.
bytesToHeaderName :: Text -> IO MsgHeaderName
bytesToHeaderName t = case t of
  ":message-type" -> return MessageType
  ":event-type" -> return EventType
  ":content-type" -> return ContentType
  ":error-code" -> return ErrorCode
  ":error-message" -> return ErrorMessage
  _ -> throwIO ESEInvalidHeaderType

-- | Read the headers section of an event-stream message, given the
-- total headers length in bytes.
--
-- Each header is read as: a 1-byte name length, the name, a 1-byte
-- value type (only 7 is accepted), a 2-byte value length and the
-- value. Names and values are decoded leniently as UTF-8.
--
-- Throws 'ESEInvalidHeaderType' / 'ESEInvalidHeaderValueType' for
-- unsupported headers, 'ESEUnexpectedEndOfStream' if the stream is too
-- short, and 'ESEDecodeFail' if a header extends past the declared
-- headers length.
parseHeaders :: MonadUnliftIO m
             => Word32 -> C.ConduitM ByteString a m [MessageHeader]
parseHeaders 0 = return []
parseHeaders hdrLen = do
  bs1 <- readNBytes 1
  n :: Word8 <- liftIO $ parseBinary bs1

  headerKeyBytes <- readNBytes $ fromIntegral n
  let headerKey = decodeUtf8Lenient headerKeyBytes
  headerName <- liftIO $ bytesToHeaderName headerKey

  bs2 <- readNBytes 1
  headerValueType :: Word8 <- liftIO $ parseBinary bs2
  when (headerValueType /= stringValueType) $
    throwIO ESEInvalidHeaderValueType

  bs3 <- readNBytes 2
  vLen :: Word16 <- liftIO $ parseBinary bs3
  headerValueBytes <- readNBytes $ fromIntegral vLen
  let headerValue = decodeUtf8Lenient headerValueBytes
      m = (headerName, headerValue)
      -- Bytes consumed by this header: name-length byte, name,
      -- value-type byte, 2-byte value length, value.
      k = 1 + fromIntegral n + 1 + 2 + fromIntegral vLen

  -- (hdrLen - k) is Word32 arithmetic: without this check a header
  -- that overruns the declared length would wrap around and make the
  -- loop keep consuming payload bytes as headers.
  when (k > hdrLen) $
    throwIO $ ESEDecodeFail
      "event stream header overruns the declared headers length"

  ms <- parseHeaders (hdrLen - k)
  return (m : ms)
  where
    -- Header value type code for string values (AWS event stream
    -- encoding); the only value type this decoder accepts.
    stringValueType :: Word8
    stringValueType = 7

-- | Read exactly @n@ bytes from the stream and return them as a strict
-- 'ByteString'. Throws 'ESEUnexpectedEndOfStream' if fewer than @n@
-- bytes are available.
readNBytes :: MonadUnliftIO m => Int -> C.ConduitM ByteString a m ByteString
readNBytes n = do
  b <- LB.toStrict <$> (C.takeCE n .| C.sinkLazy)
  if B.length b /= n
    then throwIO ESEUnexpectedEndOfStream
    else return b

-- | Verify the CRC32 checksums of each event-stream message and forward
-- the message bytes downstream, without the prelude CRC and without
-- the trailing message CRC.
--
-- Per message: a 12-byte prelude is read (4-byte total length, 4 more
-- bytes, 4-byte prelude CRC over the first 8). After the prelude CRC
-- is checked, the first 8 prelude bytes are yielded. The remaining
-- (total length - 16) bytes are then yielded in chunks of at most
-- 'chunkSize' while the message CRC is accumulated. Finally the 4-byte
-- message CRC is read and compared.
--
-- Because chunks are yielded before the trailing CRC is read,
-- downstream may already have consumed bytes of a corrupt message when
-- 'ESEMessageCRCFailed' is thrown.
crcCheck :: MonadUnliftIO m => C.ConduitM ByteString ByteString m ()
crcCheck = do
  b <- readNBytes 12
  n :: Word32 <- liftIO $ parseBinary $ B.take 4 b
  preludeCRC :: Word32 <- liftIO $ parseBinary $ B.drop 8 b
  when (crc32 (B.take 8 b) /= preludeCRC) $
    throwIO ESEPreludeCRCFailed

  -- we do not yield the checksum
  C.yield $ B.take 8 b

  -- 12 bytes have been read off the current message. Now read the
  -- next (n-12)-4 bytes, accumulate the checksum, and yield them.
  let startCrc = crc32 b
  finalCrc <- accumulateYield (fromIntegral n - 16) startCrc

  bs <- readNBytes 4
  expectedCrc :: Word32 <- liftIO $ parseBinary bs

  when (finalCrc /= expectedCrc) $
    throwIO ESEMessageCRCFailed

  -- we unconditionally recurse - downstream figures out when to
  -- quit reading the stream
  crcCheck
  where
    -- Read and yield @n@ bytes in chunks of at most 'chunkSize',
    -- folding each chunk into the running CRC.
    accumulateYield n checkSum = do
      let toRead = min n chunkSize
      b <- readNBytes toRead
      let c' = crc32Update checkSum b
          n' = n - B.length b
      C.yield b
      if n' > 0
        then accumulateYield n' c'
        else return c'

-- | Decode the CRC-stripped event-stream bytes produced by 'crcCheck'
-- into 'EventMessage' values.
--
-- For each message, the first 8 bytes are the total message length and
-- the headers length; the headers are then parsed and the message is
-- dispatched on its @:message-type@ and @:event-type@ headers:
--
--   * "Records" payloads are passed through in chunks of at most
--     'chunkSize' bytes as 'RecordPayloadEventMessage'.
--   * "Progress" and "Stats" payloads are parsed with
--     'parseSelectProgress'.
--   * "Cont" produces no output.
--   * "End" stops decoding.
--   * An "error" message yields a 'RequestLevelErrorMessage' and stops
--     decoding.
--
-- Throws 'ESEInvalidMessageType' for any other message or event type.
-- Throws 'ESEDecodeFail' if the headers length does not fit inside the
-- message length.
handleMessage :: MonadUnliftIO m => C.ConduitT ByteString EventMessage m ()
handleMessage = do
  b1 <- readNBytes 4
  msgLen :: Word32 <- liftIO $ parseBinary b1

  b2 <- readNBytes 4
  hdrLen :: Word32 <- liftIO $ parseBinary b2

  -- msgLen also counts the 12-byte prelude and the 4-byte message CRC.
  -- Reject lengths that would make the Word32 subtraction computing
  -- payloadLen wrap around (and make passThrough read across message
  -- boundaries).
  when (msgLen < 16 || hdrLen > msgLen - 16) $
    throwIO $ ESEDecodeFail
      "event stream headers length exceeds message length"

  hs <- parseHeaders hdrLen

  let payloadLen = msgLen - hdrLen - 16
      getHdrVal h = fmap snd . headMay . filter ((h ==) . fst)
      eventHdrValue = getHdrVal EventType hs
      msgHdrValue = getHdrVal MessageType hs
      errCode = getHdrVal ErrorCode hs
      errMsg = getHdrVal ErrorMessage hs

  case msgHdrValue of
    Just "event" -> do
      case eventHdrValue of
        Just "Records" -> passThrough $ fromIntegral payloadLen
        Just "Cont" -> return ()
        Just "Progress" -> do
          bs <- readNBytes $ fromIntegral payloadLen
          progress <- parseSelectProgress bs
          C.yield $ ProgressEventMessage progress
        Just "Stats" -> do
          bs <- readNBytes $ fromIntegral payloadLen
          stats <- parseSelectProgress bs
          C.yield $ StatsEventMessage stats
        Just "End" -> return ()
        _ -> throwIO ESEInvalidMessageType
      when (eventHdrValue /= Just "End") handleMessage

    Just "error" -> do
      let reqMsgMay = RequestLevelErrorMessage <$> errCode <*> errMsg
      maybe (throwIO ESEInvalidMessageType) C.yield reqMsgMay

    _ -> throwIO ESEInvalidMessageType

  where
    -- Forward @n@ payload bytes downstream in chunks of at most
    -- 'chunkSize'.
    passThrough 0 = return ()
    passThrough n = do
      let c = min n chunkSize
      b <- readNBytes c
      C.yield $ RecordPayloadEventMessage b
      passThrough $ n - B.length b

-- | Full decoding pipeline for a Select response body: checksum
-- verification ('crcCheck') followed by message decoding
-- ('handleMessage').
selectProtoConduit :: MonadUnliftIO m => C.ConduitT ByteString EventMessage m ()
selectProtoConduit = crcCheck .| handleMessage

-- | selectObjectContent runs the given SelectRequest against the given
-- object. It sends a POST request with the @select@ and
-- @select-type=2@ query parameters. It returns a Conduit of event
-- messages decoded from the streamed response body, which can be
-- consumed by the client.
selectObjectContent :: Bucket -> Object -> SelectRequest
                    -> Minio (C.ConduitT () EventMessage Minio ())
selectObjectContent b o r = do
  let reqInfo = defaultS3ReqInfo
        { riMethod = HT.methodPost
        , riBucket = Just b
        , riObject = Just o
        , riPayload = PayloadBS $ mkSelectRequest r
        , riNeedsLocation = False
        , riQueryParams = [("select", Nothing), ("select-type", Just "2")]
        }
  resp <- mkStreamRequest reqInfo
  return $ NC.responseBody resp .| selectProtoConduit

-- | A helper conduit that passes through only the record payload
-- bytes. Progress, stats and any other messages are dropped. A
-- request-level error message is rethrown as a 'SelectErr'.
getPayloadBytes :: MonadIO m => C.ConduitT EventMessage ByteString m ()
getPayloadBytes = C.awaitForever $ \v ->
  case v of
    RecordPayloadEventMessage b -> C.yield b
    RequestLevelErrorMessage c m -> liftIO $ throwIO $ SelectErr c m
    _ -> return ()