module Aws.DynamoDb.Commands.Query.Conduit
( module Aws.DynamoDb.Commands.Query
, ConduitQuery(..), ConduitQueryResponse
) where
import Data.Aeson (parseJSON)
import Data.Aeson.Types (parseMaybe)
import Data.ByteString (ByteString)
import Data.Conduit (ConduitM, fuse, yield, await, ResumableSource, unwrapResumable)
import Data.IORef (IORef)
import Data.JSON.ToGo.Parser
import Data.Maybe (maybeToList, fromMaybe)
import Data.Monoid (Monoid, mempty, mappend, (<>), Last(..))
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Control.Applicative ((<$), (<$>), (<*>))
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Parser (ParserT, runParserT, runParserTWith, eitherResultM, ResultM(..))
import Control.Monad.Trans.Resource (ResourceT, register, runResourceT, throwM)
import Control.Monad.Trans.Writer.Lazy (WriterT, tell, runWriterT)
import qualified Network.HTTP.Types as HTTP
import qualified Network.HTTP.Conduit as HTTP
import Aws.Core
import Aws.DynamoDb.Core hiding (runParser)
import Aws.DynamoDb.Core.Conduit
import Aws.DynamoDb.Commands.Query
-- | Wrapper around a plain 'Query'. Requests of this type are signed exactly
-- like the wrapped query, but their responses are consumed through the
-- streaming parser defined in this module.
data ConduitQuery = ConduitQuery
  { getQuery :: Query -- ^ The wrapped query that is actually signed and sent.
  }
-- | Signing is delegated to 'ddbSignQuery' with the @"Query"@ label, applied
-- to the 'Query' stored inside the wrapper.
instance SignQuery ConduitQuery where
  type ServiceConfiguration ConduitQuery = DdbConfiguration
  signQuery wrapped = ddbSignQuery "Query" (getQuery wrapped)
-- | Accumulator for the non-item fields of a query response while the body
-- is still being parsed.
--
-- Every field is wrapped in 'Last', so combining two accumulators keeps the
-- right-most value that was actually set.
data IncompleteQueryResponse = IncompleteQueryResponse
  { incompleteLastKey :: Last [Attribute]
    -- ^ Last key of the response, if one has been seen.
  , incompleteCount :: Last Int
    -- ^ Count field; must be set for the accumulator to be complete.
  , incompleteScanned :: Last Int
    -- ^ Scanned-count field; must be set for the accumulator to be complete.
  , incompleteConsumed :: Last ConsumedCapacity
    -- ^ Consumed capacity, if one has been seen.
  }
-- | Turn a finished accumulator into a 'QueryResponse'.
--
-- The result is 'Nothing' when either the count or the scanned count was
-- never set. The last key and the consumed capacity are optional and are
-- passed through as they are. The items field is always 'mempty'; items are
-- emitted separately by 'responseParser'.
toQueryResponse :: IncompleteQueryResponse -> Maybe QueryResponse
toQueryResponse icr =
  assemble
    <$> getLast (incompleteCount icr)
    <*> getLast (incompleteScanned icr)
  where
    -- Only the two mandatory numbers are threaded through 'Maybe'; the
    -- remaining fields are read straight from the accumulator.
    assemble count scanned =
      QueryResponse
        mempty
        (getLast (incompleteLastKey icr))
        count
        scanned
        (getLast (incompleteConsumed icr))
-- | Accumulators are combined field by field. Because each field is a
-- 'Last', a value set on the right-hand side overrides the left-hand one.
instance Monoid IncompleteQueryResponse where
  mempty =
    IncompleteQueryResponse
      { incompleteLastKey = mempty
      , incompleteCount = mempty
      , incompleteScanned = mempty
      , incompleteConsumed = mempty
      }
  mappend
    (IncompleteQueryResponse keyL countL scannedL consumedL)
    (IncompleteQueryResponse keyR countR scannedR consumedR) =
      IncompleteQueryResponse
        { incompleteLastKey = keyL <> keyR
        , incompleteCount = countL <> countR
        , incompleteScanned = scannedL <> scannedR
        , incompleteConsumed = consumedL <> consumedR
        }
-- | Streaming parser for the JSON object returned by a DynamoDB @Query@ call.
--
-- Each recognised top-level key contributes to an 'IncompleteQueryResponse',
-- which is converted with 'toQueryResponse' once the object has been read.
-- Entries of the @Items@ array are not kept in the result: every parsed item
-- is written to the underlying 'WriterT' layer instead, so the items field of
-- the returned 'QueryResponse' is empty. Unknown keys are ignored.
--
-- The parser fails with the message @"incomplete"@ when 'toQueryResponse'
-- returns 'Nothing', i.e. when @Count@ or @ScannedCount@ was never seen.
responseParser :: Monad m => ParserM (WriterT [Item] m) QueryResponse
responseParser = pobject key >>= maybe (fail "incomplete") return . toQueryResponse
  where
    key "Count" = setCount . Just <$> parse
    key "ScannedCount" = setScanned . Just <$> parse
    key "ConsumedCapacity" = setConsumed . Just <$> parse
    -- DynamoDB reports the pagination cursor under "LastEvaluatedKey".
    -- Matching any other name leaves the last key permanently unset, which
    -- makes every response look like the final page.
    key "LastEvaluatedKey" = setLastKey . parseMaybe parseAttributeJson <$> pvalue
    -- Items are streamed out through the writer; nothing is accumulated.
    key "Items" = mempty <$ (parray . const $ parse >>= lift . tell . maybeToList)
    key _ = return mempty

    setCount c = mempty { incompleteCount = Last c }
    setScanned c = mempty { incompleteScanned = Last c }
    setConsumed c = mempty { incompleteConsumed = Last c }
    setLastKey c = mempty { incompleteLastKey = Last c }
-- | Drive an incremental parser from the upstream of a conduit.
--
-- One input chunk is fed to the parser per step. Whatever the parser wrote
-- to its 'WriterT' layer during that step is yielded downstream before the
-- step's outcome is acted upon:
--
-- * a partial result continues with the returned parser,
-- * a failure calls 'fail' with the parser's message,
-- * a finished parse returns its value (leftover input is discarded).
--
-- Empty chunks from upstream are skipped. Once upstream is exhausted the
-- parser is fed 'mempty'.
-- NOTE(review): if the parser answers 'mempty' with another partial result,
-- this loop feeds 'mempty' again on every iteration — confirm that the
-- parser treats empty input as end of input.
runParser :: (Monad m, Monoid i, Eq i) => ParserT i (WriterT a m) r -> ConduitM i a m r
runParser parser = do
  chunk <- nextChunk
  (outcome, written) <- lift (runWriterT (runParserT parser chunk))
  case outcome of
    PartialM continue -> do
      yield written
      runParser continue
    FailM _ message -> do
      yield written
      fail message
    DoneM _ result -> do
      yield written
      return result
  where
    -- Next non-empty chunk, or 'mempty' when upstream has finished.
    nextChunk = await >>= maybe (return mempty) skipEmpty
    skipEmpty chunk
      | chunk == mempty = nextChunk
      | otherwise = return chunk
-- | Run a parser over the body held in a resumable source.
--
-- The resumable source is unwrapped into a plain source and its finalizer.
-- The finalizer is registered with the enclosing resource scope (wrapped in
-- its own 'runResourceT'), and the plain source is then fused with
-- 'runParser' applied to the given parser.
consume parser resumable = do
  body <- lift $ do
    (unwrapped, finalizer) <- unwrapResumable resumable
    -- The release key is not needed; the finalizer runs with the scope.
    _ <- register (runResourceT finalizer)
    return unwrapped
  fuse body (runParser parser)
-- | Pairs the 'ConduitQuery' request type with 'ConduitQueryResponse' as its
-- response type.
instance Transaction ConduitQuery ConduitQueryResponse
-- | Response type produced for a 'ConduitQuery'.
--
-- NOTE(review): 'ConduitResponse' is not defined in this module. Judging by
-- how the 'ResponseConsumer' instance builds its result, it appears to be a
-- conduit that emits @['Item']@ chunks and finishes with a 'QueryResponse'
-- — confirm against its definition.
type ConduitQueryResponse = ConduitResponse [Item] QueryResponse
-- | Consumes the HTTP response of a 'ConduitQuery'.
--
-- The @x-amz-crc32@ and @x-amzn-RequestId@ headers are stored as response
-- metadata first. The returned value then depends on the HTTP status code:
--
-- * @200@: the body is parsed with 'responseParser' via 'consume';
-- * anything else: the body is parsed as an error document and a 'DdbError'
--   built from the status code, the error code and the message is thrown.
--
-- Both alternatives are returned rather than executed here, so body parsing
-- and any error are deferred until the returned value is run.
instance ResponseConsumer ConduitQuery ConduitQueryResponse where
  type ResponseMetadata ConduitQueryResponse = DdbResponse
  responseConsumer _ ref resp = do
    tellMeta
    return $ case statusCode of
      200 -> rSuccess
      _ -> rError
    where
      header = fmap T.decodeUtf8 . flip lookup (HTTP.responseHeaders resp)
      amzId = header "x-amzn-RequestId"
      amzCrc = header "x-amz-crc32"
      meta = DdbResponse amzCrc amzId
      tellMeta = liftIO $ tellMetadataRef ref meta
      rSuccess = consume responseParser $ HTTP.responseBody resp
      rError = do
        err'' <- consume parse $ HTTP.responseBody resp
        -- The error type has the form "<prefix>#<code>"; keep what follows
        -- the '#'.
        errCode <- readErrCode . T.drop 1 . snd . T.breakOn "#" $ aeType err''
        throwM . DdbError statusCode errCode . fromMaybe "" $ aeMessage err''
      -- Total replacement for 'read': accepts exactly the inputs 'read'
      -- would accept (one complete parse, optional surrounding whitespace).
      -- An unrecognised code is reported immediately instead of being hidden
      -- as an unevaluated error inside the thrown 'DdbError'.
      readErrCode txt =
        case [code | (code, rest) <- reads str, ("", "") <- lex rest] of
          [code] -> return code
          _ -> throwM . userError $ "Unknown DynamoDB error code: " ++ str
        where
          str = T.unpack txt
      HTTP.Status{..} = HTTP.responseStatus resp