{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module Control.Distributed.Fork.Lambda.Internal.Stack
( withStack
, StackInfo (..)
, awsUploadObject
, awsObjectExists
) where
import Control.Exception.Safe
import Control.Monad
import Data.Aeson (Value (Object))
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as HM
import Data.List
import Data.Maybe
import Data.Monoid
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Control.Lens
import Network.AWS hiding (environment)
import Network.AWS.Lambda
import qualified Network.AWS.S3 as S3
import Network.AWS.Waiter
import qualified Stratosphere as S
import Data.Aeson.QQ
import Network.AWS.CloudFormation
import Control.Distributed.Fork.Lambda.Internal.Constants
import Control.Distributed.Fork.Lambda.Internal.Types
-- | Build the CloudFormation template for a stack.
--
-- The template declares four resources: the Lambda function (@func@), its
-- IAM role (@role@, see 'seRole') and two SQS queues (@answerQueue@ and
-- @deadLetterQueue@). The location of the function code is not baked in; it
-- is supplied through the two template parameters. The function reference
-- and both queue references are exposed as template outputs.
seTemplate :: StackOptions -> S.Template
seTemplate StackOptions{soLambdaMemory} =
  S.template templateResources
    & S.templateParameters ?~ templateParams
    & S.templateOutputs ?~ templateOuts
  where
    templateResources =
      S.Resources
        [ S.resource "func" (S.LambdaFunctionProperties lambdaFn)
        , S.resource "role" seRole
        , S.resource "answerQueue" (S.SQSQueueProperties S.sqsQueue)
        , S.resource "deadLetterQueue" (S.SQSQueueProperties S.sqsQueue)
        ]

    -- The function itself: memory size comes from the options, failed
    -- invocations are routed to the dead letter queue.
    lambdaFn =
      S.lambdaFunction
        lambdaCode
        "handler.handle"
        (S.GetAtt "role" "Arn")
        (S.Literal S.Python27)
        & S.lfTimeout ?~ S.Literal 300
        & S.lfMemorySize ?~ S.Literal (fromIntegral soLambdaMemory)
        & S.lfDeadLetterConfig ?~
            S.LambdaFunctionDeadLetterConfig
              (Just (S.GetAtt "deadLetterQueue" "Arn"))

    -- Code location, resolved from the template parameters at deploy time.
    lambdaCode =
      S.lambdaFunctionCode
        & S.lfcS3Bucket ?~ S.Ref templateParameterS3Bucket
        & S.lfcS3Key ?~ S.Ref templateParameterS3Key

    templateParams =
      S.Parameters
        [ S.parameter templateParameterS3Bucket "String"
        , S.parameter templateParameterS3Key "String"
        ]

    templateOuts =
      S.Outputs
        [ S.output templateOutputFunc (S.Ref "func")
        , S.output templateOutputAnswerQueue (S.Ref "answerQueue")
        , S.output templateOutputDeadLetterQueue (S.Ref "deadLetterQueue")
        ]
-- | IAM role assumed by the Lambda function.
--
-- The trust policy lets @lambda.amazonaws.com@ assume the role. Two inline
-- policies are attached: @sqs@ (allows @sqs:SendMessage@) and @cloudwatch@
-- (allows creating log groups/streams and putting log events).
seRole :: S.ResourceProperties
seRole = S.IAMRoleProperties roleWithPolicies
  where
    roleWithPolicies =
      S.iamRole (asObject trustDocument)
        & S.iamrPolicies ?~ inlinePolicies

    inlinePolicies =
      [ S.iamRolePolicy (asObject sqsDocument) (S.Literal "sqs")
      , S.iamRolePolicy (asObject cloudwatchDocument) (S.Literal "cloudwatch")
      ]

    -- The quasi-quoted documents below are all JSON objects, so the
    -- fallback branch is never expected to run.
    asObject (Object fields) = fields
    asObject _ = error "invariant violation"

    trustDocument = [aesonQQ|
      {
        "Version":"2012-10-17",
        "Statement": [{
          "Effect": "Allow",
          "Principal": {
            "Service": [ "lambda.amazonaws.com" ]
          },
          "Action": [ "sts:AssumeRole" ]
        }]
      }
    |]

    sqsDocument = [aesonQQ|
      {
        "Version": "2012-10-17",
        "Statement": [{
          "Effect": "Allow",
          "Action": [
            "sqs:SendMessage"
          ],
          "Resource": "arn:aws:sqs:*"
        }]
      }
    |]

    cloudwatchDocument = [aesonQQ|
      {
        "Version": "2012-10-17",
        "Statement": [{
          "Action": [
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents"
          ],
          "Effect": "Allow",
          "Resource": "arn:aws:logs:*:*:*"
        }]
      }
    |]
-- | Names of the CloudFormation template parameters carrying the S3
-- location (bucket and key) of the Lambda function code.
templateParameterS3Bucket, templateParameterS3Key :: T.Text
templateParameterS3Bucket = "s3bucket"
templateParameterS3Key = "s3key"

-- | Keys of the CloudFormation template outputs: the function reference,
-- the answer queue reference and the dead letter queue reference.
templateOutputFunc, templateOutputAnswerQueue, templateOutputDeadLetterQueue :: T.Text
templateOutputFunc = "output"
templateOutputAnswerQueue = "answerQueue"
templateOutputDeadLetterQueue = "deadLetterQueue"
-- | Upload a strict 'BS.ByteString' to the given S3 location.
--
-- Throws a 'StackException' carrying the response status when the
-- @PutObject@ response status is anything other than 200.
awsUploadObject :: S3Loc -> BS.ByteString -> AWS ()
awsUploadObject (S3Loc (BucketName bucket) path) contents = do
  response <-
    send
      (S3.putObject
         (S3.BucketName bucket)
         (S3.ObjectKey path)
         (toBody contents))
  let status = response ^. S3.porsResponseStatus
  when (status /= 200) $
    throwM
      (StackException
         ("Upload failed. Status code: " <> T.pack (show status)))
-- | Check whether an object exists at exactly the given S3 location.
--
-- Lists the bucket using the object path as prefix and looks for a key
-- equal to the path in the returned contents.
--
-- Throws a 'StackException' carrying the response status when the
-- @ListObjects@ response status is anything other than 200.
awsObjectExists :: S3Loc -> AWS Bool
awsObjectExists (S3Loc (BucketName bucket) path) = do
  lors <-
    send $
      S3.listObjects (S3.BucketName bucket)
        & S3.loPrefix ?~ path
  let status = lors ^. S3.lorsResponseStatus
  unless (status == 200) $
    throwM . StackException $
      -- Report only the status code (as the message says), not the whole
      -- response structure.
      "List objects failed. Status code: " <> T.pack (show status)
  let keys = map (view S3.oKey) (lors ^. S3.lorsContents)
  -- A prefix listing also returns longer keys, hence the exact comparison.
  return $ any (\(S3.ObjectKey k) -> k == path) keys
-- | Identifiers of a created stack and of the resources it exposes.
data StackInfo = StackInfo
  { -- | Stack id as returned by the @CreateStack@ request; used to delete
    -- the stack.
    siId :: T.Text
    -- | Value of the function output of the template.
  , siFunc :: T.Text
    -- | Value of the answer queue output of the template.
  , siAnswerQueue :: T.Text
    -- | Value of the dead letter queue output of the template.
  , siDeadLetterQueue :: T.Text
  }
-- | Create the CloudFormation stack described by 'seTemplate' and return
-- the identifiers of its resources.
--
-- Steps:
--
-- 1. Send the @CreateStack@ request, passing the S3 location of the
--    function code as template parameters.
-- 2. Wait until stack creation completes.
-- 3. Read the function, answer queue and dead letter queue outputs.
-- 4. Set the answer queue value as an environment variable on the function.
--
-- Throws a 'StackException' when a request returns a non-200 status, when
-- the stack id or an expected output (or its value) is missing, or when the
-- waiter does not report success.
--
-- Once the stack id is known, a failure in any later step triggers a
-- best-effort @DeleteStack@ before the exception propagates, so that a
-- half-created stack is not left behind.
seCreateStack :: StackOptions -> AWS StackInfo
seCreateStack options@StackOptions { soName = StackName stackName
                                   , soLambdaCode = S3Loc (BucketName bucketName) path
                                   } = do
  csrs <-
    send $
      createStack stackName
        & csTemplateBody ?~
            (T.decodeUtf8 . BL.toStrict . S.encodeTemplate $ seTemplate options)
        & csCapabilities .~ [CapabilityIAM]
        & csParameters .~
            [ parameter
                & pParameterKey ?~ templateParameterS3Bucket
                & pParameterValue ?~ bucketName
            , parameter
                & pParameterKey ?~ templateParameterS3Key
                & pParameterValue ?~ path
            ]
  unless (csrs ^. csrsResponseStatus == 200) $
    throwM $
      StackException
        ("CloudFormation stack creation request failed." <> T.pack (show csrs))
  stackId <-
    maybe
      (throwM $ StackException "Could not determine stack id.")
      return
      (csrs ^. csrsStackId)
  -- From here on the stack exists on the AWS side; clean it up if anything
  -- below fails, since the caller never receives a 'StackInfo' to delete.
  finishSetup stackId `onException` void (send (deleteStack stackId))
  where
    -- Wait for the stack, collect its outputs and configure the function.
    finishSetup :: T.Text -> AWS StackInfo
    finishSetup stackId = do
      await stackCreateComplete (describeStacks & dStackName ?~ stackId) >>= \case
        AcceptSuccess -> return ()
        err ->
          throwM . StackException $
            "CloudFormation stack creation failed." <> T.pack (show err)
      -- Query by stack id, consistently with the waiter above.
      dsrs <- send $ describeStacks & dStackName ?~ stackId
      unless (dsrs ^. dsrsResponseStatus == 200) $
        throwM . StackException $
          "CloudFormation describeStack failed. Status code: "
            <> T.pack (show $ dsrs ^. dsrsResponseStatus)
      stackRs <-
        case dsrs ^. dsrsStacks of
          [x] -> return x
          _ -> throwM $ StackException "Unexpected answer from DescribeStacks."
      func <-
        requireOutput stackRs templateOutputFunc
          "Could not determine function name."
      answerQueue <-
        requireOutput stackRs templateOutputAnswerQueue
          "Could not determine answerQueue URL."
      deadLetterQueue <-
        requireOutput stackRs templateOutputDeadLetterQueue
          "Could not determine deadLetterQueue URL."
      _ <-
        send $
          updateFunctionConfiguration func
            & ufcEnvironment ?~
                ( environment
                    & eVariables ?~ HM.fromList [(envAnswerQueueUrl, answerQueue)]
                )
      return
        StackInfo
          { siId = stackId
          , siFunc = func
          , siAnswerQueue = answerQueue
          , siDeadLetterQueue = deadLetterQueue
          }

    -- Fetch an output value or fail with the given message.
    requireOutput :: Stack -> T.Text -> T.Text -> AWS T.Text
    requireOutput st key errMsg =
      maybe (throwM $ StackException errMsg) return (lookupOutput st key)

    -- Total lookup: 'Nothing' both when the output key is absent and when
    -- the output carries no value.
    lookupOutput :: Stack -> T.Text -> Maybe T.Text
    lookupOutput st key =
      (^. oOutputValue)
        =<< find (\o -> o ^. oOutputKey == Just key) (st ^. sOutputs)
-- | Send a @DeleteStack@ request for the stack identified by 'siId',
-- discarding the response.
seDeleteStack :: StackInfo -> AWS ()
seDeleteStack info = void (send (deleteStack (siId info)))
-- | Exception thrown by the stack and S3 helpers in this module; carries a
-- human-readable description of what went wrong.
newtype StackException = StackException T.Text
  deriving (Show)

instance Exception StackException
-- | Run an action against a freshly created stack.
--
-- The stack is created with 'seCreateStack' before the action runs, and a
-- delete request is sent with 'seDeleteStack' afterwards; 'bracket' ensures
-- the delete is attempted even when the action throws.
withStack :: StackOptions -> Env -> (StackInfo -> IO a) -> IO a
withStack opts env action = bracket createStackIO deleteStackIO action
  where
    createStackIO :: IO StackInfo
    createStackIO = runResourceT (runAWS env (seCreateStack opts))

    deleteStackIO :: StackInfo -> IO ()
    deleteStackIO info = runResourceT (runAWS env (seDeleteStack info))