{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE ScopedTypeVariables #-} -- | A simple streaming interface to the AWS S3 storage service. module Pipes.Aws.S3 ( Bucket(..) , Object(..) -- * Downloading , fromS3 , fromS3' -- ** Convenient re-exports , responseBody -- * Uploading , ChunkSize , toS3 , toS3' ) where import Control.Monad (unless) import Data.String (IsString) import qualified Data.ByteString as BS import Data.ByteString (ByteString) import qualified Data.Text as T import Pipes import Pipes.Safe import qualified Pipes.Prelude as PP import qualified Pipes.ByteString as PBS import Control.Monad.Trans.Resource import Control.Monad.IO.Class import Network.HTTP.Client import Network.HTTP.Client.TLS import qualified Aws import qualified Aws.Core as Aws import qualified Aws.S3 as S3 -- | An AWS S3 bucket name newtype Bucket = Bucket T.Text deriving (Eq, Ord, Show, Read, IsString) -- | An AWS S3 object name newtype Object = Object T.Text deriving (Eq, Ord, Show, Read, IsString) -- | Download an object from S3 -- -- This initiates an S3 download, requiring that the caller provide a way to -- construct a 'Producer' from the initial 'Response' to the request (allowing -- the caller to, e.g., handle failure). -- -- For instance to merely produced the content of the response, -- -- @ -- 'fromS3' bucket object responseBody -- @ -- fromS3 :: MonadSafe m => Bucket -> Object -> (Response (Producer BS.ByteString m ()) -> Producer BS.ByteString m a) -> Producer BS.ByteString m a fromS3 bucket object handler = do cfg <- liftIO Aws.baseConfiguration fromS3' cfg bucket object handler -- | Download an object from S3 explicitly specifying an @aws@ 'Aws.Configuration', -- which provides credentials and logging configuration. fromS3' :: MonadSafe m => Aws.Configuration -> Bucket -> Object -> (Response (Producer BS.ByteString m ()) -> Producer BS.ByteString m a) -> Producer BS.ByteString m a fromS3' cfg (Bucket bucket) (Object object) handler = do let s3cfg = Aws.defServiceConfig :: S3.S3Configuration Aws.NormalQuery mgr <- liftIO $ newManager tlsManagerSettings req <- liftIO $ buildRequest cfg s3cfg $ S3.getObject bucket object Pipes.Safe.bracket (liftIO $ responseOpen req mgr) (liftIO . responseClose) $ \resp -> handler $ resp { responseBody = from $ brRead $ responseBody resp } -- Stolen from pipes-http withHTTP :: MonadSafe m => Request -> Manager -> (Response (Producer ByteString m ()) -> m a) -> m a withHTTP req mgr k = Pipes.Safe.bracket (liftIO $ responseOpen req mgr) (liftIO . responseClose) k' where k' resp = do let p = (from . brRead . responseBody) resp k (resp { responseBody = p}) from :: MonadIO m => IO ByteString -> Producer ByteString m () from io = go where go = do bs <- liftIO io unless (BS.null bs) $ do yield bs go buildRequest :: (MonadIO m, Aws.Transaction r a) => Aws.Configuration -> Aws.ServiceConfiguration r Aws.NormalQuery -> r -> m Request buildRequest cfg scfg req = do Just cred <- Aws.loadCredentialsDefault sigData <- liftIO $ Aws.signatureData Aws.Timestamp cred let signed = Aws.signQuery req scfg sigData liftIO $ Aws.queryToHttpRequest signed -- | To maintain healthy streaming uploads are performed in a chunked manner. -- This is the size of the upload chunk size. Due to S3 interface restrictions -- this must be at least five megabytes. type ChunkSize = Int type ETag = T.Text type PartN = Integer -- | Upload content to an S3 object explicitly specifying an @aws@ -- 'Aws.Configuration', which provides credentials and logging configuration. toS3 :: forall m a. MonadIO m => ChunkSize -> Bucket -> Object -> Producer BS.ByteString m a -> m a toS3 chunkSize bucket object consumer = do cfg <- Aws.baseConfiguration toS3' cfg chunkSize bucket object consumer -- | Upload content to an S3 object. -- -- This internally uses the S3 multi-part upload interface to achieve streaming -- upload behavior. toS3' :: forall m a. MonadIO m => Aws.Configuration -> ChunkSize -> Bucket -> Object -> Producer BS.ByteString m a -> m a toS3' cfg chunkSize (Bucket bucket) (Object object) consumer = do let s3cfg = Aws.defServiceConfig :: S3.S3Configuration Aws.NormalQuery mgr <- liftIO $ newManager tlsManagerSettings resp1 <- liftIO $ runResourceT $ Aws.pureAws cfg s3cfg mgr $ S3.postInitiateMultipartUpload bucket object let uploadId = S3.imurUploadId resp1 let uploadPart :: (PartN, BS.ByteString) -> m (PartN, ETag) uploadPart (partN, content) = do resp <- liftIO $ runResourceT $ Aws.pureAws cfg s3cfg mgr $ S3.uploadPart bucket object partN uploadId (RequestBodyBS content) return (partN, S3.uprETag resp) (parts, res) <- PP.toListM' $ consumer >-> enumFromP 1 >-> PP.mapM uploadPart resp2 <- liftIO $ runResourceT $ Aws.pureAws cfg s3cfg mgr $ S3.postCompleteMultipartUpload bucket object uploadId parts return res enumFromP :: (Monad m, Enum i) => i -> Pipe a (i, a) m r enumFromP = go where go i = await >>= \x -> yield (i, x) >> go (succ i)