Skip to content

Commit

Permalink
Paginate DynamoDB query results to return all items
Browse files Browse the repository at this point in the history
  • Loading branch information
nikki-t committed Oct 8, 2024
1 parent 027afd1 commit 1f0722a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 45 deletions.
7 changes: 1 addition & 6 deletions hydrocron/api/controllers/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,7 @@ def timeseries_get(feature, feature_id, start_time, end_time, output, fields):
hits = 0

data_repository = DynamoDataRepository(connection.dynamodb_resource)
if feature.lower() == 'reach':
results = data_repository.get_reach_series_by_feature_id(feature_id, start_time, end_time)
if feature.lower() == 'node':
results = data_repository.get_node_series_by_feature_id(feature_id, start_time, end_time)
if feature.lower() == 'priorlake':
results = data_repository.get_prior_lake_series_by_feature_id(feature_id, start_time, end_time)
results = data_repository.get_series_by_feature_id(feature, feature_id, start_time, end_time)

if len(results['Items']) == 0:
data['http_code'] = '400 Bad Request'
Expand Down
93 changes: 54 additions & 39 deletions hydrocron/api/data_access/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging

from boto3.resources.base import ServiceResource
from boto3.dynamodb.conditions import Key # noqa: E501 # pylint: disable=C0412
from boto3.dynamodb.conditions import Key, And # noqa: E501 # pylint: disable=C0412

from hydrocron.utils import constants

Expand All @@ -19,61 +19,76 @@ def __init__(self, dynamo_resource: ServiceResource):
self._dynamo_instance = dynamo_resource
self._logger = logging.getLogger('hydrocron.api.data_access.db.DynamoDataRepository')

def get_reach_series_by_feature_id(self, feature_id: str, start_time: str, end_time: str): # noqa: E501 # pylint: disable=W0613
def get_series_by_feature_id(self, feature_type: str, feature_id: str, start_time: str, end_time: str): # noqa: E501 # pylint: disable=W0613
"""
@param feature_type:
@param feature_id:
@param start_time:
@param end_time:
@return:
"""
table_name = constants.SWOT_REACH_TABLE_NAME

hydrocron_table = self._dynamo_instance.Table(table_name)
hydrocron_table.load()

items = hydrocron_table.query(KeyConditionExpression=(
Key(constants.SWOT_REACH_PARTITION_KEY).eq(feature_id) &
Key(constants.SWOT_REACH_SORT_KEY).between(start_time, end_time))
)
return items

def get_node_series_by_feature_id(self, feature_id, start_time, end_time): # noqa: E501 # pylint: disable=W0613
"""
@param feature_id:
@param start_time:
@param end_time:
@return:
"""
table_name = constants.SWOT_NODE_TABLE_NAME

hydrocron_table = self._dynamo_instance.Table(table_name)
hydrocron_table.load()
if feature_type.lower() == 'reach':
table_name = constants.SWOT_REACH_TABLE_NAME
partition_key = constants.SWOT_REACH_PARTITION_KEY
sort_key = constants.SWOT_REACH_SORT_KEY
elif feature_type.lower() == 'node':
table_name = constants.SWOT_NODE_TABLE_NAME
partition_key = constants.SWOT_NODE_PARTITION_KEY
sort_key = constants.SWOT_NODE_SORT_KEY
elif feature_type.lower() == 'priorlake':
table_name = constants.SWOT_PRIOR_LAKE_TABLE_NAME
partition_key = constants.SWOT_PRIOR_LAKE_PARTITION_KEY
sort_key = constants.SWOT_PRIOR_LAKE_SORT_KEY
else:
table_name = ''
partition_key = ''
sort_key = ''

if table_name:
hydrocron_table = self._dynamo_instance.Table(table_name)
hydrocron_table.load()
key_condition_expression = (
Key(partition_key).eq(feature_id) &
Key(sort_key).between(start_time, end_time)
)
items = self._query_hydrocron_table(hydrocron_table, key_condition_expression)
else:
items = {'Items': []}

items = hydrocron_table.query(KeyConditionExpression=(
Key(constants.SWOT_NODE_PARTITION_KEY).eq(feature_id) &
Key(constants.SWOT_NODE_SORT_KEY).between(start_time, end_time))
)
return items

def get_prior_lake_series_by_feature_id(self, feature_id, start_time, end_time): # noqa: E501 # pylint: disable=W0613
def _query_hydrocron_table(self, hydrocron_table: str, key_condition_expression: And):
"""
@param feature_id:
@param start_time:
@param end_time:
@param hydrocron_table:
@param key_condition_expression:
@return:
"""
table_name = constants.SWOT_PRIOR_LAKE_TABLE_NAME

hydrocron_table = self._dynamo_instance.Table(table_name)
hydrocron_table.load()
items = hydrocron_table.query(
KeyConditionExpression=key_condition_expression
)
last_key_evaluated = ''
if 'LastEvaluatedKey' in items.keys():
last_key_evaluated = items['LastEvaluatedKey']

while last_key_evaluated:
next_items = hydrocron_table.query(
ExclusiveStartKey=last_key_evaluated,
KeyConditionExpression=key_condition_expression
)
items['Items'].extend(next_items['Items'])
items['Count'] += next_items['Count']
items['ScannedCount'] += next_items['ScannedCount']
items['ResponseMetadata'] = next_items['ResponseMetadata']
last_key_evaluated = ''
if 'LastEvaluatedKey' in next_items.keys():
last_key_evaluated = next_items['LastEvaluatedKey']
else:
items.pop('LastEvaluatedKey')

items = hydrocron_table.query(KeyConditionExpression=(
Key(constants.SWOT_PRIOR_LAKE_PARTITION_KEY).eq(feature_id) &
Key(constants.SWOT_PRIOR_LAKE_SORT_KEY).between(start_time, end_time))
)
return items

def get_granule_ur(self, table_name, granule_ur):
Expand Down

0 comments on commit 1f0722a

Please sign in to comment.