--
-- Minio Haskell SDK, (C) 2017 Minio, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

module Network.Minio.PutObject
  (
    putObjectInternal
  , ObjectData(..)
  , selectPartSizes
  , copyObjectInternal
  , selectCopyRanges
  , minPartSize
  ) where

import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified Data.List as List

import Lib.Prelude

import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.S3API
import Network.Minio.Utils

-- | max obj size is 5TiB
maxObjectSize :: Int64
maxObjectSize = 5 * 1024 * 1024 * oneMiB

-- | minimum size of parts used in multipart operations.
minPartSize :: Int64
minPartSize = 64 * oneMiB

-- | Number of bytes in one mebibyte.
oneMiB :: Int64
oneMiB = 1024 * 1024

-- | Upper bound on the number of parts 'selectPartSizes' will produce.
maxMultipartParts :: Int64
maxMultipartParts = 10000

-- | A data-type to represent the source data for an object. A
-- file-path or a producer-conduit may be provided.
--
-- For files, a size may be provided - this is useful in cases when
-- the file size cannot be automatically determined or if only some
-- prefix of the file is desired.
--
-- For streams also, a size may be provided. This is useful to limit
-- the input - if it is not provided, upload will continue until the
-- stream ends or the object reaches 'maxObjectSize' size.
data ObjectData m
  = ODFile FilePath (Maybe Int64)
    -- ^ Takes filepath and optional size.
  | ODStream (C.Producer m ByteString) (Maybe Int64)
    -- ^ Pass size in bytes as maybe if known.

-- | Put an object from ObjectData. This high-level API handles
-- objects of all sizes, and even if the object size is unknown.
--
-- Streams are always uploaded sequentially as a multipart upload. For
-- files, the strategy depends on the size: a single request when the
-- size is at most 'minPartSize', otherwise a multipart upload that
-- runs in parallel when the file handle is seekable. Throws
-- 'MErrVPutSizeExceeded' when the size is larger than 'maxObjectSize'.
putObjectInternal :: Bucket -> Object -> ObjectData Minio -> Minio ETag
putObjectInternal b o (ODStream src sizeMay) =
  sequentialMultipartUpload b o sizeMay src
putObjectInternal b o (ODFile fp sizeMay) = do
  hResE <- withNewHandle fp $ \h ->
    liftM2 (,) (isHandleSeekable h) (getFileSize h)

  -- if the handle could not be queried, fall back to "not seekable,
  -- size unknown" rather than failing here.
  (isSeekable, handleSizeMay) <-
    either (const $ return (False, Nothing)) return hResE

  -- prefer given size to queried size.
  let finalSizeMay = listToMaybe $ catMaybes [sizeMay, handleSizeMay]

  case finalSizeMay of
    -- unable to get size, so assume non-seekable file and max-object size
    Nothing ->
      sequentialMultipartUpload b o (Just maxObjectSize) $ CB.sourceFile fp

    -- got file size, so check for single/multipart upload
    Just size
      | size <= minPartSize ->
          either throwM return =<<
            withNewHandle fp (\h -> putObjectSingle b o [] h 0 size)
      | size > maxObjectSize -> throwM $ MErrVPutSizeExceeded size
      | isSeekable -> parallelMultipartUpload b o fp size
      | otherwise ->
          sequentialMultipartUpload b o (Just size) $ CB.sourceFile fp

-- | Select part sizes for an object of the given size. Each element
-- of the result is @(part number, byte offset, part size)@, with part
-- numbers starting at 1.
--
-- The part size is the larger of 'minPartSize' (64MiB) and
-- @ceiling (size / maxMultipartParts)@, so that no more than
-- 'maxMultipartParts' parts are produced. The last part holds the
-- remainder and may be smaller; a size of 0 yields one empty part.
selectPartSizes :: Int64 -> [(PartNumber, Int64, Int64)]
selectPartSizes size =
  uncurry (List.zip3 [1..]) $ List.unzip $ loop 0 size
  where
    ceil :: Double -> Int64
    ceil = ceiling

    partSize =
      max minPartSize
          (ceil $ fromIntegral size / fromIntegral maxMultipartParts)

    -- produce (offset, length) pairs until the whole size is covered.
    loop st sz
      | st > sz = []
      | st + partSize >= sz = [(st, sz - st)]
      | otherwise = (st, partSize) : loop (st + partSize) sz

-- | Upload a file of known size as a multipart object. Every part
-- opens its own handle on the file and uploads its byte range; the
-- first part failure, if any, is rethrown before completing.
parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64
                        -> Minio ETag
parallelMultipartUpload b o filePath size = do
  -- get a new upload id.
  uploadId <- newMultipartUpload b o []

  let partSizeInfo = selectPartSizes size

  -- perform upload with 10 threads
  uploadedPartsE <- limitedMapConcurrently 10
                    (uploadPart uploadId) partSizeInfo

  -- if there were any errors, rethrow exception.
  mapM_ throwM $ lefts uploadedPartsE

  -- if we get here, all parts were successfully uploaded.
  completeMultipartUpload b o uploadId $ rights uploadedPartsE
  where
    uploadPart uploadId (partNum, offset, sz) =
      withNewHandle filePath $ \h -> do
        let payload = PayloadH h offset sz
        putObjectPart b o uploadId partNum [] payload

-- | Upload multipart object from conduit source sequentially. When no
-- size is given, parts are planned as for an object of
-- 'maxObjectSize'; the upload stops when either the source or the
-- planned part numbers run out.
sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64
                          -> C.Producer Minio ByteString -> Minio ETag
sequentialMultipartUpload b o sizeMay src = do
  -- get a new upload id.
  uploadId <- newMultipartUpload b o []

  -- upload parts in loop
  let partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay
      (pnums, _, sizes) = List.unzip3 partSizes
  uploadedParts <- src
              C..| chunkBSConduit sizes
              C..| CL.map PayloadBS
              C..| uploadPart' uploadId pnums
              C.$$ CC.sinkList

  -- complete multipart upload
  completeMultipartUpload b o uploadId uploadedParts
  where
    -- pair each incoming payload with the next part number, upload
    -- it, and emit the resulting part info downstream.
    uploadPart' _ [] = return ()
    uploadPart' uid (pn:pns) = do
      payloadMay <- C.await
      case payloadMay of
        Nothing -> return ()
        Just payload -> do
          pinfo <- lift $ putObjectPart b o uid pn [] payload
          C.yield pinfo
          uploadPart' uid pns

-- | Copy an object using single or multipart copy strategy.
copyObjectInternal :: Bucket -> Object -> CopyPartSource -> Minio ETag
copyObjectInternal b o cps = do
  -- validate and extract the src bucket and object
  (srcBucket, srcObject) <-
    maybe (throwM $ MErrVInvalidSrcObjSpec $ cpSource cps) return $
    cpsToObject cps

  -- get source object size with a head request
  (ObjectInfo _ _ _ srcSize) <- headObject srcBucket srcObject

  -- check that byte offsets are valid if specified in cps: the range
  -- must be non-negative, ordered, and end inside the source object.
  case cpSourceRange cps of
    Just range@(start, end)
      | start < 0 || end < start || end >= srcSize ->
          throwM $ MErrVInvalidSrcObjByteRange range
    _ -> return ()

  -- Without an explicit range, the whole source object is copied.
  let (startOffset, endOffset) =
        maybe (0, srcSize - 1) identity $ cpSourceRange cps
      destSize = endOffset - startOffset + 1

  -- Use multipart copy when either:
  --   1. more than minPartSize (64MiB) bytes are to be copied, OR
  --   2. the requested range does not cover the whole source object.
  -- Otherwise the whole object is copied with a single request.
  if destSize > minPartSize || destSize /= srcSize
    then multiPartCopyObject b o cps srcSize
    else fst <$> copyObjectSingle b o cps{cpSourceRange = Nothing} []

-- | Given the input byte range of the source object, compute the
-- splits for a multipart copy object procedure. Minimum part size
-- used is minPartSize.
--
-- Both ends of the input range and of every returned range are
-- inclusive; each result pairs a part number with its byte range in
-- the source object.
selectCopyRanges :: (Int64, Int64) -> [(PartNumber, (Int64, Int64))]
selectCopyRanges (st, end) =
  [ (pn, (st + offset, st + offset + len - 1))
  | (pn, offset, len) <- selectPartSizes size
  ]
  where
    size = end - st + 1

-- | Perform a multipart copy object action. Since we cannot verify
-- existing parts based on the source object, there is no resuming
-- copy action support.
multiPartCopyObject :: Bucket -> Object -> CopyPartSource -> Int64
                    -> Minio ETag
multiPartCopyObject b o cps srcSize = do
  uid <- newMultipartUpload b o []

  -- copy the requested byte range, or the whole source object when
  -- no range was specified.
  let byteRange = maybe (0, srcSize - 1) identity $ cpSourceRange cps
      -- one copy source per part, each restricted to that part's range.
      partSources =
        [ (pn, cps {cpSourceRange = Just partRange})
        | (pn, partRange) <- selectCopyRanges byteRange
        ]

  -- 10 is passed to limitedMapConcurrently as in the parallel upload
  -- path (presumably the concurrency limit - confirm in its definition).
  copiedParts <- limitedMapConcurrently 10 (copyPart uid) partSources

  completeMultipartUpload b o uid copiedParts
  where
    -- copy a single part and pair its ETag with the part number.
    copyPart uid (pn, partSource) = do
      (etag, _) <- copyObjectPart b o partSource uid pn []
      return (pn, etag)