module Network.Minio.SelectAPI
(
selectObjectContent,
SelectRequest,
selectRequest,
InputSerialization,
defaultCsvInput,
linesJsonInput,
documentJsonInput,
defaultParquetInput,
setInputCSVProps,
CompressionType (..),
setInputCompressionType,
CSVProp,
recordDelimiter,
fieldDelimiter,
quoteCharacter,
quoteEscapeCharacter,
commentCharacter,
allowQuotedRecordDelimiter,
FileHeaderInfo (..),
fileHeaderInfo,
QuoteFields (..),
quoteFields,
OutputSerialization,
defaultCsvOutput,
defaultJsonOutput,
outputCSVFromProps,
outputJSONFromRecordDelimiter,
setRequestProgressEnabled,
getPayloadBytes,
EventMessage (..),
Progress (..),
Stats,
)
where
import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Binary as Bin
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Digest.CRC32 (crc32, crc32Update)
import Lib.Prelude
import qualified Network.HTTP.Conduit as NC
import qualified Network.HTTP.Types as HT
import Network.Minio.API
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.Utils
import Network.Minio.XmlGenerator
import Network.Minio.XmlParser
import UnliftIO (MonadUnliftIO)
data EventStreamException
= ESEPreludeCRCFailed
| ESEMessageCRCFailed
| ESEUnexpectedEndOfStream
| ESEDecodeFail [Char]
|
|
| ESEInvalidMessageType
deriving stock (EventStreamException -> EventStreamException -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: EventStreamException -> EventStreamException -> Bool
$c/= :: EventStreamException -> EventStreamException -> Bool
== :: EventStreamException -> EventStreamException -> Bool
$c== :: EventStreamException -> EventStreamException -> Bool
Eq, Int -> EventStreamException -> ShowS
[EventStreamException] -> ShowS
EventStreamException -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [EventStreamException] -> ShowS
$cshowList :: [EventStreamException] -> ShowS
show :: EventStreamException -> [Char]
$cshow :: EventStreamException -> [Char]
showsPrec :: Int -> EventStreamException -> ShowS
$cshowsPrec :: Int -> EventStreamException -> ShowS
Show)
instance Exception EventStreamException
chunkSize :: Int
chunkSize :: Int
chunkSize = Int
32 forall a. Num a => a -> a -> a
* Int
1024
parseBinary :: (Bin.Binary a) => ByteString -> IO a
parseBinary :: forall a. Binary a => ByteString -> IO a
parseBinary ByteString
b = do
case forall a.
Binary a =>
ByteString
-> Either
(ByteString, ByteOffset, [Char]) (ByteString, ByteOffset, a)
Bin.decodeOrFail forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
LB.fromStrict ByteString
b of
Left (ByteString
_, ByteOffset
_, [Char]
msg) -> forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO forall a b. (a -> b) -> a -> b
$ [Char] -> EventStreamException
ESEDecodeFail [Char]
msg
Right (ByteString
_, ByteOffset
_, a
r) -> forall (m :: * -> *) a. Monad m => a -> m a
return a
r
bytesToHeaderName :: Text -> IO MsgHeaderName
Text
t = case Text
t of
Text
":message-type" -> forall (m :: * -> *) a. Monad m => a -> m a
return MsgHeaderName
MessageType
Text
":event-type" -> forall (m :: * -> *) a. Monad m => a -> m a
return MsgHeaderName
EventType
Text
":content-type" -> forall (m :: * -> *) a. Monad m => a -> m a
return MsgHeaderName
ContentType
Text
":error-code" -> forall (m :: * -> *) a. Monad m => a -> m a
return MsgHeaderName
ErrorCode
Text
":error-message" -> forall (m :: * -> *) a. Monad m => a -> m a
return MsgHeaderName
ErrorMessage
Text
_ -> forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO EventStreamException
ESEInvalidHeaderType
parseHeaders ::
(MonadUnliftIO m) =>
Word32 ->
C.ConduitM ByteString a m [MessageHeader]
Word32
0 = forall (m :: * -> *) a. Monad m => a -> m a
return []
parseHeaders Word32
hdrLen = do
ByteString
bs1 <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
1
Word8
n :: Word8 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Binary a => ByteString -> IO a
parseBinary ByteString
bs1
ByteString
headerKeyBytes <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word8
n
let headerKey :: Text
headerKey = ByteString -> Text
decodeUtf8Lenient ByteString
headerKeyBytes
MsgHeaderName
headerName <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Text -> IO MsgHeaderName
bytesToHeaderName Text
headerKey
ByteString
bs2 <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
1
Word8
headerValueType :: Word8 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Binary a => ByteString -> IO a
parseBinary ByteString
bs2
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word8
headerValueType forall a. Eq a => a -> a -> Bool
/= Word8
7) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO EventStreamException
ESEInvalidHeaderValueType
ByteString
bs3 <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
2
Word16
vLen :: Word16 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Binary a => ByteString -> IO a
parseBinary ByteString
bs3
ByteString
headerValueBytes <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
vLen
let headerValue :: Text
headerValue = ByteString -> Text
decodeUtf8Lenient ByteString
headerValueBytes
m :: MessageHeader
m = (MsgHeaderName
headerName, Text
headerValue)
k :: Word32
k = Word32
1 forall a. Num a => a -> a -> a
+ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word8
n forall a. Num a => a -> a -> a
+ Word32
1 forall a. Num a => a -> a -> a
+ Word32
2 forall a. Num a => a -> a -> a
+ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
vLen
[MessageHeader]
ms <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Word32 -> ConduitM ByteString a m [MessageHeader]
parseHeaders (Word32
hdrLen forall a. Num a => a -> a -> a
- Word32
k)
forall (m :: * -> *) a. Monad m => a -> m a
return (MessageHeader
m forall a. a -> [a] -> [a]
: [MessageHeader]
ms)
readNBytes :: (MonadUnliftIO m) => Int -> C.ConduitM ByteString a m ByteString
readNBytes :: forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
n = do
ByteString
b <- ByteString -> ByteString
LB.toStrict forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (forall (m :: * -> *) seq.
(Monad m, IsSequence seq) =>
Index seq -> ConduitT seq seq m ()
C.takeCE Int
n forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall (m :: * -> *) lazy strict o.
(Monad m, LazySequence lazy strict) =>
ConduitT strict o m lazy
C.sinkLazy)
if ByteString -> Int
B.length ByteString
b forall a. Eq a => a -> a -> Bool
/= Int
n
then forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO EventStreamException
ESEUnexpectedEndOfStream
else forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
b
crcCheck ::
(MonadUnliftIO m) =>
C.ConduitM ByteString ByteString m ()
crcCheck :: forall (m :: * -> *).
MonadUnliftIO m =>
ConduitM ByteString ByteString m ()
crcCheck = do
ByteString
b <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
12
Word32
n :: Word32 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Binary a => ByteString -> IO a
parseBinary forall a b. (a -> b) -> a -> b
$ Int -> ByteString -> ByteString
B.take Int
4 ByteString
b
Word32
preludeCRC :: Word32 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Binary a => ByteString -> IO a
parseBinary forall a b. (a -> b) -> a -> b
$ Int -> ByteString -> ByteString
B.drop Int
8 ByteString
b
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. CRC32 a => a -> Word32
crc32 (Int -> ByteString -> ByteString
B.take Int
8 ByteString
b) forall a. Eq a => a -> a -> Bool
/= Word32
preludeCRC) forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO EventStreamException
ESEPreludeCRCFailed
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield forall a b. (a -> b) -> a -> b
$ Int -> ByteString -> ByteString
B.take Int
8 ByteString
b
let startCrc :: Word32
startCrc = forall a. CRC32 a => a -> Word32
crc32 ByteString
b
Word32
finalCrc <- forall {m :: * -> *}.
MonadUnliftIO m =>
Int -> Word32 -> ConduitT ByteString ByteString m Word32
accumulateYield (forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
n forall a. Num a => a -> a -> a
- Int
16) Word32
startCrc
ByteString
bs <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
4
Word32
expectedCrc :: Word32 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Binary a => ByteString -> IO a
parseBinary ByteString
bs
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word32
finalCrc forall a. Eq a => a -> a -> Bool
/= Word32
expectedCrc) forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO EventStreamException
ESEMessageCRCFailed
forall (m :: * -> *).
MonadUnliftIO m =>
ConduitM ByteString ByteString m ()
crcCheck
where
accumulateYield :: Int -> Word32 -> ConduitT ByteString ByteString m Word32
accumulateYield Int
n Word32
checkSum = do
let toRead :: Int
toRead = forall a. Ord a => a -> a -> a
min Int
n Int
chunkSize
ByteString
b <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
toRead
let c' :: Word32
c' = forall a. CRC32 a => Word32 -> a -> Word32
crc32Update Word32
checkSum ByteString
b
n' :: Int
n' = Int
n forall a. Num a => a -> a -> a
- ByteString -> Int
B.length ByteString
b
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield ByteString
b
if Int
n' forall a. Ord a => a -> a -> Bool
> Int
0
then Int -> Word32 -> ConduitT ByteString ByteString m Word32
accumulateYield Int
n' Word32
c'
else forall (m :: * -> *) a. Monad m => a -> m a
return Word32
c'
handleMessage :: (MonadUnliftIO m) => C.ConduitT ByteString EventMessage m ()
handleMessage :: forall (m :: * -> *).
MonadUnliftIO m =>
ConduitT ByteString EventMessage m ()
handleMessage = do
ByteString
b1 <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
4
Word32
msgLen :: Word32 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Binary a => ByteString -> IO a
parseBinary ByteString
b1
ByteString
b2 <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
4
Word32
hdrLen :: Word32 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Binary a => ByteString -> IO a
parseBinary ByteString
b2
[MessageHeader]
hs <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Word32 -> ConduitM ByteString a m [MessageHeader]
parseHeaders Word32
hdrLen
let payloadLen :: Word32
payloadLen = Word32
msgLen forall a. Num a => a -> a -> a
- Word32
hdrLen forall a. Num a => a -> a -> a
- Word32
16
getHdrVal :: b -> t (b, b) -> Maybe b
getHdrVal b
h = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> b
snd forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find ((b
h forall a. Eq a => a -> a -> Bool
==) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> a
fst)
eventHdrValue :: Maybe Text
eventHdrValue = forall {t :: * -> *} {b} {b}.
(Foldable t, Eq b) =>
b -> t (b, b) -> Maybe b
getHdrVal MsgHeaderName
EventType [MessageHeader]
hs
msgHdrValue :: Maybe Text
msgHdrValue = forall {t :: * -> *} {b} {b}.
(Foldable t, Eq b) =>
b -> t (b, b) -> Maybe b
getHdrVal MsgHeaderName
MessageType [MessageHeader]
hs
errCode :: Maybe Text
errCode = forall {t :: * -> *} {b} {b}.
(Foldable t, Eq b) =>
b -> t (b, b) -> Maybe b
getHdrVal MsgHeaderName
ErrorCode [MessageHeader]
hs
errMsg :: Maybe Text
errMsg = forall {t :: * -> *} {b} {b}.
(Foldable t, Eq b) =>
b -> t (b, b) -> Maybe b
getHdrVal MsgHeaderName
ErrorMessage [MessageHeader]
hs
case Maybe Text
msgHdrValue of
Just Text
"event" -> do
case Maybe Text
eventHdrValue of
Just Text
"Records" -> forall {m :: * -> *}.
MonadUnliftIO m =>
Int -> ConduitT ByteString EventMessage m ()
passThrough forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
payloadLen
Just Text
"Cont" -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Text
"Progress" -> do
ByteString
bs <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
payloadLen
Progress
progress <- forall (m :: * -> *). MonadIO m => ByteString -> m Progress
parseSelectProgress ByteString
bs
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield forall a b. (a -> b) -> a -> b
$ Progress -> EventMessage
ProgressEventMessage Progress
progress
Just Text
"Stats" -> do
ByteString
bs <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
payloadLen
Progress
stats <- forall (m :: * -> *). MonadIO m => ByteString -> m Progress
parseSelectProgress ByteString
bs
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield forall a b. (a -> b) -> a -> b
$ Progress -> EventMessage
StatsEventMessage Progress
stats
Just Text
"End" -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Maybe Text
_ -> forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO EventStreamException
ESEInvalidMessageType
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe Text
eventHdrValue forall a. Eq a => a -> a -> Bool
/= forall a. a -> Maybe a
Just Text
"End") forall (m :: * -> *).
MonadUnliftIO m =>
ConduitT ByteString EventMessage m ()
handleMessage
Just Text
"error" -> do
let reqMsgMay :: Maybe EventMessage
reqMsgMay = Text -> Text -> EventMessage
RequestLevelErrorMessage forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Text
errCode forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe Text
errMsg
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO EventStreamException
ESEInvalidMessageType) forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Maybe EventMessage
reqMsgMay
Maybe Text
_ -> forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO EventStreamException
ESEInvalidMessageType
where
passThrough :: Int -> ConduitT ByteString EventMessage m ()
passThrough Int
0 = forall (m :: * -> *) a. Monad m => a -> m a
return ()
passThrough Int
n = do
let c :: Int
c = forall a. Ord a => a -> a -> a
min Int
n Int
chunkSize
ByteString
b <- forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ConduitM ByteString a m ByteString
readNBytes Int
c
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield forall a b. (a -> b) -> a -> b
$ ByteString -> EventMessage
RecordPayloadEventMessage ByteString
b
Int -> ConduitT ByteString EventMessage m ()
passThrough forall a b. (a -> b) -> a -> b
$ Int
n forall a. Num a => a -> a -> a
- ByteString -> Int
B.length ByteString
b
selectProtoConduit ::
(MonadUnliftIO m) =>
C.ConduitT ByteString EventMessage m ()
selectProtoConduit :: forall (m :: * -> *).
MonadUnliftIO m =>
ConduitT ByteString EventMessage m ()
selectProtoConduit = forall (m :: * -> *).
MonadUnliftIO m =>
ConduitM ByteString ByteString m ()
crcCheck forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall (m :: * -> *).
MonadUnliftIO m =>
ConduitT ByteString EventMessage m ()
handleMessage
selectObjectContent ::
Bucket ->
Object ->
SelectRequest ->
Minio (C.ConduitT () EventMessage Minio ())
selectObjectContent :: Text
-> Text
-> SelectRequest
-> Minio (ConduitT () EventMessage Minio ())
selectObjectContent Text
b Text
o SelectRequest
r = do
let reqInfo :: S3ReqInfo
reqInfo =
S3ReqInfo
defaultS3ReqInfo
{ riMethod :: ByteString
riMethod = ByteString
HT.methodPost,
riBucket :: Maybe Text
riBucket = forall a. a -> Maybe a
Just Text
b,
riObject :: Maybe Text
riObject = forall a. a -> Maybe a
Just Text
o,
riPayload :: Payload
riPayload = ByteString -> Payload
PayloadBS forall a b. (a -> b) -> a -> b
$ SelectRequest -> ByteString
mkSelectRequest SelectRequest
r,
riNeedsLocation :: Bool
riNeedsLocation = Bool
False,
riQueryParams :: Query
riQueryParams = [(ByteString
"select", forall a. Maybe a
Nothing), (ByteString
"select-type", forall a. a -> Maybe a
Just ByteString
"2")]
}
Response (ConduitM () ByteString Minio ())
resp <- S3ReqInfo -> Minio (Response (ConduitM () ByteString Minio ()))
mkStreamRequest S3ReqInfo
reqInfo
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall body. Response body -> body
NC.responseBody Response (ConduitM () ByteString Minio ())
resp forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall (m :: * -> *).
MonadUnliftIO m =>
ConduitT ByteString EventMessage m ()
selectProtoConduit
getPayloadBytes :: (MonadIO m) => C.ConduitT EventMessage ByteString m ()
getPayloadBytes :: forall (m :: * -> *).
MonadIO m =>
ConduitT EventMessage ByteString m ()
getPayloadBytes = do
Maybe EventMessage
evM <- forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await
case Maybe EventMessage
evM of
Just EventMessage
v -> do
case EventMessage
v of
RecordPayloadEventMessage ByteString
b -> forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield ByteString
b
RequestLevelErrorMessage Text
c Text
m -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO forall a b. (a -> b) -> a -> b
$ Text -> Text -> ServiceErr
SelectErr Text
c Text
m
EventMessage
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
forall (m :: * -> *).
MonadIO m =>
ConduitT EventMessage ByteString m ()
getPayloadBytes
Maybe EventMessage
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()