--
-- Minio Haskell SDK, (C) 2017 Minio, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

module Network.Minio.PutObject
  (
    putObjectInternal
  , ObjectData(..)
  , selectPartSizes
  ) where


import qualified Data.ByteString.Lazy     as LBS
import qualified Data.Conduit             as C
import qualified Data.Conduit.Binary      as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List        as CL
import qualified Data.List                as List

import           Lib.Prelude

import           Network.Minio.Data
import           Network.Minio.Errors
import           Network.Minio.S3API
import           Network.Minio.Utils


-- | A data-type to represent the source data for an object. A
-- file-path or a producer-conduit may be provided.
--
-- For files, a size may be provided - this is useful in cases when
-- the file size cannot be automatically determined or if only some
-- prefix of the file is desired. When a size is given, it is
-- preferred over the size queried from the file itself.
--
-- For streams also, a size may be provided. This is intended to
-- limit the input - if it is not provided, upload will continue until
-- the stream ends or the object reaches 'maxObjectSize'.
data ObjectData m
  = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and optional
                                  -- size.
  | ODStream (C.ConduitM () ByteString m ()) (Maybe Int64) -- ^ Takes
                                                           -- a
                                                           -- producer
                                                           -- conduit;
                                                           -- pass
                                                           -- size
                                                           -- (bytes)
                                                           -- if
                                                           -- known.

-- | Put an object from ObjectData. This high-level API handles
-- objects of all sizes, and even if the object size is unknown.
--
-- For an 'ODStream':
--
-- * with no size, the stream is sent as a sequential multipart upload
--   planned for 'maxObjectSize';
--
-- * with a size of at most 64 MiB, at most that many bytes are read
--   from the stream and sent in a single request;
--
-- * with a size above 'maxObjectSize', 'MErrVPutSizeExceeded' is
--   thrown;
--
-- * otherwise the stream is sent as a sequential multipart upload.
--
-- For an 'ODFile', the explicitly given size is preferred over the
-- size queried from the file handle. The same thresholds apply, and a
-- seekable file above 64 MiB is uploaded with parallel part uploads.
-- If the file cannot be opened for inspection, it is treated as
-- non-seekable with no queried size.
putObjectInternal :: Bucket -> Object -> PutObjectOptions
                  -> ObjectData Minio -> Minio ETag
putObjectInternal b o opts (ODStream src sizeMay) = do
  case sizeMay of
    -- unable to get size, so assume non-seekable file and max-object size
    Nothing -> sequentialMultipartUpload b o opts (Just maxObjectSize) src

    -- got stream size, so check for single/multipart upload
    Just size ->
      if | size <= 64 * oneMiB -> do
             -- Honour the caller-supplied size: buffer no more than
             -- `size` bytes, so a longer stream is truncated rather
             -- than read into memory in full. The clamp keeps a
             -- negative size from being handed to `CB.isolate`. The
             -- conversion to Int is safe as size is at most 64 MiB.
             let limit = fromIntegral (max 0 size)
             bs <- C.runConduit $ src C..| CB.isolate limit C..| CB.sinkLbs
             putObjectSingle' b o (pooToHeaders opts) $ LBS.toStrict bs
         | size > maxObjectSize -> throwIO $ MErrVPutSizeExceeded size
         | otherwise -> sequentialMultipartUpload b o opts (Just size) src

putObjectInternal b o opts (ODFile fp sizeMay) = do
  -- probe the file once for seekability and size.
  hResE <- withNewHandle fp $ \h ->
    liftM2 (,) (isHandleSeekable h) (getFileSize h)

  -- a failed probe is not fatal here: fall back to treating the file
  -- as non-seekable with no queried size.
  (isSeekable, handleSizeMay) <- either (const $ return (False, Nothing)) return
                                 hResE

  -- prefer given size to queried size.
  let finalSizeMay = listToMaybe $ catMaybes [sizeMay, handleSizeMay]

  case finalSizeMay of
    -- unable to get size, so assume non-seekable file and max-object size
    Nothing -> sequentialMultipartUpload b o opts (Just maxObjectSize) $
               CB.sourceFile fp

    -- got file size, so check for single/multipart upload
    Just size ->
      if | size <= 64 * oneMiB -> either throwIO return =<<
           withNewHandle fp (\h -> putObjectSingle b o (pooToHeaders opts) h 0 size)
         | size > maxObjectSize -> throwIO $ MErrVPutSizeExceeded size
         | isSeekable -> parallelMultipartUpload b o opts fp size
         | otherwise -> sequentialMultipartUpload b o opts (Just size) $
                        CB.sourceFile fp

-- | Upload a file as a multipart object, sending parts concurrently.
--
-- The file is split according to 'selectPartSizes' for the given
-- size. Each part is uploaded through its own freshly opened handle,
-- reading the part's byte range at its offset. The number of
-- concurrent uploads is taken from 'pooNumThreads' (default 10).
--
-- If any part upload fails, the first failure is rethrown and the
-- multipart upload is not completed.
parallelMultipartUpload :: Bucket -> Object -> PutObjectOptions
                        -> FilePath -> Int64 -> Minio ETag
parallelMultipartUpload b o opts filePath size = do
  -- get a new upload id.
  uploadId <- newMultipartUpload b o (pooToHeaders opts)

  let partSizeInfo = selectPartSizes size

  -- Never hand a concurrency limit of zero to the worker pool: with
  -- no workers allowed, no part could be uploaded (presumably the
  -- pool would stall or do nothing - confirm against
  -- 'limitedMapConcurrently'). A configured value of 0 is therefore
  -- treated as 1.
  let threads = max 1 $ fromMaybe 10 $ pooNumThreads opts

  -- perform upload with 'threads' threads
  uploadedPartsE <- limitedMapConcurrently (fromIntegral threads)
                    (uploadPart uploadId) partSizeInfo

  -- if there were any errors, rethrow exception.
  mapM_ throwIO $ lefts uploadedPartsE

  -- if we get here, all parts were successfully uploaded.
  completeMultipartUpload b o uploadId $ rights uploadedPartsE

  where
    -- Upload one part, identified by its part number, the offset of
    -- its first byte in the file and its length.
    uploadPart uploadId (partNum, offset, sz) =
      withNewHandle filePath $ \h -> do
        let payload = PayloadH h offset sz
        putObjectPart b o uploadId partNum [] payload

-- | Upload a multipart object by consuming a conduit source, one part
-- after another.
--
-- The part layout is computed by 'selectPartSizes' from the given
-- size, or from 'maxObjectSize' when no size is given. The source is
-- re-chunked to the planned part lengths, each chunk is uploaded
-- under the next part number, and the multipart upload is completed
-- with the parts that were actually sent.
sequentialMultipartUpload :: Bucket -> Object -> PutObjectOptions
                          -> Maybe Int64
                          -> C.ConduitM () ByteString Minio ()
                          -> Minio ETag
sequentialMultipartUpload b o opts sizeMay src = do
  -- start the multipart upload.
  upId <- newMultipartUpload b o (pooToHeaders opts)

  -- plan the parts; the offsets (middle component) are not needed
  -- because the source is read strictly in order.
  let plannedSize = fromMaybe maxObjectSize sizeMay
      (partNums, _, partLens) = List.unzip3 $ selectPartSizes plannedSize

  -- stream the chunks through the uploader and collect the results.
  partInfos <- C.runConduit
             $ src
          C..| chunkBSConduit partLens
          C..| CL.map PayloadBS
          C..| partUploader upId partNums
          C..| CC.sinkList

  -- complete multipart upload
  completeMultipartUpload b o upId partInfos

  where
    -- Pair every incoming payload with the next planned part number.
    -- Stops when either the part numbers or the payloads run out.
    partUploader upId = go
      where
        go [] = return ()
        go (num:rest) = C.await >>= maybe (return ()) (sendPart num rest)

        sendPart num rest payload = do
          partInfo <- lift $ putObjectPart b o upId num [] payload
          C.yield partInfo
          go rest