{-# LANGUAGE ScopedTypeVariables #-}

-- | @pipes@ utilities for uploading data to AWS S3 objects.
module Pipes.Aws.S3.Upload
    ( -- | These internally use the S3 multi-part upload interface to achieve
      -- streaming upload behavior.
      --
      -- In the case of failure, one of two exceptions will be thrown:
      --
      --   - 'EmptyS3UploadError': in the event that the 'Producer' fails to
      --     produce any content to upload.
      --
      --   - 'FailedUploadError': in any other case.
      --
      -- The 'FailedUploadError' exception carries the 'UploadId' of the failed
      -- upload as well as the inner exception. Note that while the library makes
      -- an attempt to clean up the parts of the partial upload, there may still
      -- be remnants due to limitations in the @aws@ library.
      --
      toS3
    , toS3'
    , toS3WithManager

      -- * Chunk size
    , ChunkSize
    , defaultChunkSize

      -- * Error handling
    , EmptyS3UploadError(..)
    , FailedUploadError(..)
    , UploadId(..)
    ) where

import Control.Monad (when)
import Control.Exception (Exception)

import qualified Data.ByteString as BS
import qualified Data.Text as T

import Pipes
import Pipes.Safe
import qualified Pipes.Prelude as PP
import qualified Pipes.ByteString as PBS
import Control.Monad.Trans.Resource
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import qualified Aws
import qualified Aws.S3 as S3

import Pipes.Aws.S3.Types

-- | Raised when an upload is attempted but the source yields no data at all.
-- Carries the 'Bucket' and 'Object' the upload was addressed to.
data EmptyS3UploadError
    = EmptyS3UploadError Bucket Object
    deriving Show

instance Exception EmptyS3UploadError

-- | Identifier of a multipart upload, wrapping the upload id text.
newtype UploadId
    = UploadId T.Text
    deriving (Eq, Ord, Show)

-- | Raised when something goes wrong while an upload is in progress.
data FailedUploadError
    = FailedUploadError
        { failedUploadBucket    :: Bucket
          -- ^ Bucket the upload was addressed to.
        , failedUploadObject    :: Object
          -- ^ Object the upload was addressed to.
        , failedUploadException :: SomeException
          -- ^ The underlying exception that interrupted the upload.
        , failedUploadId        :: UploadId
          -- ^ Identifier of the multipart upload that failed.
        }
    deriving Show

instance Exception FailedUploadError

-- | Size, in bytes, of each chunk of an upload. To keep uploads streaming,
-- content is sent to S3 one chunk at a time rather than all at once. Due to
-- S3 interface restrictions this must be at least five megabytes.
type ChunkSize = Int

-- | A sensible default 'ChunkSize': 10 megabytes (@10 * 1024 * 1024@ bytes).
defaultChunkSize :: ChunkSize
defaultChunkSize = 10 * bytesPerMegabyte
  where
    bytesPerMegabyte = 1024 * 1024

-- | ETag of an uploaded part, as read from the upload-part response.
type ETag = T.Text
-- | Number identifying a part within a multipart upload; parts are numbered
-- consecutively starting from 1.
type PartN = Integer

-- | Stream the output of a 'Producer' into an S3 object.
--
-- The AWS configuration is obtained from 'Aws.baseConfiguration' and the
-- service configuration is 'Aws.defServiceConfig'; the upload itself is
-- delegated to 'toS3''.
--
-- May throw an 'EmptyS3UploadError' if the producer fails to provide any content.
toS3 :: forall m a. (MonadIO m, MonadCatch m)
     => ChunkSize -> Bucket -> Object
     -> Producer BS.ByteString m a
     -> m a
toS3 chunkSize bucket object producer =
    Aws.baseConfiguration >>= \awsCfg ->
        toS3' awsCfg Aws.defServiceConfig chunkSize bucket object producer

-- | Stream the output of a 'Producer' into an S3 object using the given
-- 'Aws.Configuration' (credentials and logging configuration) and S3 service
-- configuration.
--
-- A fresh TLS-capable @http-client@ 'Manager' is created for the call and the
-- upload itself is delegated to 'toS3WithManager'.
--
-- May throw an 'EmptyS3UploadError' if the producer fails to provide any content.
toS3' :: forall m a. (MonadIO m, MonadCatch m)
      => Aws.Configuration                    -- ^ e.g. from 'Aws.baseConfiguration'
      -> S3.S3Configuration Aws.NormalQuery   -- ^ e.g. 'Aws.defServiceConfig'
      -> ChunkSize -> Bucket -> Object
      -> Producer BS.ByteString m a
      -> m a
toS3' cfg s3cfg chunkSize bucket object producer =
    liftIO (newManager tlsManagerSettings) >>= \manager ->
        toS3WithManager manager cfg s3cfg chunkSize bucket object producer

-- | Upload content to an S3 object, explicitly specifying an @http-client@
-- 'Manager' and @aws@ 'Aws.Configuration' (which provides credentials and
-- logging configuration).
--
-- This can be more efficient when submitting many small requests as it allows
-- re-use of the 'Manager' across requests. Note that the 'Manager' provided
-- must support TLS; such a manager can be created as in the following example:
--
-- @
-- import qualified Aws.Core as Aws
-- import qualified Network.HTTP.Client as HTTP.Client
-- import qualified Network.HTTP.Client.TLS as HTTP.Client.TLS
--
-- putObject :: MonadSafe m => Bucket -> Object -> Producer BS.ByteString m () -> m ()
-- putObject bucket object prod = do
--     cfg <- liftIO 'Aws.baseConfiguration'
--     mgr <- liftIO $ 'newManager' 'HTTP.Client.TLS.tlsManagerSettings'
--     'toS3WithManager' mgr cfg 'Aws.defServiceConfig' defaultChunkSize bucket object prod
-- @
--
-- The upload proceeds as follows: a multipart upload is initiated, the
-- producer's output is re-chunked into pieces of 'ChunkSize' bytes, empty
-- chunks are dropped, the remaining chunks are numbered from 1 and uploaded
-- one part at a time, and finally the multipart upload is completed. The
-- producer's return value is returned.
--
-- Throws an 'EmptyS3UploadError' if the producer fails to provide any content;
-- no abort request is issued in that case. Any other failure after the upload
-- has been initiated triggers an attempt to abort the multipart upload, after
-- which a 'FailedUploadError' carrying the original exception and the
-- 'UploadId' is thrown.
toS3WithManager :: forall m a. (MonadIO m, MonadCatch m)
      => Manager
      -> Aws.Configuration                    -- ^ e.g. from 'Aws.baseConfiguration'
      -> S3.S3Configuration Aws.NormalQuery   -- ^ e.g. 'Aws.defServiceConfig'
      -> ChunkSize -> Bucket -> Object
      -> Producer BS.ByteString m a
      -> m a
toS3WithManager mgr cfg s3cfg chunkSize bucket object producer = do
    let Bucket bucketName = bucket
        Object objectName = object
    resp1 <- liftIO $ runResourceT
             $ Aws.pureAws cfg s3cfg mgr
             $ S3.postInitiateMultipartUpload bucketName objectName
    let uploadId = S3.imurUploadId resp1
        abortUpload err
            -- Otherwise we apparently get a 'Missing root element' error
            -- when aborting.
          | Just (EmptyS3UploadError _ _) <- fromException err = throwM err
          | otherwise = do
              -- Aborting is best-effort clean-up. Its outcome is deliberately
              -- ignored so that a failed abort cannot replace the exception
              -- that actually interrupted the upload; the caller always
              -- receives a 'FailedUploadError' wrapping the original error.
              _ <- liftIO $ runResourceT $ Aws.aws cfg s3cfg mgr
                          $ S3.postAbortMultipartUpload bucketName objectName uploadId
              throwM $ FailedUploadError bucket object err (UploadId uploadId)

    handleAll abortUpload $ do
        -- Upload a single numbered chunk and remember the ETag from the
        -- response; the (part number, ETag) pairs are needed to complete the
        -- multipart upload.
        let uploadPart :: (PartN, BS.ByteString) -> m (PartN, ETag)
            uploadPart (partN, content) = do
                resp <- liftIO $ runResourceT
                        $ Aws.pureAws cfg s3cfg mgr
                        $ S3.uploadPart bucketName objectName
                                        partN uploadId (RequestBodyBS content)
                return (partN, S3.uprETag resp)

        -- Re-chunk the input, drop empty chunks, number the parts from 1 and
        -- upload them, collecting the results along with the producer's
        -- return value.
        (parts, res) <- PP.toListM' $ PBS.chunksOf' chunkSize producer
                                  >-> PP.filter (not . BS.null)
                                  >-> enumFromP 1
                                  >-> PP.mapM uploadPart

        -- We handle this specifically to provide a more sensible error than
        -- "Missing root element"
        when (null parts)
            $ throwM (EmptyS3UploadError bucket object)

        _ <- liftIO $ runResourceT
            $ Aws.pureAws cfg s3cfg mgr
            $ S3.postCompleteMultipartUpload bucketName objectName uploadId parts
        return res

-- | Pair every value passing through the pipe with a running index. The
-- first value is tagged with the given index and each subsequent one with
-- the 'succ' of the previous index.
enumFromP :: (Monad m, Enum i) => i -> Pipe a (i, a) m r
enumFromP start = loop start
  where
    loop n = do
        item <- await
        yield (n, item)
        loop (succ n)