--
-- MinIO Haskell SDK, (C) 2017-2019 MinIO, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

module Network.Minio.PutObject
  ( putObjectInternal,
    ObjectData (..),
    selectPartSizes,
  )
where

import Conduit (takeC)
import qualified Conduit as C
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified Data.List as List
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.S3API
import Network.Minio.Utils

-- | A data-type to represent the source data for an object. A
-- file-path or a producer-conduit may be provided.
--
-- For files, a size may be provided - this is useful in cases when
-- the file size cannot be automatically determined or if only some
-- prefix of the file is desired.
--
-- For streams also, a size may be provided. This is useful to limit
-- the input - if it is not provided, upload will continue until the
-- stream ends or the object reaches `maxObjectSize` size.
data ObjectData m
  = -- | Takes a filepath and an optional size (in bytes).
    ODFile FilePath (Maybe Int64)
  | -- | Takes a producer conduit of 'ByteString' chunks; pass the
    -- size (in bytes) if known.
    ODStream (C.ConduitM () ByteString m ()) (Maybe Int64)

-- | Put an object from ObjectData. This high-level API handles
-- objects of all sizes, and even if the object size is unknown.
--
-- The upload strategy is chosen from the (given or detected) size:
--
-- * size unknown: sequential multipart upload of the whole source;
-- * size at most 64 MiB: a single PUT request;
-- * size greater than 'maxObjectSize': throws 'MErrVPutSizeExceeded';
-- * otherwise: multipart upload - parallel when the source is a
--   seekable file, sequential for streams and non-seekable files.
--
-- Returns the value produced by the underlying single or multipart
-- upload call.
putObjectInternal ::
  Bucket ->
  Object ->
  PutObjectOptions ->
  ObjectData Minio ->
  Minio ETag
putObjectInternal b o opts (ODStream src sizeMay) =
  case sizeMay of
    -- no size given, so stream parts until the source is exhausted
    Nothing -> sequentialMultipartUpload b o opts Nothing src
    -- got a size, so check for single/multipart upload
    Just size ->
      if
          | size <= 64 * oneMiB -> do
              -- Limit the input to @size@ bytes. 'CB.isolate' counts
              -- bytes; an element-wise take would count ByteString
              -- chunks and could read far more than @size@ bytes.
              bs <-
                C.runConduit $
                  src
                    C..| CB.isolate (fromIntegral size)
                    C..| CB.sinkLbs
              putObjectSingle' b o (pooToHeaders opts) $ LBS.toStrict bs
          | size > maxObjectSize -> throwIO $ MErrVPutSizeExceeded size
          | otherwise -> sequentialMultipartUpload b o opts (Just size) src
putObjectInternal b o opts (ODFile fp sizeMay) = do
  -- Probe the file for seekability and size using a fresh handle.
  hResE <-
    withNewHandle fp $ \h ->
      liftA2 (,) (isHandleSeekable h) (getFileSize h)

  -- If probing failed with an IOException, fall back to treating the
  -- file as non-seekable with unknown size.
  (isSeekable, handleSizeMay) <-
    either
      (const $ return (False, Nothing))
      return
      hResE

  -- prefer given size to queried size.
  let finalSizeMay = listToMaybe $ catMaybes [sizeMay, handleSizeMay]

  case finalSizeMay of
    -- unable to get size, so assume non-seekable file
    Nothing ->
      sequentialMultipartUpload b o opts Nothing $ CB.sourceFile fp
    -- got file size, so check for single/multipart upload
    Just size ->
      if
          | size <= 64 * oneMiB ->
              -- rethrow any IOException reported by 'withNewHandle'
              either throwIO return
                =<< withNewHandle
                  fp
                  (\h -> putObjectSingle b o (pooToHeaders opts) h 0 size)
          | size > maxObjectSize -> throwIO $ MErrVPutSizeExceeded size
          | isSeekable -> parallelMultipartUpload b o opts fp size
          | otherwise ->
              sequentialMultipartUpload b o opts (Just size) $
                CB.sourceFile fp

-- | Upload a file of known size as a multipart object, uploading
-- several parts concurrently.
--
-- A new multipart upload is started, the part layout is computed with
-- 'selectPartSizes', and each part is sent from its own file handle
-- positioned at the part's offset. The number of concurrent workers is
-- taken from 'pooNumThreads', defaulting to 10 when it is not set.
--
-- If any part fails with an 'IOException' reported by 'withNewHandle',
-- the first such exception is rethrown and the upload is not completed.
parallelMultipartUpload ::
  Bucket ->
  Object ->
  PutObjectOptions ->
  FilePath ->
  Int64 ->
  Minio ETag
parallelMultipartUpload b o opts filePath size = do
  -- get a new upload id.
  uploadId <- newMultipartUpload b o (pooToHeaders opts)

  let partSizeInfo = selectPartSizes size
      threads = fromMaybe 10 $ pooNumThreads opts

  -- perform upload with 'threads' threads
  uploadedPartsE <-
    limitedMapConcurrently
      (fromIntegral threads)
      (uploadPart uploadId)
      partSizeInfo

  -- if there were any errors, rethrow exception.
  mapM_ throwIO $ lefts uploadedPartsE

  -- if we get here, all parts were successfully uploaded.
  completeMultipartUpload b o uploadId $ rights uploadedPartsE
  where
    -- Upload one part, described by (part number, byte offset, byte
    -- count), reading it through a freshly opened handle.
    uploadPart uploadId (partNum, offset, sz) =
      withNewHandle filePath $ \h -> do
        let payload = PayloadH h offset sz
        putObjectPart b o uploadId partNum [] payload

-- | Upload multipart object from conduit source sequentially
--
-- A new multipart upload is started and the source is re-chunked into
-- the part sizes computed by 'selectPartSizes'. When no size is given,
-- the layout is planned for 'maxObjectSize'. Parts are uploaded one at
-- a time, in order, until either the source or the list of planned
-- part numbers is exhausted; the upload is then completed with the
-- parts that were sent.
sequentialMultipartUpload ::
  Bucket ->
  Object ->
  PutObjectOptions ->
  Maybe Int64 ->
  C.ConduitM () ByteString Minio () ->
  Minio ETag
sequentialMultipartUpload b o opts sizeMay src = do
  -- get a new upload id.
  uploadId <- newMultipartUpload b o (pooToHeaders opts)

  -- upload parts in loop
  let partSizes = selectPartSizes $ fromMaybe maxObjectSize sizeMay
      (pnums, _, sizes) = List.unzip3 partSizes
  uploadedParts <-
    C.runConduit $
      src
        C..| chunkBSConduit (map fromIntegral sizes)
        C..| CL.map PayloadBS
        C..| uploadPart' uploadId pnums
        C..| CC.sinkList

  -- complete multipart upload
  completeMultipartUpload b o uploadId uploadedParts
  where
    -- Pair each incoming payload with the next planned part number,
    -- upload it, and yield the resulting part info downstream. Stops
    -- when either the part numbers or the payloads run out.
    uploadPart' _ [] = return ()
    uploadPart' uid (pn : pns) = do
      payloadMay <- C.await
      case payloadMay of
        Nothing -> return ()
        Just payload -> do
          pinfo <- lift $ putObjectPart b o uid pn [] payload
          C.yield pinfo
          uploadPart' uid pns