module Network.Minio.PutObject
(
putObjectInternal
, ObjectData(..)
, selectPartSizes
, copyObjectInternal
, selectCopyRanges
, minPartSize
) where
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified Data.List as List
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.S3API
import Network.Minio.Utils
-- | One mebibyte (1024 * 1024), the unit the other size limits are
-- expressed in.
oneMiB :: Int64
oneMiB = 1024 * 1024

-- | Largest object size accepted for upload: 5 TiB. Files larger than
-- this are rejected by 'putObjectInternal'.
maxObjectSize :: Int64
maxObjectSize = 5 * oneMiB * oneMiB

-- | Lower bound (64 MiB) on the part size chosen by 'selectPartSizes'.
minPartSize :: Int64
minPartSize = oneMiB * 64

-- | Divisor used by 'selectPartSizes' when deriving the part size from
-- the object size, which caps the number of parts at 10000.
maxMultipartParts :: Int64
maxMultipartParts = 10000
-- | The source of the bytes for an object upload, as consumed by
-- 'putObjectInternal'.
data ObjectData m
  = ODFile FilePath (Maybe Int64)
    -- ^ Upload from the file at the given path. The optional size, when
    -- present, is preferred over the size probed from the file handle.
  | ODStream (C.Producer m ByteString) (Maybe Int64)
    -- ^ Upload from a conduit producer of 'ByteString' chunks, with an
    -- optional total size used to plan the part sizes.
-- | Upload an object to bucket @b@ under key @o@ and return its 'ETag'.
--
-- * 'ODStream' sources are always uploaded with
--   'sequentialMultipartUpload'.
-- * 'ODFile' sources are probed for seekability and size first. The
--   caller-supplied size wins over the probed one. Depending on the
--   resulting size the file is sent with a single PUT (at most
--   'minPartSize'), rejected with 'MErrVPutSizeExceeded' (larger than
--   'maxObjectSize'), uploaded in parallel parts (seekable file) or
--   uploaded sequentially (everything else, including unknown size).
putObjectInternal :: Bucket -> Object -> ObjectData Minio -> Minio ETag
putObjectInternal b o (ODStream src sizeMay) =
  sequentialMultipartUpload b o sizeMay src
putObjectInternal b o (ODFile fp sizeMay) = do
  -- Probe the file. If 'withNewHandle' reports a failure, carry on as if
  -- the file were non-seekable with an unknown size.
  hResE <- withNewHandle fp $ \h ->
    liftM2 (,) (isHandleSeekable h) (getFileSize h)
  (isSeekable, handleSizeMay) <-
    either (const $ return (False, Nothing)) return hResE

  -- First available size: the caller's, else the one read from the handle.
  let finalSizeMay = listToMaybe $ catMaybes [sizeMay, handleSizeMay]

  case finalSizeMay of
    -- Unknown size: stream it, planning parts for the largest object.
    Nothing ->
      sequentialMultipartUpload b o (Just maxObjectSize) $ CB.sourceFile fp
    Just size ->
      -- The single-PUT cut-off is the shared 'minPartSize' constant, so it
      -- stays in step with the multipart part-size selection.
      if | size <= minPartSize ->
             either throwM return
               =<< withNewHandle fp (\h -> putObjectSingle b o [] h 0 size)
         | size > maxObjectSize -> throwM $ MErrVPutSizeExceeded size
         | isSeekable -> parallelMultipartUpload b o fp size
         | otherwise ->
             sequentialMultipartUpload b o (Just size) $ CB.sourceFile fp
-- | Plan the parts of a multipart upload for an object of @size@ bytes.
--
-- Returns @(part number, offset, part size)@ triples, with part numbers
-- counting up from 1 and offsets measured from the start of the object.
-- Every part except the last has the same size: the larger of
-- 'minPartSize' and @size / maxMultipartParts@ rounded up. The last part
-- holds whatever remains. A @size@ of zero yields a single empty part.
selectPartSizes :: Int64 -> [(PartNumber, Int64, Int64)]
selectPartSizes size =
  List.zipWith (\pn (off, len) -> (pn, off, len)) [1 ..] (loop 0)
  where
    -- Ceiling division done in integer arithmetic, so the result is exact
    -- for every Int64 and cannot overflow.
    partSize :: Int64
    partSize = max minPartSize $ case size `divMod` maxMultipartParts of
      (q, 0) -> q
      (q, _) -> q + 1

    -- Walk the object from offset @st@, emitting (offset, length) pairs.
    loop st
      | st > size = []
      | st + partSize >= size = [(st, size - st)]
      | otherwise = (st, partSize) : loop (st + partSize)
-- | Upload the file at @filePath@ (of @size@ bytes) as a multipart object.
--
-- Parts are planned with 'selectPartSizes' and dispatched through
-- 'limitedMapConcurrently' with a limit of 10. Each part opens its own
-- handle on the file. If any part reports a failure, the first such
-- failure is rethrown and the upload is not completed.
parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64
                        -> Minio ETag
parallelMultipartUpload b o filePath size = do
  uid <- newMultipartUpload b o []
  results <- limitedMapConcurrently 10 (sendPart uid) (selectPartSizes size)
  -- Surface the first failed part, if any, before completing.
  mapM_ throwM (lefts results)
  completeMultipartUpload b o uid (rights results)
  where
    -- Upload one (part number, offset, length) slice of the file.
    sendPart uid (partNum, offset, len) =
      withNewHandle filePath $ \h ->
        putObjectPart b o uid partNum [] (PayloadH h offset len)
-- | Upload the bytes produced by @src@ as a multipart object, one part
-- after another, and return the resulting 'ETag'.
--
-- Part sizes are planned with 'selectPartSizes' from @sizeMay@, falling
-- back to 'maxObjectSize' when the size is unknown. The stream is
-- re-chunked to those sizes, and each chunk is uploaded as the next part
-- until either the stream or the planned part numbers run out.
sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64
                          -> C.Producer Minio ByteString -> Minio ETag
sequentialMultipartUpload b o sizeMay src = do
  uid <- newMultipartUpload b o []
  parts <-
    src
      C..| chunkBSConduit chunkSizes
      C..| CL.map PayloadBS
      C..| sendParts uid partNumbers
      C.$$ CC.sinkList
  completeMultipartUpload b o uid parts
  where
    plannedSize = maybe maxObjectSize identity sizeMay
    (partNumbers, _, chunkSizes) = List.unzip3 (selectPartSizes plannedSize)

    -- Pair each incoming payload with the next part number, upload it and
    -- pass the resulting part info downstream.
    sendParts _ [] = return ()
    sendParts uploadId (n : rest) = maybe (return ()) uploadChunk =<< C.await
      where
        uploadChunk chunk = do
          info <- lift $ putObjectPart b o uploadId n [] chunk
          C.yield info
          sendParts uploadId rest
-- | Copy the object (or byte range of it) described by @cps@ to bucket
-- @b'@ under key @o@, returning the 'ETag' of the new object.
--
-- The source object's size is looked up with 'headObject'. A single-request
-- copy is used only when the whole source object is copied and it is no
-- larger than 'minPartSize'; every other case goes through
-- 'multiPartCopyObject'.
--
-- Throws 'MErrVInvalidSrcObjSpec' when the source in @cps@ cannot be
-- resolved to a bucket and object, and 'MErrVInvalidSrcObjByteRange' when
-- a requested range starts below zero, ends before it starts, or ends at
-- or beyond the source size.
copyObjectInternal :: Bucket -> Object -> CopyPartSource
                   -> Minio ETag
copyObjectInternal b' o cps = do
  (srcBucket, srcObject) <-
    maybe (throwM $ MErrVInvalidSrcObjSpec $ cpSource cps) return $
      cpsToObject cps

  (ObjectInfo _ _ _ srcSize) <- headObject srcBucket srcObject

  -- Only an explicitly requested range needs validating.
  case cpSourceRange cps of
    Just range@(start, end)
      | start < 0 || end < start || end >= srcSize ->
          throwM $ MErrVInvalidSrcObjByteRange range
    _ -> return ()

  -- Inclusive offsets of the bytes to copy; the whole object by default.
  let (startOffset, endOffset) =
        maybe (0, srcSize - 1) identity $ cpSourceRange cps
      destSize = endOffset - startOffset + 1

  if destSize > minPartSize || destSize /= srcSize
    then multiPartCopyObject b' o cps srcSize
    else fst <$> copyObjectSingle b' o cps {cpSourceRange = Nothing} []
-- | Split the inclusive byte range @(st, end)@ into per-part ranges.
--
-- The range length @end - st + 1@ is partitioned by 'selectPartSizes'.
-- Each resulting part is returned with its part number and its own
-- inclusive @(first byte, last byte)@ range, shifted so that it is
-- relative to the same origin as @st@.
selectCopyRanges :: (Int64, Int64) -> [(PartNumber, (Int64, Int64))]
selectCopyRanges (st, end) =
  [ (pn, (st + off, st + off + len - 1))
  | (pn, off, len) <- selectPartSizes (end - st + 1)
  ]
-- | Copy the source described by @cps@ to bucket @b@ under key @o@ as a
-- multipart upload, returning the 'ETag' of the completed object.
--
-- The byte range to copy is the one in @cps@, or the whole object
-- (@0@ to @srcSize - 1@) when none is given. It is split with
-- 'selectCopyRanges', and each sub-range is copied as one part through
-- 'limitedMapConcurrently' with a limit of 10.
multiPartCopyObject :: Bucket -> Object -> CopyPartSource -> Int64
                    -> Minio ETag
multiPartCopyObject b o cps srcSize = do
  uid <- newMultipartUpload b o []

  let byteRange = maybe (0, srcSize - 1) identity $ cpSourceRange cps
      -- One copy source per part, each restricted to that part's range.
      partSources =
        [ (pn, cps {cpSourceRange = Just partRange})
        | (pn, partRange) <- selectCopyRanges byteRange
        ]
      copyPart (pn, partSrc) = do
        (etag, _) <- copyObjectPart b o partSrc uid pn []
        return (pn, etag)

  copiedParts <- limitedMapConcurrently 10 copyPart partSources
  completeMultipartUpload b o uid copiedParts