module Aws.DynamoDb.Streams.Core
( StreamsAction(..)
, streamsActionToText
, parseStreamsAction
, streamsServiceEndpoint
, StreamsMetadata(..)
, stmAmzId2
, stmRequestId
, StreamsConfiguration(..)
, stcRegion
, StreamsQuery(..)
, stqAction
, stqBody
, streamsSignQuery
, StreamsResponseJsonErrorData(..)
, srjedMessage
, srjedJSON
, StreamsErrorResponse(..)
, _StreamsResponseJsonError
, _StreamsErrorResponse
, _StreamsOtherError
, StreamsErrorResponseData(..)
, sterdErrorCode
, sterdErrorMessage
, StreamsOtherErrorData(..)
, stoeStatus
, stoeMessage
, jsonResponseConsumer
, errorResponseConsumer
, streamsResponseConsumer
) where
import Aws.Core
import Aws.General
import Control.Applicative
import Control.Applicative.Unicode
import Control.Exception
import Control.Monad.Trans
import Crypto.Hash
import Data.Byteable
import qualified Data.ByteString as B
import qualified Data.ByteString.Base16 as B16
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as LB
import Data.Aeson
import qualified Data.CaseInsensitive as CI
import Data.Conduit (($$+-))
import Data.Conduit.Binary (sinkLbs)
import Data.Function (on)
import Data.IORef
import Data.List
import Data.Maybe
import Data.Profunctor
import Data.String
import Data.Typeable
import Control.Monad.Trans.Resource (throwM)
import Data.Monoid
import Data.Monoid.Unicode
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Network.HTTP.Types as HTTP
import qualified Network.HTTP.Conduit as HTTP
import Prelude.Unicode
import qualified Text.Parser.Char as P
import qualified Text.Parser.Combinators as P
import Text.Parser.Combinators ((<?>))
streamsTargetVersion ∷ IsString a ⇒ a
streamsTargetVersion = "DynamoDBStreams_20120810"
data StreamsAction
= ActionDescribeStream
| ActionGetRecords
| ActionGetShardIterator
| ActionListStreams
deriving (Eq, Ord, Enum, Bounded, Typeable, Read, Show)
streamsActionToText
∷ IsString s
⇒ StreamsAction
→ s
streamsActionToText = \case
ActionDescribeStream → "DescribeStream"
ActionGetRecords → "GetRecords"
ActionGetShardIterator → "GetShardIterator"
ActionListStreams → "ListStreams"
parseStreamsAction
∷ P.CharParsing m
⇒ m StreamsAction
parseStreamsAction = P.choice parsers <?> "StreamsAction"
where
actionToParser a =
a <$ P.text (streamsActionToText a)
parsers = actionToParser <$>
[ ActionDescribeStream
, ActionGetRecords
, ActionGetShardIterator
, ActionListStreams
]
instance AwsType StreamsAction where
toText = streamsActionToText
parse = parseStreamsAction
streamsServiceEndpoint
∷ Region
→ B8.ByteString
streamsServiceEndpoint = \case
UsEast1 → "streams.preview-dynamodb.us-east-1.amazonaws.com"
EuWest1 → "streams.preview-dynamodb.eu-west-1.amazonaws.com"
region → error $ "Unsupported region: " ⊕ show region
data StreamsMetadata
= StreamsMetadata
{ _stmAmzId2 ∷ !(Maybe T.Text)
, _stmRequestId ∷ !(Maybe T.Text)
} deriving (Eq, Show)
instance Loggable StreamsMetadata where
toLogText StreamsMetadata{..} =
"DynamoDb Streams: request ID="
⊕ fromMaybe "<none>" _stmRequestId
⊕ ", x-amz-id-2="
⊕ fromMaybe "<none>" _stmAmzId2
instance Monoid StreamsMetadata where
mempty = StreamsMetadata
{ _stmAmzId2 = Nothing
, _stmRequestId = Nothing
}
sm `mappend` sm' = StreamsMetadata
{ _stmAmzId2 = _stmAmzId2 sm <|> _stmAmzId2 sm'
, _stmRequestId = _stmRequestId sm <|> _stmRequestId sm'
}
stmAmzId2
∷ Functor f
⇒ (Maybe T.Text → f (Maybe T.Text))
→ StreamsMetadata
→ f StreamsMetadata
stmAmzId2 i StreamsMetadata{..} =
(\_stmAmzId2 → StreamsMetadata{..})
<$> i _stmAmzId2
stmRequestId
∷ Functor f
⇒ (Maybe T.Text → f (Maybe T.Text))
→ StreamsMetadata
→ f StreamsMetadata
stmRequestId i StreamsMetadata{..} =
(\_stmRequestId → StreamsMetadata{..})
<$> i _stmRequestId
data StreamsConfiguration qt
= StreamsConfiguration
{ _stcRegion ∷ !Region
} deriving (Eq, Show)
stcRegion
∷ Functor f
⇒ (Region → f Region)
→ StreamsConfiguration qt
→ f (StreamsConfiguration qt)
stcRegion i StreamsConfiguration{..} =
(\_stcRegion → StreamsConfiguration{..})
<$> i _stcRegion
data StreamsQuery
= StreamsQuery
{ _stqAction ∷ !StreamsAction
, _stqBody ∷ !LB.ByteString
} deriving (Eq, Show)
stqAction
∷ Functor f
⇒ (StreamsAction → f StreamsAction)
→ StreamsQuery
→ f StreamsQuery
stqAction i StreamsQuery{..} =
(\_stqAction → StreamsQuery{..})
<$> i _stqAction
stqBody
∷ Functor f
⇒ (LB.ByteString → f LB.ByteString)
→ StreamsQuery
→ f StreamsQuery
stqBody i StreamsQuery{..} =
(\_stqBody → StreamsQuery{..})
<$> i _stqBody
streamsTargetHeader
∷ StreamsAction
→ HTTP.Header
streamsTargetHeader a =
( "X-Amz-Target"
, streamsTargetVersion ⊕ "." ⊕ toText a
)
streamsSignQuery
∷ StreamsQuery
→ StreamsConfiguration qt
→ SignatureData
→ SignedQuery
streamsSignQuery StreamsQuery{..} StreamsConfiguration{..} sigData = SignedQuery
{ sqMethod = Post
, sqProtocol = HTTP
, sqHost = host
, sqPort = 80
, sqPath = "/"
, sqQuery = []
, sqDate = Just $ signatureTime sigData
, sqAuthorization = Just auth
, sqContentType = Just "application/x-amz-json-1.0"
, sqContentMd5 = Nothing
, sqAmzHeaders = amzHeaders ⊕ maybe [] pure securityTokenHeader
, sqOtherHeaders = []
, sqBody = Just $ HTTP.RequestBodyLBS _stqBody
, sqStringToSign = canonicalRequest
}
where
credentials = signatureCredentials sigData
host = streamsServiceEndpoint _stcRegion
sigTime = fmtTime "%Y%m%dT%H%M%SZ" $ signatureTime sigData
amzHeaders =
[ ("x-amz-date", sigTime)
, streamsTargetHeader _stqAction
]
securityTokenHeader =
("x-amz-security-token",)
<$> iamToken credentials
canonicalHeaders =
sortBy (compare `on` fst) $
amzHeaders ⊕
[ ("host", host)
, ("content-type", "application/x-amz-json-1.0")
]
canonicalRequest =
let bodyHash = B16.encode $ toBytes (hashlazy _stqBody :: Digest SHA256)
headers =
flip fmap canonicalHeaders $ \(a,b) →
[ CI.foldedCase a
, ":"
, b
]
in
B.concat ∘ intercalate ["\n"] $
[ [ "POST" ]
, [ "/" ]
, []
] ⊕ headers ⊕
[ []
, intersperse ";" ((CI.foldedCase ∘ fst) <$> canonicalHeaders)
, [ bodyHash ]
]
auth =
authorizationV4
sigData
HmacSHA256
(regionToText _stcRegion)
"dynamodb"
"content-type;host;x-amz-date;x-amz-target"
canonicalRequest
data StreamsErrorResponseData
= StreamsErrorResponseData
{ _sterdErrorCode ∷ !T.Text
, _sterdErrorMessage ∷ !T.Text
} deriving (Eq, Show, Typeable)
sterdErrorCode
∷ Functor f
⇒ (T.Text → f T.Text)
→ StreamsErrorResponseData
→ f StreamsErrorResponseData
sterdErrorCode i StreamsErrorResponseData{..} =
(\_sterdErrorCode → StreamsErrorResponseData{..})
<$> i _sterdErrorCode
sterdErrorMessage
∷ Functor f
⇒ (T.Text → f T.Text)
→ StreamsErrorResponseData
→ f StreamsErrorResponseData
sterdErrorMessage i StreamsErrorResponseData{..} =
(\_sterdErrorMessage → StreamsErrorResponseData{..})
<$> i _sterdErrorMessage
data StreamsOtherErrorData
= StreamsOtherErrorData
{ _stoeStatus ∷ !HTTP.Status
, _stoeMessage ∷ !T.Text
} deriving (Eq, Show, Typeable)
stoeStatus
∷ Functor f
⇒ (HTTP.Status → f HTTP.Status)
→ StreamsOtherErrorData
→ f StreamsOtherErrorData
stoeStatus i StreamsOtherErrorData{..} =
(\_stoeStatus → StreamsOtherErrorData{..})
<$> i _stoeStatus
stoeMessage
∷ Functor f
⇒ (T.Text → f T.Text)
→ StreamsOtherErrorData
→ f StreamsOtherErrorData
stoeMessage i StreamsOtherErrorData{..} =
(\_stoeMessage → StreamsOtherErrorData{..})
<$> i _stoeMessage
data StreamsResponseJsonErrorData
= StreamsResponseJsonErrorData
{ _srjedMessage ∷ !T.Text
, _srjedJSON ∷ !LB.ByteString
} deriving (Eq, Show, Typeable)
srjedMessage
∷ Functor f
⇒ (T.Text → f T.Text)
→ StreamsResponseJsonErrorData
→ f StreamsResponseJsonErrorData
srjedMessage i StreamsResponseJsonErrorData{..} =
(\_srjedMessage → StreamsResponseJsonErrorData{..})
<$> i _srjedMessage
srjedJSON
∷ Functor f
⇒ (LB.ByteString → f LB.ByteString)
→ StreamsResponseJsonErrorData
→ f StreamsResponseJsonErrorData
srjedJSON i StreamsResponseJsonErrorData{..} =
(\_srjedJSON → StreamsResponseJsonErrorData{..})
<$> i _srjedJSON
data StreamsErrorResponse
= StreamsResponseJsonError StreamsResponseJsonErrorData
| StreamsErrorResponse StreamsErrorResponseData
| StreamsOtherError StreamsOtherErrorData
deriving (Eq, Show, Typeable)
_StreamsResponseJsonError
∷ ( Choice p
, Applicative f
)
⇒ p StreamsResponseJsonErrorData (f StreamsResponseJsonErrorData)
→ p StreamsErrorResponse (f StreamsErrorResponse)
_StreamsResponseJsonError =
dimap to fro ∘ right'
where
to = \case
StreamsResponseJsonError e → Right e
e → Left e
fro = either pure (fmap StreamsResponseJsonError)
_StreamsErrorResponse
∷ ( Choice p
, Applicative f
)
⇒ p StreamsErrorResponseData (f StreamsErrorResponseData)
→ p StreamsErrorResponse (f StreamsErrorResponse)
_StreamsErrorResponse =
dimap to fro ∘ right'
where
to = \case
StreamsErrorResponse e → Right e
e → Left e
fro = either pure (fmap StreamsErrorResponse)
_StreamsOtherError
∷ ( Choice p
, Applicative f
)
⇒ p StreamsOtherErrorData (f StreamsOtherErrorData)
→ p StreamsErrorResponse (f StreamsErrorResponse)
_StreamsOtherError =
dimap to fro ∘ right'
where
to = \case
StreamsOtherError e → Right e
e → Left e
fro = either pure (fmap StreamsOtherError)
instance Exception StreamsErrorResponse
instance FromJSON StreamsErrorResponse where
parseJSON =
withObject "StreamsErrorResponse" $ \o →
fmap StreamsErrorResponse $
pure StreamsErrorResponseData
⊛ o .: "__type"
⊛ o .: "message"
jsonResponseConsumer
∷ FromJSON α
⇒ HTTPResponseConsumer α
jsonResponseConsumer res = do
doc ← HTTP.responseBody res $$+- sinkLbs
case eitherDecode (if doc ≡ mempty then "{}" else doc) of
Left e → throwM $ StreamsResponseJsonError StreamsResponseJsonErrorData
{ _srjedMessage = T.pack e
, _srjedJSON = doc
}
Right v → return v
streamsResponseConsumer
∷ FromJSON a
⇒ IORef StreamsMetadata
→ HTTPResponseConsumer a
streamsResponseConsumer metadata resp = do
let headerString = fmap T.decodeUtf8 ∘ flip lookup (HTTP.responseHeaders resp)
amzId2 = headerString "x-amz-id-2"
requestId = headerString "x-amz-request-id"
liftIO $ tellMetadataRef metadata StreamsMetadata
{ _stmAmzId2 = amzId2
, _stmRequestId = requestId
}
if HTTP.responseStatus resp ≥ HTTP.status400
then errorResponseConsumer resp
else jsonResponseConsumer resp
errorResponseConsumer ∷ HTTPResponseConsumer a
errorResponseConsumer resp = do
doc ← HTTP.responseBody resp $$+- sinkLbs
if HTTP.responseStatus resp ≡ HTTP.status400
then kinesisError doc
else throwM $ StreamsOtherError StreamsOtherErrorData
{ _stoeStatus = HTTP.responseStatus resp
, _stoeMessage = T.decodeUtf8 $ LB.toStrict doc
}
where
kinesisError doc =
case eitherDecode doc of
Left e → throwM $ StreamsResponseJsonError StreamsResponseJsonErrorData
{ _srjedMessage = T.pack e
, _srjedJSON = doc
}
Right a → throwM (a ∷ StreamsErrorResponse)