#if __GLASGOW_HASKELL__ >= 800
#endif
module Database.DynamoDB.QueryRequest (
query
, querySimple
, queryCond
, querySource
, querySourceChunks
, queryOverIndex
, scan
, scanCond
, scanSource
, scanSourceChunks
, QueryOpts
, queryOpts
, qConsistentRead, qStartKey, qDirection, qFilterCondition, qHashKey, qRangeCondition, qLimit
, ScanOpts
, scanOpts
, sFilterCondition, sConsistentRead, sLimit, sParallel, sStartKey
, leftJoin
, innerJoin
) where
import Control.Arrow (first)
import Control.Arrow (second)
import Control.Lens (Lens', sequenceOf, view, (%~),
(.~), (^.), _2)
import Control.Lens.TH (makeLenses)
import Control.Monad ((>=>))
import Control.Monad.Catch (throwM)
import Data.Bool (bool)
import Data.Coerce (coerce)
import Data.Conduit (Conduit, Source, (=$=))
import qualified Data.Conduit.List as CL
import Data.Foldable (toList)
import Data.Function ((&))
import Data.HashMap.Strict (HashMap)
import qualified Data.Map as Map
import Data.Maybe (mapMaybe)
import Data.Monoid ((<>))
import Data.Proxy
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import Generics.SOP
import Network.AWS
import qualified Network.AWS.DynamoDB.Query as D
import qualified Network.AWS.DynamoDB.Scan as D
import qualified Network.AWS.DynamoDB.Types as D
import Network.AWS.Pager (AWSPager (..))
import Numeric.Natural (Natural)
import Database.DynamoDB.BatchRequest (getItemBatch)
import Database.DynamoDB.Class
import Database.DynamoDB.Filter
import Database.DynamoDB.Internal
import Database.DynamoDB.Types
rsDecoder :: (MonadAWS m, DynamoCollection a r t)
=> HashMap T.Text D.AttributeValue -> m a
rsDecoder item =
case dGsDecode item of
Just res -> return res
Nothing -> throwM (DynamoException $ "Error decoding item: " <> T.pack (show item))
data QueryOpts a hash range = QueryOpts {
_qHashKey :: hash
, _qRangeCondition :: Maybe (RangeOper range)
, _qFilterCondition :: Maybe (FilterCondition a)
, _qConsistentRead :: Consistency
, _qDirection :: Direction
, _qLimit :: Maybe Natural
, _qStartKey :: Maybe (hash, range)
}
makeLenses ''QueryOpts
queryOpts :: hash -> QueryOpts a hash range
queryOpts key = QueryOpts key Nothing Nothing Eventually Forward Nothing Nothing
queryCmd :: forall a t hash range. CanQuery a t hash range => QueryOpts a hash range -> D.Query
queryCmd q =
dQueryKey (Proxy :: Proxy a) (q ^. qHashKey) (q ^. qRangeCondition)
& D.qConsistentRead . consistencyL .~ (q ^. qConsistentRead)
& D.qScanIndexForward .~ Just (q ^. qDirection == Forward)
& D.qLimit .~ (q ^. qLimit)
& addStartKey (q ^. qStartKey)
& addCondition (q ^. qFilterCondition)
where
addCondition Nothing = id
addCondition (Just cond) =
let (expr, attnames, attvals) = dumpCondition cond
in (D.qExpressionAttributeNames %~ (<> attnames))
. bool (D.qExpressionAttributeValues %~ (<> attvals)) id (null attvals)
. (D.qFilterExpression .~ Just expr)
addStartKey Nothing = id
addStartKey (Just (key, range)) =
D.qExclusiveStartKey .~ dQueryKeyToAttr (Proxy :: Proxy a) (key, range)
newtype FixedQuery = FixedQuery D.Query
instance AWSRequest FixedQuery where
type Rs FixedQuery = D.QueryResponse
request (FixedQuery a) = coerce (request a)
response lgr svc _ = response lgr svc (Proxy :: Proxy D.Query)
instance AWSPager FixedQuery where
page (FixedQuery dq) resp
| null lastkey = Nothing
| otherwise = Just $ FixedQuery (dq & D.qExclusiveStartKey .~ lastkey)
where
lastkey = resp ^. D.qrsLastEvaluatedKey
newtype FixedScan = FixedScan D.Scan
instance AWSRequest FixedScan where
type Rs FixedScan = D.ScanResponse
request (FixedScan a) = coerce (request a)
response lgr svc _ = response lgr svc (Proxy :: Proxy D.Scan)
instance AWSPager FixedScan where
page (FixedScan dq) resp
| null lastkey = Nothing
| otherwise = Just $ FixedScan (dq & D.sExclusiveStartKey .~ lastkey)
where
lastkey = resp ^. D.srsLastEvaluatedKey
querySourceChunks :: forall a t m hash range. (CanQuery a t hash range, MonadAWS m)
=> Proxy a -> QueryOpts a hash range -> Source m [a]
querySourceChunks _ q = paginate (FixedQuery (queryCmd q)) =$= CL.mapM (\res -> mapM rsDecoder (res ^. D.qrsItems))
querySource :: forall a t m hash range. (CanQuery a t hash range, MonadAWS m)
=> Proxy a -> QueryOpts a hash range -> Source m a
querySource p q = querySourceChunks p q =$= CL.concat
queryOverIndex :: forall a t m v1 v2 hash r2 range rest parent.
(CanQuery a t hash range, MonadAWS m,
Code a ~ '[ hash ': range ': rest],
DynamoIndex a parent 'WithRange, ContainsTableKey a parent (PrimaryKey parent r2),
DynamoTable parent r2,
DynamoScalar v1 hash, DynamoScalar v2 range)
=> Proxy a -> QueryOpts a hash range -> Source m parent
queryOverIndex p q =
querySourceChunks p (q & setConsistency)
=$= CL.mapFoldableM batchParent
where
setConsistency
| indexIsLocal p = id
| otherwise = qConsistentRead .~ Eventually
batchParent vals = getItemBatch (q ^. qConsistentRead) (map dTableKey vals)
querySimple :: forall a t m hash range.
(CanQuery a t hash range, MonadAWS m)
=> Proxy a
-> hash
-> Maybe (RangeOper range)
-> Direction
-> Int
-> m [a]
querySimple p key range direction limit = do
let opts = queryOpts key & qRangeCondition .~ range
& qDirection .~ direction
fst <$> query p opts limit
queryCond :: forall a t m hash range.
(CanQuery a t hash range, MonadAWS m)
=> Proxy a
-> hash
-> Maybe (RangeOper range)
-> FilterCondition a
-> Direction
-> Int
-> m [a]
queryCond p key range cond direction limit = do
let opts = queryOpts key & qRangeCondition .~ range
& qDirection .~ direction
& qFilterCondition .~ Just cond
fst <$> query p opts limit
query :: forall a t m range hash.
(CanQuery a t hash range, MonadAWS m)
=> Proxy a
-> QueryOpts a hash range
-> Int
-> m ([a], Maybe (PrimaryKey a 'WithRange))
query _ opts limit = do
let cmd = queryCmd (opts & addQLimit)
boundedFetch D.qExclusiveStartKey (view D.qrsItems) (view D.qrsLastEvaluatedKey) cmd limit
where
addQLimit
| Nothing <- opts ^. qLimit, Nothing <- opts ^. qFilterCondition = qLimit .~ Just (fromIntegral limit)
| otherwise = id
boundedFetch :: forall a r t m cmd.
(MonadAWS m, HasPrimaryKey a r t, AWSRequest cmd)
=> Lens' cmd (HashMap T.Text D.AttributeValue)
-> (Rs cmd -> [HashMap T.Text D.AttributeValue])
-> (Rs cmd -> HashMap T.Text D.AttributeValue)
-> cmd
-> Int
-> m ([a], Maybe (PrimaryKey a r))
boundedFetch startLens rsResult rsLast startcmd limit = do
(result, nextcmd) <- unfoldLimit fetch startcmd limit
if | length result > limit ->
let final = Seq.take limit result
in case Seq.viewr final of
Seq.EmptyR -> return ([], Nothing)
(_ Seq.:> lastitem) -> return (toList final, Just (dItemToKey lastitem))
| length result == limit, Just rs <- nextcmd ->
return (toList result, dAttrToKey (Proxy :: Proxy a) (rs ^. startLens))
| otherwise -> return (toList result, Nothing)
where
fetch cmd = do
rs <- send cmd
items <- Seq.fromList <$> mapM rsDecoder (rsResult rs)
let lastkey = rsLast rs
newquery = bool (Just (cmd & startLens .~ lastkey)) Nothing (null lastkey)
return (items, newquery)
unfoldLimit :: Monad m => (cmd -> m (Seq a, Maybe cmd)) -> cmd -> Int -> m (Seq a, Maybe cmd)
unfoldLimit code = go
where
go cmd limit = do
(vals, mnext) <- code cmd
let cnt = length vals
if | Just next <- mnext, cnt < limit -> first (vals <>) <$> go next (limit cnt)
| otherwise -> return (vals, mnext)
data ScanOpts a r = ScanOpts {
_sFilterCondition :: Maybe (FilterCondition a)
, _sConsistentRead :: Consistency
, _sLimit :: Maybe Natural
, _sParallel :: Maybe (Natural, Natural)
, _sStartKey :: Maybe (PrimaryKey a r)
}
makeLenses ''ScanOpts
scanOpts :: ScanOpts a r
scanOpts = ScanOpts Nothing Eventually Nothing Nothing Nothing
scanSourceChunks :: (MonadAWS m, TableScan a r t) => Proxy a -> ScanOpts a r -> Source m [a]
scanSourceChunks _ q = paginate (FixedScan (scanCmd q)) =$= CL.mapM (\res -> mapM rsDecoder (res ^. D.srsItems))
scanSource :: (MonadAWS m, TableScan a r t) => Proxy a -> ScanOpts a r -> Source m a
scanSource p q = scanSourceChunks p q =$= CL.concat
scan :: (MonadAWS m, TableScan a r t)
=> Proxy a
-> ScanOpts a r
-> Int
-> m ([a], Maybe (PrimaryKey a r))
scan _ opts limit = do
let cmd = scanCmd (opts & addSLimit)
boundedFetch D.sExclusiveStartKey (view D.srsItems) (view D.srsLastEvaluatedKey) cmd limit
where
addSLimit
| Nothing <- opts ^. sLimit, Nothing <- opts ^. sFilterCondition = sLimit .~ Just (fromIntegral limit)
| otherwise = id
scanCmd :: forall a r t. TableScan a r t => ScanOpts a r -> D.Scan
scanCmd q =
dScan (Proxy :: Proxy a)
& D.sConsistentRead . consistencyL .~ (q ^. sConsistentRead)
& D.sLimit .~ (q ^. sLimit)
& addStartKey (q ^. sStartKey)
& addCondition (q ^. sFilterCondition)
& addParallel (q ^. sParallel)
where
addCondition Nothing = id
addCondition (Just cond) =
let (expr, attnames, attvals) = dumpCondition cond
in (D.sExpressionAttributeNames %~ (<> attnames))
. bool (D.sExpressionAttributeValues %~ (<> attvals)) id (null attvals)
. (D.sFilterExpression .~ Just expr)
addStartKey Nothing = id
addStartKey (Just pkey) = D.sExclusiveStartKey .~ dKeyToAttr (Proxy :: Proxy a) pkey
addParallel Nothing = id
addParallel (Just (segment,total)) =
(D.sTotalSegments .~ Just total)
. (D.sSegment .~ Just segment)
scanCond :: forall a m r t. (MonadAWS m, TableScan a r t) => Proxy a -> FilterCondition a -> Int -> m [a]
scanCond _ cond limit = do
let opts = scanOpts & sFilterCondition .~ Just cond
cmd = scanCmd opts
fst <$> boundedFetch D.sExclusiveStartKey (view D.srsItems) (view D.srsLastEvaluatedKey) cmd limit
leftJoin :: forall a b m r.
(MonadAWS m, DynamoTable a r, Ord (PrimaryKey a r), ContainsTableKey a a (PrimaryKey a r))
=> Consistency
-> Proxy a
-> (b -> Maybe (PrimaryKey a r))
-> Conduit [b] m [(b, Maybe a)]
leftJoin consistency p getkey = CL.mapM doJoin
where
doJoin input = do
let keys = filter (dKeyIsDefined p) $ mapMaybe getkey input
rightTbl <- getItemBatch consistency keys
let resultMap = Map.fromList $ map (\res -> (dTableKey res,res)) rightTbl
return $ map (second (id >=> (`Map.lookup` resultMap))) $ zip input $ map getkey input
innerJoin :: forall a b m r.
(MonadAWS m, DynamoTable a r, Ord (PrimaryKey a r), ContainsTableKey a a (PrimaryKey a r))
=> Consistency
-> Proxy a
-> (b -> Maybe (PrimaryKey a r))
-> Conduit [b] m [(b, a)]
innerJoin consistency p getkey =
leftJoin consistency p getkey =$= CL.map (mapMaybe (sequenceOf _2))