{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Mismi.S3.Commands (
  -- * Operations
  -- ** Existence
    headObject
  , exists
  , existsPrefix
  -- ** Size
  , getSize
  , size
  , sizeRecursively
  -- ** Delete
  , delete
  -- ** Copy
  , concatMultipart
  , copy
  , copyWithMode
  , copyMultipart
  -- *** Recursive copy
  , sync
  , syncWithMode
  -- ** Move
  , move
  -- ** Upload
  , upload
  , uploadWithMode
  , uploadRecursive
  , uploadRecursiveWithMode
  , multipartUpload
  , uploadSingle
  -- ** Write
  , write
  , writeWithMode
  -- ** Read
  , read
  , getObjects
  , getObjectsRecursively
  -- ** List
  , listObjects
  , list
  , listRecursively
  -- ** Download
  , download
  , downloadWithMode
  , downloadSingle
  , downloadWithRange
  , multipartDownload
  , downloadRecursive
  , downloadRecursiveWithMode
  -- ** Multipart
  , createMultipartUpload
  , listMultipartParts
  , listMultiparts
  , listOldMultiparts
  , listOldMultiparts'
  , abortMultipart
  , abortMultipart'
  -- *** Filter
  , filterOld
  , filterNDays
  -- ** Grant ACL
  , grantReadAccess
  -- * Utility
  , chunkFilesBySize
  ) where

import           Control.Arrow ((***))
import           Control.Concurrent.Async (mapConcurrently_)
import           Control.Exception (ioError)
import qualified Control.Exception as CE
import           Control.Lens ((.~), (^.), to, view)
import           Control.Monad.Catch (Handler(..), throwM, onException)
import           Control.Monad.Extra (concatMapM)
import           Control.Monad.IO.Class (MonadIO, liftIO)
import           Control.Monad.Reader (ask)
import           Control.Monad.Trans.Class (lift)
import           Control.Monad.Trans.Except (ExceptT (..), runExceptT, throwE)
import           Control.Monad.Trans.Bifunctor (firstT)
import           Control.Monad.Trans.Resource (allocate, runResourceT)
import qualified Control.Retry as Retry

import           Data.Conduit (runConduit, (.|))
import qualified Data.Conduit as Conduit
import qualified Data.Conduit.Binary as Conduit
import qualified Data.Conduit.List as DC
import           Data.List (filter)
import qualified Data.List as L
import qualified Data.List.NonEmpty as NEL
import           Data.Maybe (maybeToList, catMaybes, isJust)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Encoding as TL
import           Data.Time.Clock (UTCTime, NominalDiffTime, getCurrentTime, addUTCTime)

import           Mismi.Amazonka (Env, send, paginate)
import           Mismi.Control
import           Mismi.S3.Core.Data
import           Mismi.S3.Data
import           Mismi.S3.Internal
import qualified Mismi.S3.Patch.Network as N
import qualified Mismi.S3.Patch.PutObjectACL as P
import qualified Mismi.S3.Internal.Binary as XB
import           Mismi.S3.Internal.Queue (writeQueue)
import           Mismi.S3.Internal.Parallel (consume)
import qualified Mismi.S3.Stream as Stream

import           Network.AWS.Data.Body (ChunkedBody (..), ChunkSize (..))
import           Network.AWS.Data.Body (RqBody (..), RsBody (..), toBody)
import           Network.AWS.Data.Text (toText)
import           Network.AWS.S3 (BucketName (..))
import           Network.AWS.S3 (GetObjectResponse, HeadObjectResponse)
import           Network.AWS.S3 (ListObjects)
import           Network.AWS.S3 (MetadataDirective (..))
import           Network.AWS.S3 (MultipartUpload, Part)
import           Network.AWS.S3 (Object, ObjectKey (..))
import qualified Network.AWS.S3 as A

import           P
import           Prelude (toInteger)

import           System.IO (IO, IOMode (..), SeekMode (..))
import           System.IO (hFileSize, hSetFileSize, withFile)
import           System.IO.Error (IOError)
import           System.Directory (createDirectoryIfMissing, doesFileExist, getDirectoryContents)
import           System.FilePath (FilePath, (</>), takeDirectory)
import           System.Posix.IO (OpenMode(..), openFd, closeFd, fdSeek, defaultFileFlags)
import           System.Posix.Files (fileSize, getFileStatus, isDirectory, isRegularFile)
import qualified "unix-bytestring" System.Posix.IO.ByteString as UBS

import           System.IO.Error (userError)

-- | Retrieves the 'HeadObjectResponse'. Handles any 404 response by converting to Maybe.
--
headObject :: Address -> AWS (Maybe HeadObjectResponse)
headObject a =
  handle404 . send . f' A.headObject $ a

-- | Checks for the existence of 'Address'.
--
--   'True' exactly when 'headObject' yields a response.
--
exists :: Address -> AWS Bool
exists a =
  isJust <$> headObject a

-- | Checks whether anything is listed under the given prefix.
--
--   Issues a single @listObjects@ request limited to one key, using @/@ as
--   the delimiter, and reports whether exactly one object or exactly one
--   common prefix came back.
--
existsPrefix :: Address -> AWS Bool
existsPrefix (Address (Bucket b) (Key k)) =
  fmap (\r -> length (view A.lorsContents r) == 1 || length (view A.lorsCommonPrefixes r) == 1) . send $
    A.listObjects (BucketName b)
      -- NOTE(review): '(+/)' presumably appends a trailing slash to the key;
      -- confirm against its definition.
      & A.loPrefix .~ Just ((+/) k)
      & A.loDelimiter .~ Just '/'
      & A.loMaxKeys .~ Just 1

-- | Size of the object at 'Address' as an 'Int', or 'Nothing' when 'size'
--   returns 'Nothing'.
--
getSize :: Address -> AWS (Maybe Int)
getSize a =
  fmap fromIntegral <$> size a
{-# DEPRECATED getSize "Use Mismi.S3.Commands.size instead" #-}

-- | Size of the object at 'Address', taken from the content length of its
--   'headObject' response. 'Nothing' when there is no response or the
--   response carries no content length.
--
size :: Address -> AWS (Maybe Bytes)
size a =
  fmap (Bytes . fromIntegral) . (>>= (^. A.horsContentLength)) <$> headObject a

-- | Collects everything produced by 'Stream.sizeRecursively' for the prefix
--   into a list.
--
sizeRecursively :: Address -> AWS [Sized Address]
sizeRecursively prefix =
  runConduit $
    Stream.sizeRecursively prefix .| DC.consume

-- | Delete 'Address'
--
delete :: Address -> AWS ()
delete =
  void . send . f' A.deleteObject

-- | Concatenate the input objects, in order, into @dest@ using a multipart
--   upload whose parts are server-side copies of the inputs.
--
--   * With 'Fail' mode an existing destination is a 'ConcatDestinationExists'
--     error.
--   * An empty input list is 'NoInputFiles'; inputs that are all zero-sized
--     give 'NoInputFilesWithData'.
--   * A missing input is 'ConcatSourceMissing'; a non-empty input smaller
--     than 5 MiB is 'ConcatSourceTooSmall'.
--
--   @fork@ is the number of workers handed to 'consume'. If any part copy
--   fails, or completing the upload throws, the multipart upload is aborted.
--
concatMultipart :: WriteMode -> Int -> [Address] -> Address -> ExceptT ConcatError AWS ()
concatMultipart mode fork inputs dest = do
  when (mode == Fail) .
    whenM (lift $ exists dest) .
      throwE $ ConcatDestinationExists $ dest

  when (null inputs) $
    throwE NoInputFiles

  things <- fmap (join . catMaybes) . forM inputs $ \input -> do
    r <- lift $ size input
    case r of
      Nothing ->
        throwE $ ConcatSourceMissing input
      Just x ->
        sourceParts input (fromIntegral $ unBytes x)

  when (null things) $
    throwE NoInputFilesWithData

  e <- ask
  mpu <- lift $ createMultipartUpload dest

  let
    (is, bs, ls) = L.unzip3 things
    -- Part numbers start at 1 and follow the order of the inputs.
    chunks = L.zip4 is bs ls [1..]

  rs <- liftIO $
    consume (forM_ chunks . writeQueue) fork $ multipartCopyWorker e mpu dest

  let
    abort =
      lift $ abortMultipart' dest mpu

  case rs of
    Left f ->
      abort >>
        (throwE $ ConcatCopyError f)

    Right prts ->
      flip onException abort $
        void . send $ f' A.completeMultipartUpload dest mpu &
          A.cMultipartUpload .~ pure (A.completedMultipartUpload & A.cmuParts .~ sortPartResponse (snd prts))
  where
    minChunk :: Int
    minChunk = 5 * 1024 * 1024 -- 5 MiB

    chunk :: Int
    chunk = 1024 * 1024 * 1024 -- 1 GiB

    -- Previously @5 * 1024 * 1024@ (equal to 'minChunk'), which made the
    -- single-part branch below unreachable despite being labelled "5 gb".
    big :: Int
    big = 5 * 1024 * 1024 * 1024 -- 5 GiB

    -- (source, offset, length) triples for one input of the given size.
    -- 'Nothing' means the input is empty and contributes no parts.
    sourceParts :: Address -> Int -> ExceptT ConcatError AWS (Maybe [(Address, Int, Int)])
    sourceParts input s
      | s == 0 =
          pure Nothing
      | s < minChunk =
          throwE $ ConcatSourceTooSmall input s
      | s < big =
          pure $ Just [(input, 0, s)]
      | otherwise =
          pure . Just $ (\(a, b, _) -> (input, a, b)) <$> calculateChunksCapped s chunk 4096

-- | Copy an object, overwriting the destination if it exists.
--
copy :: Address -> Address -> ExceptT CopyError AWS ()
copy s d =
  copyWithMode Overwrite s d

-- | Copy an object from @s@ to @d@.
--
--   Fails with 'CopySourceMissing' when the source does not exist, with
--   'CopyDestinationExists' when the mode is 'Fail' and the destination
--   exists, and with 'CopySourceSize' when the source size is unavailable.
--   Sources under 1 GiB use a single copy request; larger ones use
--   'copyMultipart' with 100 MiB chunks and 100 workers.
--
copyWithMode :: WriteMode -> Address -> Address -> ExceptT CopyError AWS ()
copyWithMode mode s d = do
  unlessM (lift $ exists s) . throwE $ CopySourceMissing s
  when (mode == Fail) . whenM (lift $ exists d) . throwE $ CopyDestinationExists $ d
  sz' <- lift $ getSize s
  sz <- fromMaybeM (throwE $ CopySourceSize s) sz'
  let
    chunk = 100 * 1024 * 1024 -- 100 MiB
    big = 1024 * 1024 * 1024 -- 1 GiB
  if sz < big
    then lift $ copySingle s d
    else copyMultipart s d sz chunk 100

-- | Copy an object with a single @copyObject@ request, copying the source
--   metadata ('MDCopy') and setting server side encryption to @sse@.
--
copySingle :: Address -> Address -> AWS ()
copySingle (Address (Bucket sb) (Key sk)) (Address (Bucket b) (Key dk)) =
  void . send $ A.copyObject (BucketName b) (sb <> "/" <> sk) (ObjectKey dk)
     & A.coServerSideEncryption .~ Just sse
     & A.coMetadataDirective .~ Just MDCopy

-- | Copy @source@ to @dest@ as a multipart upload.
--
--   @sz@ is the source size and @chunk@ the requested part size, both fed to
--   'calculateChunksCapped' with a cap of 4096; @fork@ is the number of
--   workers handed to 'consume'. The upload is aborted if a part copy fails
--   or completing the upload throws.
--
copyMultipart :: Address -> Address -> Int -> Int -> Int -> ExceptT CopyError AWS ()
copyMultipart source dest sz chunk fork = do
  e <- ask
  mpu <- lift $ createMultipartUpload dest -- target

  let
    chunks = calculateChunksCapped sz chunk 4096
    things = (\(o, c, i) -> (source, o, c, i)) <$> chunks

  r <- liftIO $
    consume (forM_ things . writeQueue) fork $ multipartCopyWorker e mpu dest

  let
    abort =
      lift $ abortMultipart' dest mpu

  case r of
    Left f ->
      abort >>
        (throwE $ MultipartCopyError f)

    Right prts ->
      flip onException abort $
        void . send $ f' A.completeMultipartUpload dest mpu &
          A.cMultipartUpload .~ pure (A.completedMultipartUpload & A.cmuParts .~ sortPartResponse (snd prts))

-- Sort is required here because the completeMultipartUpload api expects an
-- ascending list of part id's
sortPartResponse :: [PartResponse] -> Maybe (NEL.NonEmpty A.CompletedPart)
sortPartResponse prts =
  let
    z = L.sortOn (\(PartResponse i _) -> i) prts
    l = (\(PartResponse i etag) -> A.completedPart i etag) <$> z
  in
    NEL.nonEmpty l

-- | Copy one byte range of @source@ into part @i@ of the multipart upload
--   @mpu@ on @dest@.
--
--   The tuple is (source, offset, length, part number); the requested range
--   is @offset .. offset + length - 1@. Failures of the request itself are
--   returned as 'Left'; a response lacking the copy result or its ETag is
--   thrown as an 'Invariant'.
--
multipartCopyWorker :: Env -> Text -> Address -> (Address, Int, Int, Int) -> IO (Either Error PartResponse)
multipartCopyWorker e mpu dest (source, o, c, i) = do
  let
    sb = unBucket $ bucket source
    sk = unKey $ key source
    db = unBucket $ bucket dest
    dk = unKey $ key dest
    req =
      A.uploadPartCopy (BucketName db) (sb <> "/" <> sk) (ObjectKey dk) i mpu
        & A.upcCopySourceRange .~ (Just $ bytesRange o (o + c - 1))

  Retry.recovering (Retry.fullJitterBackoff 500000) [s3Condition] $ \_ -> do
    r <- runExceptT . runAWS e $ send req
    case r of
      Left z ->
        pure $! Left z

      Right z -> do
        pr <- fromMaybeM (throwM . Invariant $ "upcrsCopyPartResult") $ z ^. A.upcrsCopyPartResult
        m <- fromMaybeM (throwM . Invariant $ "cprETag") $ pr ^. A.cprETag
        pure $! Right $! PartResponse i m

-- | 'syncWithMode' using 'FailSync'.
--
sync :: Address -> Address -> Int -> ExceptT SyncError AWS ()
sync =
  syncWithMode FailSync

-- | Copy every address listed recursively under @source@ to the matching key
--   under @dest@, using @fork@ workers.
--
syncWithMode :: SyncMode -> Address -> Address -> Int -> ExceptT SyncError AWS ()
syncWithMode mode source dest fork = do
  e <- ask
  void . firstT SyncError . ExceptT . liftIO $
    (consume (sinkQueue e (Stream.listRecursively source)) fork (syncWorker source dest mode e))

-- | Copy a single address @f@ found under @input@ to the same relative key
--   under @output@.
--
--   The three actions handed to 'foldSyncMode' are, in order: fail with
--   'OutputExists' if the target exists, otherwise copy; always copy with
--   'Overwrite'; copy only if the target does not exist.
--   NOTE(review): which 'SyncMode' constructor selects which action depends
--   on 'foldSyncMode' - confirm against its definition.
--
syncWorker :: Address -> Address -> SyncMode -> Env -> Address -> IO (Either SyncWorkerError ())
syncWorker input output mode env f = runExceptT . runAWST env SyncAws $ do
  n <- maybe (throwE $ SyncInvariant input f) pure $ removeCommonPrefix input f
  let
    out = withKey (// n) output
    liftCopy = firstT SyncCopyError
    cp = liftCopy $ copy f out
  foldSyncMode
    (ifM (lift $ exists out) (throwE $ OutputExists out) cp)
    (liftCopy $ copyWithMode Overwrite f out)
    (ifM (lift $ exists out) (pure ()) cp)
    mode

-- | Copy @source@ to the destination, then delete @source@.
--
move :: Address -> Address -> ExceptT CopyError AWS ()
move source destination' =
  copy source destination' >>
    lift (delete source)

-- | 'uploadWithMode' using 'Fail'.
--
upload :: FilePath -> Address -> ExceptT UploadError AWS ()
upload =
  uploadWithMode Fail

-- | 'uploadRecursiveWithMode' using 'Fail'.
--
uploadRecursive :: FilePath -> Address -> Int -> ExceptT UploadError AWS ()
uploadRecursive =
  uploadRecursiveWithMode Fail

-- | Upload the local file @f@ to @a@.
--
--   Fails with 'UploadDestinationExists' when the mode is 'Fail' and the
--   destination exists, and with 'UploadSourceMissing' when the file does
--   not exist. Files smaller than 'bigChunkSize' are sent with
--   'uploadSingle'; larger files use 'multipartUpload' with parts of
--   'bigChunkSize' (doubled for files over 1 GiB).
--
uploadWithMode :: WriteMode -> FilePath -> Address -> ExceptT UploadError AWS ()
uploadWithMode m f a = do
  when (m == Fail) . whenM (lift $ exists a) . throwE $ UploadDestinationExists a
  unlessM (liftIO $ doesFileExist f) . throwE $ UploadSourceMissing f
  s <- liftIO $ withFile f ReadMode hFileSize
  let
    oneGiB = 1024 * 1024 * 1024
    partSize = if s > oneGiB then 2 * bigChunkSize else bigChunkSize
  if s < bigChunkSize
    then
      lift $ uploadSingle f a
    else
      -- Originally had a concurrency of 100 (instead of 20).
      --
      -- Based on the reasoning behind downloadWithMode which resulted in a 5
      -- as its concurrency default. Testing showed that for upload 20 was a
      -- better default.
      multipartUpload f a s partSize 20

-- | Threshold (100 MiB) at or above which uploads switch to multipart, and
--   the default multipart part size.
--
bigChunkSize :: Integer
bigChunkSize = 100 * 1024 * 1024

-- | Upload a file with a single @putObject@ request, sending the file as a
--   chunked body with 1 MiB chunks and setting server side encryption to
--   @sse@.
--
uploadSingle :: FilePath -> Address -> AWS ()
uploadSingle file a = do
  rq <- N.chunkedFile (ChunkSize $ 1024 * 1024) file
  void . send $ f' A.putObject a rq & A.poServerSideEncryption .~ pure sse

-- | Upload @file@ to @a@ as a multipart upload.
--
--   @fSize@ is the file size and @chunk@ the requested part size, both fed
--   to 'calculateChunksCapped' with a cap of 4096; @fork@ is the number of
--   workers handed to 'consume'. The upload is aborted if a part fails or
--   completing the upload throws.
--
multipartUpload :: FilePath -> Address -> Integer -> Integer -> Int -> ExceptT UploadError AWS ()
multipartUpload file a fSize chunk fork = do
  e <- ask
  mpu <- lift $ createMultipartUpload a

  let
    chunks = calculateChunksCapped (fromInteger fSize) (fromInteger chunk) 4096 -- max 4096 prts returned

  r <- liftIO $
    consume (forM_ chunks . writeQueue) fork $ multipartUploadWorker e mpu file a

  let
    abort =
      lift $ abortMultipart' a mpu

  case r of
    Left f ->
      abort >>
        (throwE $ MultipartUploadError f)

    Right prts ->
      flip onException abort $
        void . send $ f' A.completeMultipartUpload a mpu &
          A.cMultipartUpload .~ pure (A.completedMultipartUpload & A.cmuParts .~ sortPartResponse (snd prts))

-- | Upload one part of @file@ to the multipart upload @mpu@ on @a@.
--
--   The tuple is (offset, length, part number). The part is read from the
--   file handle starting at the offset and sent as a chunked body with
--   1 MiB chunks. A response without an ETag is thrown as 'MissingETag',
--   which 's3Condition' retries.
--
multipartUploadWorker :: Env -> Text -> FilePath -> Address -> (Int, Int, Int) -> IO (Either Error PartResponse)
multipartUploadWorker e mpu file a (o, c, i) =
  withFile file ReadMode $ \h ->
    let
      cs = (1024 * 1024) -- 1 mb
      cl = toInteger c
      b = XB.slurpHandle h (toInteger o) (Just $ toInteger c)
      cb = ChunkedBody cs cl b
      req' = f' A.uploadPart a i mpu $ Chunked cb
    in
      Retry.recovering (Retry.fullJitterBackoff 500000) [s3Condition] $ \_ -> do
        r <- runExceptT . runAWS e $ send req'
        case r of
          Left z ->
            pure $! Left z

          Right z -> do
            m <- fromMaybeM (throwM MissingETag) $ z ^. A.uprsETag
            pure $! Right $! PartResponse i m

-- | Retry handler for 'S3Error': retries 'MissingETag' while the iteration
--   number is below 5 and never retries any other 'S3Error'.
--
s3Condition :: Applicative a => Retry.RetryStatus -> Handler a Bool
s3Condition s =
  Handler $ \(ex :: S3Error) ->
    pure $ case ex of
      MissingETag ->
        Retry.rsIterNumber s < 5
      _ ->
        False

-- | Upload every regular file found recursively under the directory @src@,
--   placing each one under @ky@ in @buck@ at its path relative to @src@.
--
--   Fails with 'UploadSourceMissing' when @src@ cannot be stat'ed and with
--   'UploadSourceNotDirectory' when it is not a directory. Files are grouped
--   with 'chunkFilesBySize' (at most @fork@ files per group); groups of
--   several files are uploaded concurrently.
--
--   NOTE(review): only the single-file branch for files of at least
--   'bigChunkSize' goes through 'uploadWithMode'. Every other file is sent
--   straight to 'uploadSingle', so @mode@ (e.g. 'Fail') is not checked for
--   them. Confirm whether that is intentional.
--
uploadRecursiveWithMode :: WriteMode -> FilePath -> Address -> Int -> ExceptT UploadError AWS ()
uploadRecursiveWithMode mode src (Address buck ky) fork = do
  es <- tryIO $ getFileStatus src
  case es of
    Left _ -> throwE $ UploadSourceMissing src
    Right st -> unless (isDirectory st) . throwE $ UploadSourceNotDirectory src
  files <- liftIO (listRecursivelyLocal src)
  mapM_ uploadFiles $ chunkFilesBySize fork (fromIntegral bigChunkSize) files
  where
    uploadFiles :: [(FilePath, Int64)] -> ExceptT UploadError AWS ()
    uploadFiles [] = pure ()
    uploadFiles [(f,s)]
      | fromIntegral s < bigChunkSize = lift . uploadSingle f $ uploadAddress f
      | otherwise = uploadWithMode mode f $ uploadAddress f
    uploadFiles xs = do
      e <- ask
      liftIO $ mapConcurrently_ (\ (f, _) -> unsafeRunAWS e . uploadSingle f $ uploadAddress f) xs

    -- Length of @src@ plus its path separator: joining a one character name
    -- and subtracting one gives that length whether or not @src@ already
    -- ends in a separator.
    prefixLen = L.length (src </> "a") - 1

    uploadAddress :: FilePath -> Address
    uploadAddress fp = Address buck (ky // Key (T.pack $ L.drop prefixLen fp))

-- | Take a list of files and their sizes, and convert it to a list of lists
--   where the total size of the files in each sub list is less than `maxSize`
--   and the length of the sub lists is <= `maxCount`.
--
--   Files are sorted by ascending size before grouping. A file whose size
--   alone is not below `maxSize` ends up in a sub list of its own, and the
--   result may contain empty sub lists (an empty input gives @[[]]@).
chunkFilesBySize :: Int -> Int64 -> [(FilePath, Int64)] -> [[(FilePath, Int64)]]
chunkFilesBySize maxCount maxSize =
  takeFiles 0 [] . L.sortOn snd
  where
    -- Arguments: running size of the current group, the current group, and
    -- the files still to be placed.
    takeFiles :: Int64 -> [(FilePath, Int64)] -> [(FilePath, Int64)] -> [[(FilePath, Int64)]]
    takeFiles _ acc [] = [acc]
    takeFiles current acc ((x, s):xs) =
      if current + s < maxSize && L.length acc < maxCount
        then takeFiles (current + s) ((x, s):acc) xs
        else acc : takeFiles s [(x, s)] xs

-- | Like `listRecursively` but for the local filesystem.
--   Also returns the size of each file.
listRecursivelyLocal :: MonadIO m => FilePath -> m [(FilePath, Int64)]
listRecursivelyLocal topdir = do
  entries <- liftIO $ listDirectory topdir
  (dirs, files) <- liftIO . partitionDirsFilesWithSizes $ fmap (topdir </>) entries
  others <- concatMapM listRecursivelyLocal dirs
  pure $ files <> others

-- Not available with ghc 7.10 so copy it here.
listDirectory :: FilePath -> IO [FilePath]
listDirectory path =
  filter f <$> getDirectoryContents path
  where
    f filename =
      filename /= "." && filename /= ".."
-- | Split the given paths into directories and regular files, pairing each
--   regular file with its size. Each path is stat'ed with 'getFileStatus';
--   paths that are neither a directory nor a regular file are dropped.
--   Both result lists are in reverse order of the input.
partitionDirsFilesWithSizes :: MonadIO m => [FilePath] -> m ([FilePath], [(FilePath, Int64)])
partitionDirsFilesWithSizes =
  pworker ([], [])
  where
    pworker (dirs, files) [] = pure (dirs, files)
    pworker (dirs, files) (x:xs) = do
      xstat <- liftIO $ getFileStatus x
      let
        xsize = fromIntegral $ fileSize xstat
        newDirs = if isDirectory xstat then x : dirs else dirs
        newFiles = if isRegularFile xstat then (x, xsize) : files else files
      pworker (newDirs, newFiles) xs

-- | 'writeWithMode' using 'Fail'.
--
write :: Address -> Text -> AWS WriteResult
write =
  writeWithMode Fail

-- | Write the UTF-8 encoding of the text to 'Address' with server side
--   encryption set to @sse@.
--
--   With 'Fail' an existing destination yields 'WriteDestinationExists' and
--   nothing is written; otherwise the result is 'WriteOk'.
--
writeWithMode :: WriteMode -> Address -> Text -> AWS WriteResult
writeWithMode w a t = do
  result <- runExceptT $ do
    case w of
      Fail ->
        whenM (lift $ exists a) $ throwE (WriteDestinationExists a)
      Overwrite ->
        pure ()
    void . lift . send $
      f' A.putObject a (toBody . T.encodeUtf8 $ t) & A.poServerSideEncryption .~ Just sse
  either pure (const $ pure WriteOk) result

-- | Retrieve the object at 'Address'. Handles any 404 response by converting to Maybe.
getObject' :: Address -> AWS (Maybe GetObjectResponse)
getObject' =
  handle404 . send . f' A.getObject

-- | Read contents of 'Address'.
--
--   The body is decoded as UTF-8. The whole read is wrapped in
--   @withRetries 5@.
--
read :: Address -> AWS (Maybe Text)
read a =
  withRetries 5 $ do
    r <- Stream.read a
    z <- liftIO . sequence $ (\x -> runResourceT . runConduit $ x .| Conduit.sinkLbs) <$> r
    pure $ fmap (TL.toStrict . TL.decodeUtf8) z

-- | Pair of prefixes and keys directly under the given address, using @/@ as
--   the delimiter. Follows truncated responses via the next marker until the
--   listing is complete.
getObjects :: Address -> AWS ([Key], [Key])
getObjects (Address (Bucket buck) (Key ky)) =
  ((Key <$>) *** ((\(ObjectKey t) -> Key t) <$>)) <$>
    ff (A.listObjects (BucketName buck) & A.loPrefix .~ Just ((+/) ky) & A.loDelimiter .~ Just '/')
  where
    ff :: ListObjects -> AWS ([T.Text], [ObjectKey])
    ff b = do
      r <- send b
      let
        -- Common prefixes and object keys of this page.
        d =
          ( maybeToList =<< fmap (^. A.cpPrefix) (r ^. A.lorsCommonPrefixes)
          , fmap (^. A.oKey) (r ^. A.lorsContents)
          )
      if r ^. A.lorsIsTruncated == Just True
        then do
          n <- ff $ b & A.loMarker .~ (r ^. A.lorsNextMarker)
          pure $ d <> n
        else
          pure d

-- | All objects under the given address, without a delimiter. Truncated
--   responses are continued using the key of the last returned object as
--   marker; a truncated response with no contents is thrown as an
--   'Invariant'.
getObjectsRecursively :: Address -> AWS [Object]
getObjectsRecursively (Address (Bucket b) (Key ky)) =
  getObjects' $ A.listObjects (BucketName b) & A.loPrefix .~ Just ((+/) ky)
  where
    -- Hoping this will have ok performance in cases where the results are large, it shouldnt
    -- affect correctness since we search through the list for it anyway
    go x ks =
      (NEL.toList ks <>) <$> getObjects' (x & A.loMarker .~ Just (toText $ NEL.last ks ^. A.oKey))

    getObjects' :: ListObjects -> AWS [Object]
    getObjects' x = do
      resp <- send x
      if resp ^. A.lorsIsTruncated == Just True
        then
          maybe
            (throwM . Invariant $ "Truncated response with empty contents list.")
            (go x)
            (NEL.nonEmpty $ resp ^. A.lorsContents)
        else
          pure $ resp ^. A.lorsContents

-- | Return a tuple of the prefixes and keys at the provided S3 Address.
listObjects :: Address -> AWS ([Address], [Address])
listObjects a =
  (\(p, k) -> (Address (bucket a) <$> p, Address (bucket a) <$> k)) <$> getObjects a

-- | Collects everything produced by 'Stream.list' into a list.
list :: Address -> AWS [Address]
list a =
  runConduit $
    Stream.list a .| DC.consume

-- | Collects everything produced by 'Stream.listRecursively' into a list.
listRecursively :: Address -> AWS [Address]
listRecursively a =
  runConduit $
    Stream.listRecursively a .| DC.consume

-- | 'downloadWithMode' using 'Fail'.
--
download :: Address -> FilePath -> ExceptT DownloadError AWS ()
download =
  downloadWithMode Fail

-- | Download the object at @a@ to the local path @f@, creating the parent
--   directory if needed.
--
--   Fails with 'DownloadDestinationExists' when the mode is 'Fail' and the
--   file exists, and with 'DownloadSourceMissing' when the object size is
--   unavailable. Objects larger than 200 MiB use 'multipartDownload' with
--   100 MiB chunks and 5 workers; everything else uses 'downloadSingle'.
--
downloadWithMode :: WriteMode -> Address -> FilePath -> ExceptT DownloadError AWS ()
downloadWithMode mode a f = do
  when (mode == Fail) . whenM (liftIO $ doesFileExist f) . throwE $ DownloadDestinationExists f
  liftIO $ createDirectoryIfMissing True (takeDirectory f)

  sz' <- lift $ getSize a
  sz <- maybe (throwE $ DownloadSourceMissing a) pure sz'

  if (sz > 200 * 1024 * 1024)
    then
      -- Originally had a concurrency of 100 (instead of 5). Tested a number of
      -- values between 2 and 100 and found empirically that 5 gave the fastest
      -- downloads (less than 10% better), but significantly reduced the
      -- likelihood of triggering the S3 rate limiter (by a factor of 20)
      -- which in turn reduces the likelihood of `IOExceptions` and hung
      -- threads.
      multipartDownload a f sz 100 5
    else
      downloadSingle a f

-- | Download the object at @a@ to @f@ with a single @getObject@ request.
--
--   Fails with 'DownloadSourceMissing' when 'getObject'' returns 'Nothing'.
--   Writing the body to the file goes through 'withFileSafe' and is wrapped
--   in @withRetries 5@.
--
downloadSingle :: Address -> FilePath -> ExceptT DownloadError AWS ()
downloadSingle a f = do
  r <- maybe (throwE $ DownloadSourceMissing a) pure =<< lift (getObject' a)
  liftIO . withRetries 5 . withFileSafe f $ \p ->
    runResourceT . runConduit $
      (r ^. A.gorsBody ^. to _streamBody) .| Conduit.sinkFile p

-- | Download @source@ to @destination@ in byte ranges fetched by @fork@
--   workers.
--
--   @sz@ is the object size in bytes and @chunk@ the range size in MiB. The
--   file handed over by 'withFileSafe' is first set to the full size, then
--   each worker writes its range at the matching offset.
--
multipartDownload :: Address -> FilePath -> Int -> Integer -> Int -> ExceptT DownloadError AWS ()
multipartDownload source destination sz chunk fork =
  firstT MultipartError $ do
    e <- ask
    let
      chunks = calculateChunks sz (fromInteger $ chunk * 1024 * 1024)
    void . withFileSafe destination $ \f -> do
      liftIO $ withFile f WriteMode $ \h ->
        hSetFileSize h (toInteger sz)

      ExceptT . liftIO .
        consume (\q -> mapM (writeQueue q) chunks) fork $ \(o, c, _) ->
          -- The end of the range is inclusive (the same convention that
          -- 'multipartCopyWorker' uses with 'bytesRange'), so a chunk of
          -- @c@ bytes starting at @o@ ends at @o + c - 1@. Previously this
          -- requested @o + c@, fetching one byte of the following chunk.
          runExceptT . runAWS e $ downloadWithRange source o (o + c - 1) f

-- | Download the byte range @start .. end@ of the object at @a@ and write it
--   into the existing file @dest@ at offset @start@.
--
--   @start@ and @end@ are handed to 'bytesRange' unchanged. The request and
--   the write run under a ten minute timeout; hitting the timeout raises an
--   'IOError'. The whole action is wrapped in @withRetries 5@.
--
downloadWithRange :: Address -> Int -> Int -> FilePath -> AWS ()
downloadWithRange a start end dest = withRetries 5 $ do
  -- Use a timeout of ten minutes. Arrived at empirically. With a timeout of 5
  -- minutes this was triggering too often. Want this to be the last resort.
  res <- timeoutAWS (10 * 60 * 1000 * 1000) $ do
    r <- send $ f' A.getObject a &
      A.goRange .~ (Just $ bytesRange start end)

    -- write to file
    liftIO . runResourceT $ do
      fd <- snd <$> allocate (openFd dest WriteOnly Nothing defaultFileFlags) closeFd
      void . liftIO $ fdSeek fd AbsoluteSeek (fromInteger . toInteger $ start)
      let
        source = r ^. A.gorsBody ^. to _streamBody
        sink = Conduit.awaitForever $ liftIO . UBS.fdWrite fd
      runConduit $ source .| sink

  case res of
    Just () -> pure ()
    Nothing -> liftIO $ ioError (userError "downloadWithRange timeout")

-- | Download every address listed recursively under @src@ into the local
--   directory @dest@, keeping each key's path relative to @src@.
--
--   Fails with 'DownloadDestinationNotDirectory' when @dest@ exists and is
--   not a directory. Each object is fetched with 'downloadWithMode' using
--   the given mode.
--
downloadRecursiveWithMode :: WriteMode -> Address -> FilePath -> ExceptT DownloadError AWS ()
downloadRecursiveWithMode mode src dest = do
  -- Check if the destination already exists and is not a directory.
  es <- tryIO $ getFileStatus dest
  case es of
    Left _ -> pure ()
    Right st -> unless (isDirectory st) . throwE $ DownloadDestinationNotDirectory dest
  -- Real business starts here.
  addrs <- lift $ listRecursively src
  mapM_ drWorker addrs
  where
    drWorker :: Address -> ExceptT DownloadError AWS ()
    drWorker addr = do
      fpdest <- maybe (throwE $ DownloadInvariant addr src) pure $
        ((</>) dest) . T.unpack . unKey <$> removeCommonPrefix src addr
      downloadWithMode mode addr fpdest

-- | 'downloadRecursiveWithMode' using 'Fail'.
--
downloadRecursive :: Address -> FilePath -> ExceptT DownloadError AWS ()
downloadRecursive =
  downloadRecursiveWithMode Fail

-- | Start a multipart upload for 'Address' with server side encryption set
--   to @sse@ and return its upload id. A response without an upload id is
--   thrown as an 'Invariant'.
--
createMultipartUpload :: Address -> AWS Text
createMultipartUpload a = do
  mpu <- send $ f' A.createMultipartUpload a & A.cmuServerSideEncryption .~ Just sse
  maybe (throwM . Invariant $ "MultipartUpload: missing 'UploadId'") pure (mpu ^. A.cmursUploadId)

-- | All parts of the multipart upload with the given id, gathered across
--   every page of the @listParts@ response.
--
listMultipartParts :: Address -> Text -> AWS [Part]
listMultipartParts a uploadId = do
  let
    req = f' A.listParts a uploadId
  runConduit $ paginate req .| DC.foldMap (^. A.lprsParts)

-- | All multipart uploads in the bucket, gathered across every page of the
--   @listMultipartUploads@ response.
--
listMultiparts :: Bucket -> AWS [MultipartUpload]
listMultiparts (Bucket bn) = do
  let
    req = A.listMultipartUploads $ BucketName bn
  runConduit $ paginate req .| DC.foldMap (^. A.lmursUploads)

-- | Multipart uploads in the bucket that satisfy 'filterOld' at the current
--   time.
--
listOldMultiparts :: Bucket -> AWS [MultipartUpload]
listOldMultiparts b = do
  mus <- listMultiparts b
  now <- liftIO getCurrentTime
  pure $ filter (filterOld now) mus

-- | Multipart uploads in the bucket that satisfy 'filterNDays' for the given
--   number of days at the current time.
--
listOldMultiparts' :: Bucket -> Int -> AWS [MultipartUpload]
listOldMultiparts' b i = do
  mus <- listMultiparts b
  now <- liftIO getCurrentTime
  pure $ filter (filterNDays i now) mus

-- | Filter parts older than 7 days.
--
--   Given the current time, 'True' for uploads that 'filterNDays' considers
--   more than 7 days old.
--
filterOld :: UTCTime -> MultipartUpload -> Bool
filterOld =
  filterNDays 7

-- | 'True' when the upload has an initiation time and that time lies more
--   than @n@ days before @now@. Uploads without an initiation time are never
--   selected.
--
filterNDays :: Int -> UTCTime -> MultipartUpload -> Bool
filterNDays n now m =
  maybe False (nDaysOld n now) (m ^. A.muInitiated)

-- | 'True' when @utc@ is strictly earlier than @now@ minus @n@ days, a day
--   being counted as 24 * 60 * 60 seconds.
--
nDaysOld :: Int -> UTCTime -> UTCTime -> Bool
nDaysOld n now utc =
  let
    days = fromInteger (toInteger n) :: NominalDiffTime
    offset = -1 * 60 * 60 * 24 * days
    cutoff = addUTCTime offset now
  in
    cutoff > utc

-- | Abort the given multipart upload in the bucket. An upload without a key
--   or without an upload id is thrown as an 'Invariant'.
--
abortMultipart :: Bucket -> MultipartUpload -> AWS ()
abortMultipart (Bucket b) mu = do
  (ObjectKey k) <- fromMaybeM (throwM $ Invariant "Multipart key missing") (mu ^. A.muKey)
  uploadId <- fromMaybeM (throwM $ Invariant "Multipart uploadId missing") (mu ^. A.muUploadId)
  abortMultipart' (Address (Bucket b) (Key k)) uploadId

-- | Abort the multipart upload with the given id on 'Address'.
--
abortMultipart' :: Address -> Text -> AWS ()
abortMultipart' a i =
  void $ send (f' A.abortMultipartUpload a i)

-- | Send a @putObjectACL@ request for 'Address' whose read grant is set
--   from the given 'ReadGrant'.
--
grantReadAccess :: Address -> ReadGrant -> AWS ()
grantReadAccess a g =
  let
    request = f' P.putObjectACL a & P.poaGrantRead .~ Just (readGrant g)
  in
    void $ send request

-- | Run an 'IO' action, returning any 'IOError' it throws as 'Left'.
--
tryIO :: MonadIO m => IO a -> m (Either IOError a)
tryIO action =
  liftIO (CE.try action)

-- | Monadic if-then-else: run the condition, then exactly one of the two
--   branches.
--
ifM :: Monad m => m Bool -> m a -> m a -> m a
ifM p x y = do
  b <- p
  if b then x else y

-- | Run the action only when the monadic condition yields 'True'.
--
whenM :: Monad m => m Bool -> m () -> m ()
whenM p m = do
  b <- p
  when b m

-- | Run the action only when the monadic condition yields 'False'.
--
unlessM :: Monad m => m Bool -> m () -> m ()
unlessM p m = do
  b <- p
  unless b m

-- | Unwrap a 'Maybe' inside an 'Applicative', running the fallback action
--   for 'Nothing'.
--
fromMaybeM :: Applicative f => f a -> Maybe a -> f a
fromMaybeM fallback =
  maybe fallback pure