module Network.Minio.PutObject
( putObjectInternal,
ObjectData (..),
selectPartSizes,
)
where
import Conduit (takeC)
import qualified Conduit as C
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified Data.List as List
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.S3API
import Network.Minio.Utils
data ObjectData m
=
ODFile FilePath (Maybe Int64)
|
ODStream (C.ConduitM () ByteString m ()) (Maybe Int64)
putObjectInternal ::
Bucket ->
Object ->
PutObjectOptions ->
ObjectData Minio ->
Minio ETag
putObjectInternal :: Bucket
-> Bucket -> PutObjectOptions -> ObjectData Minio -> Minio Bucket
putObjectInternal Bucket
b Bucket
o PutObjectOptions
opts (ODStream ConduitM () ByteString Minio ()
src Maybe Int64
sizeMay) = do
case Maybe Int64
sizeMay of
Maybe Int64
Nothing -> Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts forall a. Maybe a
Nothing ConduitM () ByteString Minio ()
src
Just Int64
size ->
if
| Int64
size forall a. Ord a => a -> a -> Bool
<= Int64
64 forall a. Num a => a -> a -> a
* Int64
oneMiB -> do
ByteString
bs <- forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit forall a b. (a -> b) -> a -> b
$ ConduitM () ByteString Minio ()
src forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| forall (m :: * -> *) a. Monad m => Int -> ConduitT a a m ()
takeC (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
size) forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| forall (m :: * -> *) o.
Monad m =>
ConduitT ByteString o m ByteString
CB.sinkLbs
Bucket -> Bucket -> [Header] -> ByteString -> Minio Bucket
putObjectSingle' Bucket
b Bucket
o (PutObjectOptions -> [Header]
pooToHeaders PutObjectOptions
opts) forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
LBS.toStrict ByteString
bs
| Int64
size forall a. Ord a => a -> a -> Bool
> Int64
maxObjectSize -> forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO forall a b. (a -> b) -> a -> b
$ Int64 -> MErrV
MErrVPutSizeExceeded Int64
size
| Bool
otherwise -> Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts (forall a. a -> Maybe a
Just Int64
size) ConduitM () ByteString Minio ()
src
putObjectInternal Bucket
b Bucket
o PutObjectOptions
opts (ODFile FilePath
fp Maybe Int64
sizeMay) = do
Either IOException (Bool, Maybe Int64)
hResE <- forall (m :: * -> *) a.
(MonadUnliftIO m, MonadResource m) =>
FilePath -> (Handle -> m a) -> m (Either IOException a)
withNewHandle FilePath
fp forall a b. (a -> b) -> a -> b
$ \Handle
h ->
forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 (,) (forall (m :: * -> *). MonadResource m => Handle -> m Bool
isHandleSeekable Handle
h) (forall (m :: * -> *). MonadUnliftIO m => Handle -> m (Maybe Int64)
getFileSize Handle
h)
(Bool
isSeekable, Maybe Int64
handleSizeMay) <-
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
(forall a b. a -> b -> a
const forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
False, forall a. Maybe a
Nothing))
forall (m :: * -> *) a. Monad m => a -> m a
return
Either IOException (Bool, Maybe Int64)
hResE
let finalSizeMay :: Maybe Int64
finalSizeMay = forall a. [a] -> Maybe a
listToMaybe forall a b. (a -> b) -> a -> b
$ forall a. [Maybe a] -> [a]
catMaybes [Maybe Int64
sizeMay, Maybe Int64
handleSizeMay]
case Maybe Int64
finalSizeMay of
Maybe Int64
Nothing -> Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts forall a. Maybe a
Nothing forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) i.
MonadResource m =>
FilePath -> ConduitT i ByteString m ()
CB.sourceFile FilePath
fp
Just Int64
size ->
if
| Int64
size forall a. Ord a => a -> a -> Bool
<= Int64
64 forall a. Num a => a -> a -> a
* Int64
oneMiB ->
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO forall (m :: * -> *) a. Monad m => a -> m a
return
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) a.
(MonadUnliftIO m, MonadResource m) =>
FilePath -> (Handle -> m a) -> m (Either IOException a)
withNewHandle FilePath
fp (\Handle
h -> Bucket
-> Bucket -> [Header] -> Handle -> Int64 -> Int64 -> Minio Bucket
putObjectSingle Bucket
b Bucket
o (PutObjectOptions -> [Header]
pooToHeaders PutObjectOptions
opts) Handle
h Int64
0 Int64
size)
| Int64
size forall a. Ord a => a -> a -> Bool
> Int64
maxObjectSize -> forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO forall a b. (a -> b) -> a -> b
$ Int64 -> MErrV
MErrVPutSizeExceeded Int64
size
| Bool
isSeekable -> Bucket
-> Bucket -> PutObjectOptions -> FilePath -> Int64 -> Minio Bucket
parallelMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts FilePath
fp Int64
size
| Bool
otherwise ->
Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts (forall a. a -> Maybe a
Just Int64
size) forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) i.
MonadResource m =>
FilePath -> ConduitT i ByteString m ()
CB.sourceFile FilePath
fp
parallelMultipartUpload ::
Bucket ->
Object ->
PutObjectOptions ->
FilePath ->
Int64 ->
Minio ETag
parallelMultipartUpload :: Bucket
-> Bucket -> PutObjectOptions -> FilePath -> Int64 -> Minio Bucket
parallelMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts FilePath
filePath Int64
size = do
Bucket
uploadId <- Bucket -> Bucket -> [Header] -> Minio Bucket
newMultipartUpload Bucket
b Bucket
o (PutObjectOptions -> [Header]
pooToHeaders PutObjectOptions
opts)
let partSizeInfo :: [(PartNumber, Int64, Int64)]
partSizeInfo = Int64 -> [(PartNumber, Int64, Int64)]
selectPartSizes Int64
size
let threads :: Word
threads = forall a. a -> Maybe a -> a
fromMaybe Word
10 forall a b. (a -> b) -> a -> b
$ PutObjectOptions -> Maybe Word
pooNumThreads PutObjectOptions
opts
[Either IOException PartTuple]
uploadedPartsE <-
forall (m :: * -> *) t a.
MonadUnliftIO m =>
Int -> (t -> m a) -> [t] -> m [a]
limitedMapConcurrently
(forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
threads)
(Bucket
-> (PartNumber, Int64, Int64)
-> Minio (Either IOException PartTuple)
uploadPart Bucket
uploadId)
[(PartNumber, Int64, Int64)]
partSizeInfo
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO forall a b. (a -> b) -> a -> b
$ forall a b. [Either a b] -> [a]
lefts [Either IOException PartTuple]
uploadedPartsE
Bucket -> Bucket -> Bucket -> [PartTuple] -> Minio Bucket
completeMultipartUpload Bucket
b Bucket
o Bucket
uploadId forall a b. (a -> b) -> a -> b
$ forall a b. [Either a b] -> [b]
rights [Either IOException PartTuple]
uploadedPartsE
where
uploadPart :: Bucket
-> (PartNumber, Int64, Int64)
-> Minio (Either IOException PartTuple)
uploadPart Bucket
uploadId (PartNumber
partNum, Int64
offset, Int64
sz) =
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadResource m) =>
FilePath -> (Handle -> m a) -> m (Either IOException a)
withNewHandle FilePath
filePath forall a b. (a -> b) -> a -> b
$ \Handle
h -> do
let payload :: Payload
payload = Handle -> Int64 -> Int64 -> Payload
PayloadH Handle
h Int64
offset Int64
sz
Bucket
-> Bucket
-> Bucket
-> PartNumber
-> [Header]
-> Payload
-> Minio PartTuple
putObjectPart Bucket
b Bucket
o Bucket
uploadId PartNumber
partNum [] Payload
payload
sequentialMultipartUpload ::
Bucket ->
Object ->
PutObjectOptions ->
Maybe Int64 ->
C.ConduitM () ByteString Minio () ->
Minio ETag
sequentialMultipartUpload :: Bucket
-> Bucket
-> PutObjectOptions
-> Maybe Int64
-> ConduitM () ByteString Minio ()
-> Minio Bucket
sequentialMultipartUpload Bucket
b Bucket
o PutObjectOptions
opts Maybe Int64
sizeMay ConduitM () ByteString Minio ()
src = do
Bucket
uploadId <- Bucket -> Bucket -> [Header] -> Minio Bucket
newMultipartUpload Bucket
b Bucket
o (PutObjectOptions -> [Header]
pooToHeaders PutObjectOptions
opts)
let partSizes :: [(PartNumber, Int64, Int64)]
partSizes = Int64 -> [(PartNumber, Int64, Int64)]
selectPartSizes forall a b. (a -> b) -> a -> b
$ forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int64
maxObjectSize forall a. a -> a
identity Maybe Int64
sizeMay
([PartNumber]
pnums, [Int64]
_, [Int64]
sizes) = forall a b c. [(a, b, c)] -> ([a], [b], [c])
List.unzip3 [(PartNumber, Int64, Int64)]
partSizes
[PartTuple]
uploadedParts <-
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit forall a b. (a -> b) -> a -> b
$
ConduitM () ByteString Minio ()
src
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| forall (m :: * -> *).
Monad m =>
[Int] -> ConduitM ByteString ByteString m ()
chunkBSConduit (forall a b. (a -> b) -> [a] -> [b]
map forall a b. (Integral a, Num b) => a -> b
fromIntegral [Int64]
sizes)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
CL.map ByteString -> Payload
PayloadBS
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Bucket -> [PartNumber] -> ConduitT Payload PartTuple Minio ()
uploadPart' Bucket
uploadId [PartNumber]
pnums
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
CC.sinkList
Bucket -> Bucket -> Bucket -> [PartTuple] -> Minio Bucket
completeMultipartUpload Bucket
b Bucket
o Bucket
uploadId [PartTuple]
uploadedParts
where
uploadPart' :: Bucket -> [PartNumber] -> ConduitT Payload PartTuple Minio ()
uploadPart' Bucket
_ [] = forall (m :: * -> *) a. Monad m => a -> m a
return ()
uploadPart' Bucket
uid (PartNumber
pn : [PartNumber]
pns) = do
Maybe Payload
payloadMay <- forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await
case Maybe Payload
payloadMay of
Maybe Payload
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Payload
payload -> do
PartTuple
pinfo <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall a b. (a -> b) -> a -> b
$ Bucket
-> Bucket
-> Bucket
-> PartNumber
-> [Header]
-> Payload
-> Minio PartTuple
putObjectPart Bucket
b Bucket
o Bucket
uid PartNumber
pn [] Payload
payload
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield PartTuple
pinfo
Bucket -> [PartNumber] -> ConduitT Payload PartTuple Minio ()
uploadPart' Bucket
uid [PartNumber]
pns