module Network.AWS.S3.StreamingUpload
(
streamUpload
, UploadLocation(..)
, concurrentUpload
, abortAllUploads
, module Network.AWS.S3.CreateMultipartUpload
, module Network.AWS.S3.CompleteMultipartUpload
, chunkSize
) where
import Network.AWS (Error, HasEnv (..),
LogLevel (..),
MonadAWS, getFileSize,
hashedBody, send,
toBody)
import Control.Monad.Trans.AWS (AWSConstraint)
import Network.AWS.Data.Crypto (Digest, SHA256,
hashFinalize, hashInit,
hashUpdate)
import Network.AWS.S3.AbortMultipartUpload
import Network.AWS.S3.CompleteMultipartUpload
import Network.AWS.S3.CreateMultipartUpload
import Network.AWS.S3.ListMultipartUploads
import Network.AWS.S3.Types (BucketName, cmuParts, completedMultipartUpload,
completedPart, muKey,
muUploadId)
import Network.AWS.S3.UploadPart
import Control.Applicative
import Control.Category ((>>>))
import Control.Monad (forM_, when, (>=>))
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Morph (lift)
import Control.Monad.Trans.Resource (MonadBaseControl,
MonadResource, throwM)
import Data.Conduit (Sink, await)
import Data.Conduit.List (sourceList)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.ByteString.Builder (stringUtf8)
import System.IO.MMap (mmapFileByteString)
import qualified Data.DList as D
import Data.List (unfoldr)
import Data.List.NonEmpty (nonEmpty)
import Control.Exception.Lens (catching, handling)
import Control.Lens
import Text.Printf (printf)
import Control.Concurrent.Async.Lifted (forConcurrently)
-- | Size, in bytes, of the pieces that uploads are split into (6 MiB).
--
-- 'streamUpload' buffers input until adding another chunk would exceed this
-- many bytes, and 'concurrentUpload' cuts its input into pieces of this size.
chunkSize :: Int
chunkSize = 6 * 1024 * 1024
-- | Conduit 'Sink' that sends everything it receives to S3 as a multipart
-- upload described by the given 'CreateMultipartUpload' request.
--
-- Incoming chunks are buffered, together with a running SHA256 context, for
-- as long as the buffered size stays within 'chunkSize'. The first chunk that
-- would push the buffer past 'chunkSize' is appended and the whole buffer is
-- sent as one part. When the input ends, whatever is buffered is sent as the
-- final part and the multipart upload is completed.
--
-- Fails (via 'fail') if the create request does not return status 200 or an
-- upload id, or if any part upload does not return status 200. If the upload
-- loop throws, the multipart upload is aborted and the exception is rethrown.
streamUpload :: (MonadResource m, MonadAWS m, AWSConstraint r m)
             => CreateMultipartUpload
             -> Sink ByteString m CompleteMultipartUploadResponse
streamUpload cmu = do
  logger <- lift $ view envLogger
  let logStr :: MonadIO m => String -> m ()
      logStr = liftIO . logger Info . stringUtf8

  cmur <- lift (send cmu)
  when (cmur ^. cmursResponseStatus /= 200) $
    fail "Failed to create upload"

  logStr "\n**** Created upload\n"

  -- Fail immediately with a clear message instead of relying on a partial
  -- @let Just upId@ pattern that would only blow up on first use.
  upId <- maybe (fail "Failed to create upload: response has no upload id") pure
                (cmur ^. cmursUploadId)

  let bucket = cmu ^. cmuBucket
      key    = cmu ^. cmuKey

      -- Loop state: buffered chunks, their total size, the running hash
      -- context for the buffer, the number of the part being assembled, and
      -- the parts completed so far.
      go !bss !bufsize !ctx !partnum !completed = Data.Conduit.await >>= \mbs -> case mbs of
        Just bs
          | l <- BS.length bs
          , bufsize + l <= chunkSize ->
              -- Still within the limit: keep buffering.
              go (D.snoc bss bs) (bufsize + l) (hashUpdate ctx bs) partnum completed
          | otherwise -> do
              -- This chunk takes the buffer past the limit: send the buffer
              -- including this chunk, then start over with an empty buffer.
              let partSize = bufsize + BS.length bs
              rs <- lift $ partUploader partnum partSize
                                        (hashFinalize $ hashUpdate ctx bs)
                                        (D.snoc bss bs)
              logStr $ printf "\n**** Uploaded part %d size %d\n" partnum partSize
              let part = completedPart partnum <$> (rs ^. uprsETag)
              go D.empty 0 hashInit (partnum + 1) $ D.snoc completed part

        Nothing -> lift $ do
          -- End of input: flush what is buffered as the final part.
          rs <- partUploader partnum bufsize (hashFinalize ctx) bss
          logStr $ printf "\n**** Uploaded (final) part %d size %d\n" partnum bufsize
          let allParts = D.toList $ D.snoc completed $ completedPart partnum <$> (rs ^. uprsETag)
              -- 'Nothing' if any part lacks an ETag or there are no parts.
              prts = nonEmpty =<< sequence allParts
          send $ completeMultipartUpload bucket key upId
                   & cMultipartUpload ?~ set cmuParts prts completedMultipartUpload

      -- Upload one part made of the buffered chunks, using the precomputed
      -- digest and size, and check the response status.
      partUploader :: MonadAWS m => Int -> Int -> Digest SHA256 -> D.DList ByteString -> m UploadPartResponse
      partUploader pnum size digest =
        D.toList
        >>> sourceList
        >>> hashedBody digest (fromIntegral size)
        >>> toBody
        >>> uploadPart bucket key pnum upId
        >>> send
        >=> checkUpload

      -- Reject any part response whose status is not 200.
      checkUpload :: (Monad m) => UploadPartResponse -> m UploadPartResponse
      checkUpload upr = do
        when (upr ^. uprsResponseStatus /= 200) $ fail "Failed to upload piece"
        return upr

  -- On any exception from the loop, abort the multipart upload and rethrow.
  catching id (go D.empty 0 hashInit 1 D.empty) $ \e ->
    lift (send (abortMultipartUpload bucket key upId)) >> throwM e
-- | Where 'concurrentUpload' takes the data to upload from.
data UploadLocation
  = FP FilePath    -- ^ A file on disk, identified by its path
  | BS ByteString  -- ^ A strict 'ByteString' already held in memory
-- | Upload a file or an in-memory 'ByteString' as an S3 multipart upload,
-- sending all parts concurrently.
--
-- The input is cut into pieces of 'chunkSize' bytes; the last piece holds
-- whatever remains. For 'FP', each piece is memory-mapped from the file
-- inside the worker that uploads it. For 'BS', the pieces come from
-- 'chunksOf'.
--
-- Fails (via 'fail') if the create request does not return status 200 or an
-- upload id. If anything throws after the upload has been created, the
-- multipart upload is aborted and the exception is rethrown.
concurrentUpload :: (MonadAWS m, MonadBaseControl IO m)
                 => UploadLocation -> CreateMultipartUpload -> m CompleteMultipartUploadResponse
concurrentUpload ud cmu = do
  cmur <- send cmu
  when (cmur ^. cmursResponseStatus /= 200) $
    fail "Failed to create upload"

  -- Fail immediately with a clear message instead of relying on a partial
  -- @let Just upId@ pattern that would only blow up on first use.
  upId <- maybe (fail "Failed to create upload: response has no upload id") pure
                (cmur ^. cmursUploadId)

  let bucket = cmu ^. cmuBucket
      key    = cmu ^. cmuKey
      -- Abort the multipart upload, then rethrow the original exception.
      hndlr e = send (abortMultipartUpload bucket key upId) >> throwM e

  handling id hndlr $ do
    umrs <- case ud of
      BS bs -> forConcurrently (zip [1..] $ chunksOf chunkSize bs) $ \(partnum, b) -> do
        umr <- send . uploadPart bucket key partnum upId . toBody $ b
        pure $ completedPart partnum <$> (umr ^. uprsETag)

      FP fp -> do
        fsize <- liftIO $ getFileSize fp
        let (count, lst) = divMod (fromIntegral fsize) chunkSize
            -- One (part number, byte offset, byte count) triple per part:
            -- @count@ full-size parts followed by one part of @lst@ bytes.
            params = zip3 [1..]
                          (map (chunkSize *) [0..count])
                          (replicate count chunkSize ++ [lst])
        forConcurrently params $ \(partnum, off, size) -> do
          b <- liftIO $ mmapFileByteString fp (Just (fromIntegral off, size))
          umr <- send . uploadPart bucket key partnum upId . toBody $ b
          pure $ completedPart partnum <$> (umr ^. uprsETag)

    -- 'Nothing' if any part lacks an ETag or there are no parts.
    let prts = nonEmpty =<< sequence umrs
    send $ completeMultipartUpload bucket key upId
             & cMultipartUpload ?~ set cmuParts prts completedMultipartUpload
-- | Abort the multipart uploads returned by a single 'listMultipartUploads'
-- request for the given bucket.
--
-- Listed uploads that lack either a key or an upload id are skipped.
abortAllUploads :: (MonadAWS m) => BucketName -> m ()
abortAllUploads bucket = do
  listing <- send (listMultipartUploads bucket)
  forM_ (listing ^. lmursUploads) $ \upload ->
    case (upload ^. muKey, upload ^. muUploadId) of
      (Just key, Just uid) -> () <$ send (abortMultipartUpload bucket key uid)
      _                    -> pure ()
-- | Apply the function and wrap the result in 'Just' when the predicate holds
-- for the input; otherwise return 'Nothing'.
justWhen :: (a -> Bool) -> (a -> b) -> a -> Maybe b
justWhen keep convert x
  | keep x    = Just (convert x)
  | otherwise = Nothing
-- | Mirror image of 'justWhen': return 'Nothing' when the predicate holds for
-- the input; otherwise apply the function and wrap the result in 'Just'.
nothingWhen :: (a -> Bool) -> (a -> b) -> a -> Maybe b
nothingWhen reject convert x = justWhen (\y -> not (reject y)) convert x
-- | Cut a 'BS.ByteString' into consecutive pieces of the given length; the
-- last piece holds whatever remains. An empty input yields an empty list.
chunksOf :: Int -> BS.ByteString -> [BS.ByteString]
chunksOf size bytes = unfoldr nextChunk bytes
  where
    -- Stop once nothing is left; otherwise emit one piece and carry the rest.
    nextChunk = nothingWhen BS.null (BS.splitAt size)