-- | Client-side support for select-object-content requests: building and
-- sending the request ('selectObjectContent') and decoding the binary,
-- CRC-protected event stream of the response into 'EventMessage' values.
--
-- The export list re-exports the request/serialization builders and the
-- event types alongside the functions defined in this module.
module Network.Minio.SelectAPI
(
selectObjectContent
, SelectRequest
, selectRequest
, InputSerialization
, defaultCsvInput
, linesJsonInput
, documentJsonInput
, defaultParquetInput
, setInputCSVProps
, CompressionType(..)
, setInputCompressionType
, CSVProp
, recordDelimiter
, fieldDelimiter
, quoteCharacter
, quoteEscapeCharacter
, commentCharacter
, allowQuotedRecordDelimiter
, FileHeaderInfo(..)
, fileHeaderInfo
, QuoteFields(..)
, quoteFields
, OutputSerialization
, defaultCsvOutput
, defaultJsonOutput
, outputCSVFromProps
, outputJSONFromRecordDelimiter
, setRequestProgressEnabled
, getPayloadBytes
, EventMessage(..)
, Progress(..)
, Stats
) where
import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Binary as Bin
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Digest.CRC32 (crc32, crc32Update)
import qualified Network.HTTP.Conduit as NC
import qualified Network.HTTP.Types as HT
import UnliftIO (MonadUnliftIO)
import Lib.Prelude
import Network.Minio.API
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.Utils
import Network.Minio.XmlGenerator
import Network.Minio.XmlParser
-- | Failures that can be raised while decoding the binary event stream of a
-- select response.
data EventStreamException
  = ESEPreludeCRCFailed
    -- ^ The checksum stored in a message prelude does not match the one
    -- computed over the prelude's length fields.
  | ESEMessageCRCFailed
    -- ^ The checksum trailing a message does not match the one computed
    -- over the message bytes.
  | ESEUnexpectedEndOfStream
    -- ^ Fewer bytes were available than a read required.
  | ESEDecodeFail [Char]
    -- ^ A binary field could not be decoded; carries a description of the
    -- failure.
  | ESEInvalidHeaderType
    -- ^ A header name is not one of the recognised names.
  | ESEInvalidHeaderValueType
    -- ^ A header value is tagged with an unsupported value type.
  | ESEInvalidMessageType
    -- ^ A message or event type is unrecognised, or an error message is
    -- missing its code or description header.
  deriving (Eq, Show)

instance Exception EventStreamException
-- | Largest number of bytes (32 KiB) requested from the stream in a single
-- read when message bodies are relayed piece by piece.
chunkSize :: Int
chunkSize = 32768
-- | Decode a strict 'ByteString' with its 'Bin.Binary' instance.
--
-- Throws 'ESEDecodeFail', carrying the decoder's message, when decoding
-- fails. Any input left over after a successful decode is ignored.
parseBinary :: Bin.Binary a => ByteString -> IO a
parseBinary =
  either failed succeeded . Bin.decodeOrFail . LB.fromStrict
  where
    failed (_, _, reason) = throwIO $ ESEDecodeFail reason
    succeeded (_, _, value) = return value
-- | Translate a header name as it appears in the stream into the
-- corresponding 'MsgHeaderName'.
--
-- Throws 'ESEInvalidHeaderType' for any name that is not recognised.
bytesToHeaderName :: Text -> IO MsgHeaderName
bytesToHeaderName ":message-type" = return MessageType
bytesToHeaderName ":event-type" = return EventType
bytesToHeaderName ":content-type" = return ContentType
bytesToHeaderName ":error-code" = return ErrorCode
bytesToHeaderName ":error-message" = return ErrorMessage
bytesToHeaderName _ = throwIO ESEInvalidHeaderType
-- | Parse the header block of one event-stream message.
--
-- The argument is the number of header bytes that remain to be consumed;
-- parsing stops once it reaches zero. Each header is read as: a 1-byte name
-- length, the name, a 1-byte value type, a 2-byte value length, and the
-- value. Names and values are decoded leniently as UTF-8.
--
-- Throws:
--
-- * 'ESEInvalidHeaderType' if a header name is not recognised;
-- * 'ESEInvalidHeaderValueType' if the value type is not 7 (the only type
--   this parser accepts; presumably the string type of the event-stream
--   encoding);
-- * 'ESEDecodeFail' if a header is larger than the bytes remaining in the
--   declared header block;
-- * 'ESEUnexpectedEndOfStream' if the input ends prematurely.
parseHeaders :: MonadUnliftIO m
             => Word32 -> C.ConduitM ByteString a m [MessageHeader]
parseHeaders 0 = return []
parseHeaders hdrLen = do
  nameLenBytes <- readNBytes 1
  nameLen :: Word8 <- liftIO $ parseBinary nameLenBytes
  headerKeyBytes <- readNBytes $ fromIntegral nameLen
  let headerKey = decodeUtf8Lenient headerKeyBytes
  headerName <- liftIO $ bytesToHeaderName headerKey
  valueTypeBytes <- readNBytes 1
  headerValueType :: Word8 <- liftIO $ parseBinary valueTypeBytes
  when (headerValueType /= 7) $ throwIO ESEInvalidHeaderValueType
  valueLenBytes <- readNBytes 2
  vLen :: Word16 <- liftIO $ parseBinary valueLenBytes
  -- Total size of this header on the wire: name length byte + name +
  -- value type byte + two value length bytes + value.
  let consumed :: Word32
      consumed = 1 + fromIntegral nameLen + 1 + 2 + fromIntegral vLen
  -- The remaining length is unsigned: subtracting a larger size would wrap
  -- around and make the parser keep reading payload bytes as headers.
  when (consumed > hdrLen) $
    throwIO $ ESEDecodeFail "header exceeds the declared header block length"
  headerValueBytes <- readNBytes $ fromIntegral vLen
  let headerValue = decodeUtf8Lenient headerValueBytes
  ms <- parseHeaders (hdrLen - consumed)
  return ((headerName, headerValue) : ms)
-- | Consume exactly the requested number of bytes from upstream and return
-- them as one strict 'ByteString'.
--
-- Throws 'ESEUnexpectedEndOfStream' when the number of bytes obtained
-- differs from the number requested.
readNBytes :: MonadUnliftIO m => Int -> C.ConduitM ByteString a m ByteString
readNBytes wanted = do
  chunk <- fmap LB.toStrict $ C.takeCE wanted .| C.sinkLazy
  when (B.length chunk /= wanted) $ throwIO ESEUnexpectedEndOfStream
  return chunk
-- | Validate the checksums of every message in the stream and pass the
-- message contents downstream with the checksums stripped.
--
-- For each message it reads the 12-byte prelude (4-byte total length,
-- 4 further bytes, 4-byte prelude checksum), checks the prelude checksum
-- against the first 8 bytes and yields those 8 bytes. It then relays the
-- next @total length - 16@ bytes in pieces of at most 'chunkSize' while
-- folding them into a running CRC seeded with the CRC of the whole prelude,
-- and finally compares that CRC with the 4-byte checksum that follows.
--
-- Note that body bytes are yielded before the message checksum has been
-- verified. The conduit loops until a read or a check throws.
--
-- Throws 'ESEPreludeCRCFailed', 'ESEMessageCRCFailed' or (via 'readNBytes')
-- 'ESEUnexpectedEndOfStream'.
crcCheck :: MonadUnliftIO m
         => C.ConduitM ByteString ByteString m ()
crcCheck = do
  prelude <- readNBytes 12
  let (lengthFields, preludeCrcBytes) = B.splitAt 8 prelude
  totalLen :: Word32 <- liftIO $ parseBinary $ B.take 4 lengthFields
  storedPreludeCrc :: Word32 <- liftIO $ parseBinary preludeCrcBytes
  when (crc32 lengthFields /= storedPreludeCrc) $
    throwIO ESEPreludeCRCFailed
  C.yield lengthFields
  computedCrc <- relay (fromIntegral totalLen - 16) (crc32 prelude)
  trailer <- readNBytes 4
  storedMessageCrc :: Word32 <- liftIO $ parseBinary trailer
  when (computedCrc /= storedMessageCrc) $
    throwIO ESEMessageCRCFailed
  crcCheck
  where
    -- Forward the given number of bytes downstream, returning the running
    -- CRC updated with everything forwarded.
    relay remaining runningCrc = do
      piece <- readNBytes $ min remaining chunkSize
      C.yield piece
      let runningCrc' = crc32Update runningCrc piece
          remaining' = remaining - B.length piece
      if remaining' <= 0
        then return runningCrc'
        else relay remaining' runningCrc'
-- | Decode checksum-stripped messages into 'EventMessage' values.
--
-- Each message is expected as: 4-byte total message length, 4-byte header
-- block length, the headers, then the payload. The payload length is the
-- total length minus the header length minus 16 bytes of framing.
--
-- Behaviour by message type:
--
-- * @event@ / @Records@: the payload is yielded as
--   'RecordPayloadEventMessage' pieces of at most 'chunkSize' bytes;
-- * @event@ / @Progress@ and @Stats@: the payload is parsed with
--   'parseSelectProgress' and yielded as 'ProgressEventMessage' or
--   'StatsEventMessage' respectively;
-- * @event@ / @Cont@: nothing is yielded;
-- * @event@ / @End@: nothing is yielded and decoding stops;
-- * @error@: a 'RequestLevelErrorMessage' built from the error code and
--   error message headers is yielded and decoding stops.
--
-- After every event other than @End@ the next message is decoded.
--
-- Throws 'ESEInvalidMessageType' for an unknown message or event type, or
-- an error message lacking its code or message header, and 'ESEDecodeFail'
-- when the declared lengths are inconsistent.
handleMessage :: MonadUnliftIO m => C.ConduitT ByteString EventMessage m ()
handleMessage = do
  b1 <- readNBytes 4
  msgLen :: Word32 <- liftIO $ parseBinary b1
  b2 <- readNBytes 4
  hdrLen :: Word32 <- liftIO $ parseBinary b2
  -- Compute the payload length in 'Int' rather than 'Word32': unsigned
  -- subtraction would wrap around to a huge value when the declared lengths
  -- are inconsistent, and the decoder would then run past the message.
  let payloadLen :: Int
      payloadLen = fromIntegral msgLen - fromIntegral hdrLen - 16
  when (payloadLen < 0) $
    throwIO $ ESEDecodeFail "message length is smaller than its headers and framing"
  hs <- parseHeaders hdrLen
  let getHdrVal h = fmap snd . headMay . filter ((h ==) . fst)
      eventHdrValue = getHdrVal EventType hs
      msgHdrValue = getHdrVal MessageType hs
      errCode = getHdrVal ErrorCode hs
      errMsg = getHdrVal ErrorMessage hs
  case msgHdrValue of
    Just "event" -> do
      case eventHdrValue of
        Just "Records" -> passThrough payloadLen
        Just "Cont" -> return ()
        Just "Progress" -> do
          bs <- readNBytes payloadLen
          progress <- parseSelectProgress bs
          C.yield $ ProgressEventMessage progress
        Just "Stats" -> do
          bs <- readNBytes payloadLen
          stats <- parseSelectProgress bs
          C.yield $ StatsEventMessage stats
        Just "End" -> return ()
        _ -> throwIO ESEInvalidMessageType
      -- "End" is the last event; anything else is followed by more messages.
      when (eventHdrValue /= Just "End") handleMessage
    Just "error" -> do
      let reqMsgMay = RequestLevelErrorMessage <$> errCode <*> errMsg
      maybe (throwIO ESEInvalidMessageType) C.yield reqMsgMay
    _ -> throwIO ESEInvalidMessageType
  where
    -- Yield the given number of payload bytes as record messages, at most
    -- 'chunkSize' bytes at a time.
    passThrough 0 = return ()
    passThrough n = do
      let c = min n chunkSize
      b <- readNBytes c
      C.yield $ RecordPayloadEventMessage b
      passThrough $ n - B.length b
-- | Decoder for a select response body: checksum validation ('crcCheck')
-- fused with message decoding ('handleMessage').
selectProtoConduit :: MonadUnliftIO m
                   => C.ConduitT ByteString EventMessage m ()
selectProtoConduit = C.fuse crcCheck handleMessage
-- | Send a select request for the given object and return a conduit
-- producing the decoded 'EventMessage's of the response.
--
-- The request is a POST on the object with the @select@ and
-- @select-type=2@ query parameters, carrying the serialized
-- 'SelectRequest' as its body. The response body is streamed through
-- 'selectProtoConduit'.
selectObjectContent :: Bucket -> Object -> SelectRequest
                    -> Minio (C.ConduitT () EventMessage Minio ())
selectObjectContent bkt obj selReq =
  toEvents <$> mkStreamRequest s3Req
  where
    toEvents resp = NC.responseBody resp .| selectProtoConduit
    s3Req =
      defaultS3ReqInfo
        { riMethod = HT.methodPost
        , riBucket = Just bkt
        , riObject = Just obj
        , riPayload = PayloadBS $ mkSelectRequest selReq
        , riNeedsLocation = False
        , riQueryParams = [("select", Nothing), ("select-type", Just "2")]
        }
-- | Reduce a stream of 'EventMessage's to the raw record bytes.
--
-- The bytes of each 'RecordPayloadEventMessage' are yielded; a
-- 'RequestLevelErrorMessage' is rethrown as a 'SelectErr' exception; every
-- other message is dropped.
getPayloadBytes :: MonadIO m => C.ConduitT EventMessage ByteString m ()
getPayloadBytes = C.awaitForever forward
  where
    forward (RecordPayloadEventMessage bytes) = C.yield bytes
    forward (RequestLevelErrorMessage code msg) =
      liftIO $ throwIO $ SelectErr code msg
    forward _ = return ()