module Network.Minio.PutObject
( putObjectInternal,
ObjectData (..),
selectPartSizes,
)
where
import Conduit (takeC)
import qualified Conduit as C
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified Data.List as List
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.S3API
import Network.Minio.Utils
data ObjectData m
=
ODFile FilePath (Maybe Int64)
|
ODStream (C.ConduitM () ByteString m ()) (Maybe Int64)
putObjectInternal ::
Bucket ->
Object ->
PutObjectOptions ->
ObjectData Minio ->
Minio ETag
putObjectInternal :: Bucket
-> Bucket -> PutObjectOptions -> ObjectData Minio -> Minio Bucket
putObjectInternal Bucket
b Bucket
o PutObjectOptions
opts (ODStream ConduitM () ByteString Minio ()
src Maybe Int64
sizeMay) = do
case Maybe Int64
sizeMay of
Maybe Int64
Nothing -> Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts Maybe Int64
forall a. Maybe a
Nothing ConduitM () ByteString Minio ()
src
Just Int64
size ->
if
| Int64
size Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Int64
64 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
oneMiB -> do
ByteString
bs <- ConduitT () Void Minio ByteString -> Minio ByteString
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT () Void Minio ByteString -> Minio ByteString)
-> ConduitT () Void Minio ByteString -> Minio ByteString
forall a b. (a -> b) -> a -> b
$ ConduitM () ByteString Minio ()
src ConduitM () ByteString Minio ()
-> ConduitM ByteString Void Minio ByteString
-> ConduitT () Void Minio ByteString
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
C..| Int -> ConduitT ByteString ByteString Minio ()
forall (m :: * -> *) a. Monad m => Int -> ConduitT a a m ()
takeC (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
size) ConduitT ByteString ByteString Minio ()
-> ConduitM ByteString Void Minio ByteString
-> ConduitM ByteString Void Minio ByteString
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
C..| ConduitM ByteString Void Minio ByteString
forall (m :: * -> *) o.
Monad m =>
ConduitT ByteString o m ByteString
CB.sinkLbs
Bucket -> Bucket -> [Header] -> ByteString -> Minio Bucket
putObjectSingle' Bucket
b Bucket
o (PutObjectOptions -> [Header]
pooToHeaders PutObjectOptions
opts) (ByteString -> Minio Bucket) -> ByteString -> Minio Bucket
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
LBS.toStrict ByteString
bs
| Int64
size Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
maxObjectSize -> MErrV -> Minio Bucket
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (MErrV -> Minio Bucket) -> MErrV -> Minio Bucket
forall a b. (a -> b) -> a -> b
$ Int64 -> MErrV
MErrVPutSizeExceeded Int64
size
| Bool
otherwise -> Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts (Int64 -> Maybe Int64
forall a. a -> Maybe a
Just Int64
size) ConduitM () ByteString Minio ()
src
putObjectInternal Bucket
b Bucket
o PutObjectOptions
opts (ODFile FilePath
fp Maybe Int64
sizeMay) = do
Either IOException (Bool, Maybe Int64)
hResE <- FilePath
-> (Handle -> Minio (Bool, Maybe Int64))
-> Minio (Either IOException (Bool, Maybe Int64))
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadResource m) =>
FilePath -> (Handle -> m a) -> m (Either IOException a)
withNewHandle FilePath
fp ((Handle -> Minio (Bool, Maybe Int64))
-> Minio (Either IOException (Bool, Maybe Int64)))
-> (Handle -> Minio (Bool, Maybe Int64))
-> Minio (Either IOException (Bool, Maybe Int64))
forall a b. (a -> b) -> a -> b
$ \Handle
h ->
(Bool -> Maybe Int64 -> (Bool, Maybe Int64))
-> Minio Bool -> Minio (Maybe Int64) -> Minio (Bool, Maybe Int64)
forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 (,) (Handle -> Minio Bool
forall (m :: * -> *). MonadResource m => Handle -> m Bool
isHandleSeekable Handle
h) (Handle -> Minio (Maybe Int64)
forall (m :: * -> *). MonadUnliftIO m => Handle -> m (Maybe Int64)
getFileSize Handle
h)
(Bool
isSeekable, Maybe Int64
handleSizeMay) <-
(IOException -> Minio (Bool, Maybe Int64))
-> ((Bool, Maybe Int64) -> Minio (Bool, Maybe Int64))
-> Either IOException (Bool, Maybe Int64)
-> Minio (Bool, Maybe Int64)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
(Minio (Bool, Maybe Int64)
-> IOException -> Minio (Bool, Maybe Int64)
forall a b. a -> b -> a
const (Minio (Bool, Maybe Int64)
-> IOException -> Minio (Bool, Maybe Int64))
-> Minio (Bool, Maybe Int64)
-> IOException
-> Minio (Bool, Maybe Int64)
forall a b. (a -> b) -> a -> b
$ (Bool, Maybe Int64) -> Minio (Bool, Maybe Int64)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
False, Maybe Int64
forall a. Maybe a
Nothing))
(Bool, Maybe Int64) -> Minio (Bool, Maybe Int64)
forall (m :: * -> *) a. Monad m => a -> m a
return
Either IOException (Bool, Maybe Int64)
hResE
let finalSizeMay :: Maybe Int64
finalSizeMay = [Int64] -> Maybe Int64
forall a. [a] -> Maybe a
listToMaybe ([Int64] -> Maybe Int64) -> [Int64] -> Maybe Int64
forall a b. (a -> b) -> a -> b
$ [Maybe Int64] -> [Int64]
forall a. [Maybe a] -> [a]
catMaybes [Maybe Int64
sizeMay, Maybe Int64
handleSizeMay]
case Maybe Int64
finalSizeMay of
Maybe Int64
Nothing -> Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts Maybe Int64
forall a. Maybe a
Nothing (ConduitM () ByteString Minio () -> Minio Bucket)
-> ConduitM () ByteString Minio () -> Minio Bucket
forall a b. (a -> b) -> a -> b
$ FilePath -> ConduitM () ByteString Minio ()
forall (m :: * -> *) i.
MonadResource m =>
FilePath -> ConduitT i ByteString m ()
CB.sourceFile FilePath
fp
Just Int64
size ->
if
| Int64
size Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Int64
64 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
* Int64
oneMiB ->
(IOException -> Minio Bucket)
-> (Bucket -> Minio Bucket)
-> Either IOException Bucket
-> Minio Bucket
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either IOException -> Minio Bucket
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO Bucket -> Minio Bucket
forall (m :: * -> *) a. Monad m => a -> m a
return
(Either IOException Bucket -> Minio Bucket)
-> Minio (Either IOException Bucket) -> Minio Bucket
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< FilePath
-> (Handle -> Minio Bucket) -> Minio (Either IOException Bucket)
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadResource m) =>
FilePath -> (Handle -> m a) -> m (Either IOException a)
withNewHandle FilePath
fp (\Handle
h -> Bucket
-> Bucket -> [Header] -> Handle -> Int64 -> Int64 -> Minio Bucket
putObjectSingle Bucket
b Bucket
o (PutObjectOptions -> [Header]
pooToHeaders PutObjectOptions
opts) Handle
h Int64
0 Int64
size)
| Int64
size Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
> Int64
maxObjectSize -> MErrV -> Minio Bucket
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (MErrV -> Minio Bucket) -> MErrV -> Minio Bucket
forall a b. (a -> b) -> a -> b
$ Int64 -> MErrV
MErrVPutSizeExceeded Int64
size
| Bool
isSeekable -> Bucket
-> Bucket -> PutObjectOptions -> FilePath -> Int64 -> Minio Bucket
parallelMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts FilePath
fp Int64
size
| Bool
otherwise ->
Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts (Int64 -> Maybe Int64
forall a. a -> Maybe a
Just Int64
size) (ConduitM () ByteString Minio () -> Minio Bucket)
-> ConduitM () ByteString Minio () -> Minio Bucket
forall a b. (a -> b) -> a -> b
$
FilePath -> ConduitM () ByteString Minio ()
forall (m :: * -> *) i.
MonadResource m =>
FilePath -> ConduitT i ByteString m ()
CB.sourceFile FilePath
fp
parallelMultipartUpload ::
Bucket ->
Object ->
PutObjectOptions ->
FilePath ->
Int64 ->
Minio ETag
parallelMultipartUpload :: Bucket
-> Bucket -> PutObjectOptions -> FilePath -> Int64 -> Minio Bucket
parallelMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts FilePath
filePath Int64
size = do
Bucket
uploadId <- Bucket -> Bucket -> [Header] -> Minio Bucket
newMultipartUpload Bucket
b Bucket
o (PutObjectOptions -> [Header]
pooToHeaders PutObjectOptions
opts)
let partSizeInfo :: [(PartNumber, Int64, Int64)]
partSizeInfo = Int64 -> [(PartNumber, Int64, Int64)]
selectPartSizes Int64
size
let threads :: Word
threads = Word -> Maybe Word -> Word
forall a. a -> Maybe a -> a
fromMaybe Word
10 (Maybe Word -> Word) -> Maybe Word -> Word
forall a b. (a -> b) -> a -> b
$ PutObjectOptions -> Maybe Word
pooNumThreads PutObjectOptions
opts
[Either IOException PartTuple]
uploadedPartsE <-
Int
-> ((PartNumber, Int64, Int64)
-> Minio (Either IOException PartTuple))
-> [(PartNumber, Int64, Int64)]
-> Minio [Either IOException PartTuple]
forall (m :: * -> *) t a.
MonadUnliftIO m =>
Int -> (t -> m a) -> [t] -> m [a]
limitedMapConcurrently
(Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
threads)
(Bucket
-> (PartNumber, Int64, Int64)
-> Minio (Either IOException PartTuple)
uploadPart Bucket
uploadId)
[(PartNumber, Int64, Int64)]
partSizeInfo
(IOException -> Minio Any) -> [IOException] -> Minio ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ IOException -> Minio Any
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO ([IOException] -> Minio ()) -> [IOException] -> Minio ()
forall a b. (a -> b) -> a -> b
$ [Either IOException PartTuple] -> [IOException]
forall a b. [Either a b] -> [a]
lefts [Either IOException PartTuple]
uploadedPartsE
Bucket -> Bucket -> Bucket -> [PartTuple] -> Minio Bucket
completeMultipartUpload Bucket
b Bucket
o Bucket
uploadId ([PartTuple] -> Minio Bucket) -> [PartTuple] -> Minio Bucket
forall a b. (a -> b) -> a -> b
$ [Either IOException PartTuple] -> [PartTuple]
forall a b. [Either a b] -> [b]
rights [Either IOException PartTuple]
uploadedPartsE
where
uploadPart :: Bucket
-> (PartNumber, Int64, Int64)
-> Minio (Either IOException PartTuple)
uploadPart Bucket
uploadId (PartNumber
partNum, Int64
offset, Int64
sz) =
FilePath
-> (Handle -> Minio PartTuple)
-> Minio (Either IOException PartTuple)
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadResource m) =>
FilePath -> (Handle -> m a) -> m (Either IOException a)
withNewHandle FilePath
filePath ((Handle -> Minio PartTuple)
-> Minio (Either IOException PartTuple))
-> (Handle -> Minio PartTuple)
-> Minio (Either IOException PartTuple)
forall a b. (a -> b) -> a -> b
$ \Handle
h -> do
let payload :: Payload
payload = Handle -> Int64 -> Int64 -> Payload
PayloadH Handle
h Int64
offset Int64
sz
Bucket
-> Bucket
-> Bucket
-> PartNumber
-> [Header]
-> Payload
-> Minio PartTuple
putObjectPart Bucket
b Bucket
o Bucket
uploadId PartNumber
partNum [] Payload
payload
sequentialMultipartUpload ::
Bucket ->
Object ->
PutObjectOptions ->
Maybe Int64 ->
C.ConduitM () ByteString Minio () ->
Minio ETag
sequentialMultipartUpload :: Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts Maybe Int64
sizeMay ConduitM () ByteString Minio ()
src = do
Bucket
uploadId <- Bucket -> Bucket -> [Header] -> Minio Bucket
newMultipartUpload Bucket
b Bucket
o (PutObjectOptions -> [Header]
pooToHeaders PutObjectOptions
opts)
let partSizes :: [(PartNumber, Int64, Int64)]
partSizes = Int64 -> [(PartNumber, Int64, Int64)]
selectPartSizes (Int64 -> [(PartNumber, Int64, Int64)])
-> Int64 -> [(PartNumber, Int64, Int64)]
forall a b. (a -> b) -> a -> b
$ Int64 -> (Int64 -> Int64) -> Maybe Int64 -> Int64
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int64
maxObjectSize Int64 -> Int64
forall a. a -> a
identity Maybe Int64
sizeMay
([PartNumber]
pnums, [Int64]
_, [Int64]
sizes) = [(PartNumber, Int64, Int64)] -> ([PartNumber], [Int64], [Int64])
forall a b c. [(a, b, c)] -> ([a], [b], [c])
List.unzip3 [(PartNumber, Int64, Int64)]
partSizes
[PartTuple]
uploadedParts <-
ConduitT () Void Minio [PartTuple] -> Minio [PartTuple]
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT () Void Minio [PartTuple] -> Minio [PartTuple])
-> ConduitT () Void Minio [PartTuple] -> Minio [PartTuple]
forall a b. (a -> b) -> a -> b
$
ConduitM () ByteString Minio ()
src
ConduitM () ByteString Minio ()
-> ConduitM ByteString Void Minio [PartTuple]
-> ConduitT () Void Minio [PartTuple]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
C..| [Int] -> ConduitT ByteString ByteString Minio ()
forall (m :: * -> *).
Monad m =>
[Int] -> ConduitM ByteString ByteString m ()
chunkBSConduit ((Int64 -> Int) -> [Int64] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral [Int64]
sizes)
ConduitT ByteString ByteString Minio ()
-> ConduitM ByteString Void Minio [PartTuple]
-> ConduitM ByteString Void Minio [PartTuple]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
C..| (ByteString -> Payload) -> ConduitT ByteString Payload Minio ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
CL.map ByteString -> Payload
PayloadBS
ConduitT ByteString Payload Minio ()
-> ConduitM Payload Void Minio [PartTuple]
-> ConduitM ByteString Void Minio [PartTuple]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
C..| Bucket -> [PartNumber] -> ConduitT Payload PartTuple Minio ()
uploadPart' Bucket
uploadId [PartNumber]
pnums
ConduitT Payload PartTuple Minio ()
-> ConduitM PartTuple Void Minio [PartTuple]
-> ConduitM Payload Void Minio [PartTuple]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
C..| ConduitM PartTuple Void Minio [PartTuple]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
CC.sinkList
Bucket -> Bucket -> Bucket -> [PartTuple] -> Minio Bucket
completeMultipartUpload Bucket
b Bucket
o Bucket
uploadId [PartTuple]
uploadedParts
where
uploadPart' :: Bucket -> [PartNumber] -> ConduitT Payload PartTuple Minio ()
uploadPart' Bucket
_ [] = () -> ConduitT Payload PartTuple Minio ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
uploadPart' Bucket
uid (PartNumber
pn : [PartNumber]
pns) = do
Maybe Payload
payloadMay <- ConduitT Payload PartTuple Minio (Maybe Payload)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await
case Maybe Payload
payloadMay of
Maybe Payload
Nothing -> () -> ConduitT Payload PartTuple Minio ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Payload
payload -> do
PartTuple
pinfo <- Minio PartTuple -> ConduitT Payload PartTuple Minio PartTuple
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Minio PartTuple -> ConduitT Payload PartTuple Minio PartTuple)
-> Minio PartTuple -> ConduitT Payload PartTuple Minio PartTuple
forall a b. (a -> b) -> a -> b
$ Bucket
-> Bucket
-> Bucket
-> PartNumber
-> [Header]
-> Payload
-> Minio PartTuple
putObjectPart Bucket
b Bucket
o Bucket
uid PartNumber
pn [] Payload
payload
PartTuple -> ConduitT Payload PartTuple Minio ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield PartTuple
pinfo
Bucket -> [PartNumber] -> ConduitT Payload PartTuple Minio ()
uploadPart' Bucket
uid [PartNumber]
pns