Feature/issue 229 - Create new database tables for version D collections #260

Open · wants to merge 5 commits into base: develop
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]

 ### Added
+- Issue 229 - Create new database tables for version D collections
 - Issue 211 - Query track ingest table for granules with "to_ingest" status
 - Issue 212 - Update track ingest table with granule status
 - Issue 203 - Construct CNM to trigger load data operations and ingest granule
26 changes: 10 additions & 16 deletions hydrocron/db/io/swot_shp.py
@@ -84,7 +84,7 @@ def read_shapefile(filepath, obscure_data, columns, s3_resource=None):
             np.random.default_rng().integers(low=2, high=10)*shp_file[numeric_columns],
             shp_file[numeric_columns])

-    filename_attrs = parse_from_filename(filename)
+    filename_attrs = parse_from_filename(filepath)

     xml_attrs = parse_metadata_from_shpxml(shp_xml_tree)

@@ -204,38 +204,32 @@ def assemble_attributes(geodf, attributes):
     return items


-def parse_from_filename(filename):
+def parse_from_filename(filepath):
     """
     Parses the cycle, pass, start and end time from
     the shapefile name and adds them to each item

     Parameters
     ----------
-    filename : string
-        The string to parse
+    filepath : string
+        The full URI of the granule to parse

     Returns
     -------
     filename_attrs : dict
         A dictionary of attributes from the filename
     """
     logging.info('Starting parse attributes from filename')

+    filename = os.path.basename(filepath)
     filename_components = filename.split("_")

     collection = ""
     collection_version = ""

-    if 'RiverSP_Reach' in filename:
-        collection = constants.SWOT_REACH_COLLECTION_NAME
-        collection_version = constants.SWOT_REACH_COLLECTION_VERSION
-
-    if 'RiverSP_Node' in filename:
-        collection = constants.SWOT_NODE_COLLECTION_NAME
-        collection_version = constants.SWOT_NODE_COLLECTION_VERSION
-
-    if 'LakeSP_Prior' in filename:
-        collection = constants.SWOT_PRIOR_LAKE_COLLECTION_NAME
-        collection_version = constants.SWOT_PRIOR_LAKE_COLLECTION_VERSION
+    for table_info in constants.TABLE_COLLECTION_INFO:
+        if (table_info['feature_type'] in filename) & (table_info['collection_name'] in filepath):
+            collection = table_info['collection_name']

     filename_attrs = {
         'cycle_id': filename_components[5],
@@ -283,7 +277,7 @@ def load_benchmarking_data():
         'continent_id': 'XX',
         'range_end_time': '2024-12-31T23:59:00Z',
         'crid': 'TEST',
-        'collection_shortname': constants.SWOT_REACH_COLLECTION_NAME
+        'collection_shortname': constants.TABLE_COLLECTION_INFO[0]['collection_name']
     }

     items = assemble_attributes(csv_file, filename_attrs)
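For orientation, here is a small sketch of what the reworked `parse_from_filename` now sees, using the `TEST_REACH_PATHNAME` value this PR adds to `constants.py`. The `[6]` index is an assumption for illustration; the hunk above only shows `cycle_id` at `[5]`.

```python
import os

# The directory part of the path identifies the collection; the basename
# carries the feature type and the underscore-delimited attributes.
filepath = ("SWOT_L2_HR_RiverSP_2.0/"
            "SWOT_L2_HR_RiverSP_Reach_548_011_NA_"
            "20230610T193337_20230610T193344_PIA1_01.zip")

filename = os.path.basename(filepath)
filename_components = filename.split("_")

print('Reach' in filename)                   # True  -> feature_type matches
print('SWOT_L2_HR_RiverSP_2.0' in filepath)  # True  -> collection matches
print(filename_components[5])                # '548' -> cycle_id
print(filename_components[6])                # '011' -> presumably the pass number
```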
137 changes: 45 additions & 92 deletions hydrocron/db/load_data.py
@@ -42,25 +42,14 @@ def lambda_handler(event, _):  # noqa: E501 # pylint: disable=W0613
     end_date = event['body']['end_date']
     load_benchmarking_data = event['body']['load_benchmarking_data']

-    match table_name:
-        case constants.SWOT_REACH_TABLE_NAME:
-            collection_shortname = constants.SWOT_REACH_COLLECTION_NAME
-            track_table = constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME
-            feature_type = 'Reach'
-        case constants.SWOT_NODE_TABLE_NAME:
-            collection_shortname = constants.SWOT_NODE_COLLECTION_NAME
-            track_table = constants.SWOT_NODE_TRACK_INGEST_TABLE_NAME
-            feature_type = 'Node'
-        case constants.SWOT_PRIOR_LAKE_TABLE_NAME:
-            collection_shortname = constants.SWOT_PRIOR_LAKE_COLLECTION_NAME
-            track_table = constants.SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME
-            feature_type = 'LakeSP_Prior'
-        case constants.DB_TEST_TABLE_NAME:
-            collection_shortname = constants.SWOT_REACH_COLLECTION_NAME
-            track_table = constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME
-            feature_type = 'Reach'
-        case _:
-            raise MissingTable(f"Hydrocron table '{table_name}' does not exist.")
+    for table_info in constants.TABLE_COLLECTION_INFO:
+        if table_info['table_name'] in table_name:
+            collection_shortname = table_info['collection_name']
+            track_table = table_info['track_table']
+            feature_type = table_info['feature_type']
+            break
+    else:
+        raise MissingTable(f"Error: Table does not exist: {table_name}")

     logging.info("Searching for granules in collection %s", collection_shortname)

@@ -107,8 +96,6 @@ def granule_handler(event, _):
     Second Lambda entrypoint for loading individual granules
     """
     granule_path = event['body']['granule_path']
-    table_name = event['body']['table_name']
-    track_table = event['body']['track_table']

     load_benchmarking_data = event['body']['load_benchmarking_data']

@@ -124,17 +111,16 @@
revision_date = "Not Found"
logging.info('No CNM revision date')

if ("Reach" in granule_path) & (table_name != constants.SWOT_REACH_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Reach data into table: '{table_name}'")

if ("Node" in granule_path) & (table_name != constants.SWOT_NODE_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Node data into table: '{table_name}'")

if ("LakeSP_Prior" in granule_path) & (table_name != constants.SWOT_PRIOR_LAKE_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Prior Lake data into table: '{table_name}'")

if ("LakeSP_Obs" in granule_path) | ("LakeSP_Unassigned" in granule_path):
raise TableMisMatch(f"Error: Cannot load Observed or Unassigned Lake data into table: '{table_name}'")
raise MissingTable("Error: Cannot load Observed or Unassigned Lake data")

for table_info in constants.TABLE_COLLECTION_INFO:
if (table_info['collection_name'] in granule_path) & (table_info['feature_type'] in granule_path):
table_name = table_info['table']
track_table = table_info['track_table']
break
else:
raise MissingTable(f"Error: Cannot load granule: {granule_path}, no support for this collection")

logging.info("Value of load_benchmarking_data is: %s", load_benchmarking_data)

@@ -189,50 +175,27 @@ def cnm_handler(event, _):
         granule_uri = files['uri']
         checksum = files['checksum']

-        if 'Reach' in granule_uri:
-            event2 = ('{"body": {"granule_path": "' + granule_uri
-                      + '","table_name": "' + constants.SWOT_REACH_TABLE_NAME
-                      + '","track_table": "' + constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME
-                      + '","checksum": "' + checksum
-                      + '","revisionDate": "' + revision_date
-                      + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')
-
-            logging.info("Invoking granule load lambda with event json %s", str(event2))
-
-            lambda_client.invoke(
-                FunctionName=os.environ['GRANULE_LAMBDA_FUNCTION_NAME'],
-                InvocationType='Event',
-                Payload=event2)
-
-        if 'Node' in granule_uri:
-            event2 = ('{"body": {"granule_path": "' + granule_uri
-                      + '","table_name": "' + constants.SWOT_NODE_TABLE_NAME
-                      + '","track_table": "' + constants.SWOT_NODE_TRACK_INGEST_TABLE_NAME
-                      + '","checksum": "' + checksum
-                      + '","revisionDate": "' + revision_date
-                      + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')
-
-            logging.info("Invoking granule load lambda with event json %s", str(event2))
-
-            lambda_client.invoke(
-                FunctionName=os.environ['GRANULE_LAMBDA_FUNCTION_NAME'],
-                InvocationType='Event',
-                Payload=event2)
-
-        if 'LakeSP_Prior' in granule_uri:
-            event2 = ('{"body": {"granule_path": "' + granule_uri
-                      + '","table_name": "' + constants.SWOT_PRIOR_LAKE_TABLE_NAME
-                      + '","track_table": "' + constants.SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME
-                      + '","checksum": "' + checksum
-                      + '","revisionDate": "' + revision_date
-                      + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')
-
-            logging.info("Invoking granule load lambda with event json %s", str(event2))
-
-            lambda_client.invoke(
-                FunctionName=os.environ['GRANULE_LAMBDA_FUNCTION_NAME'],
-                InvocationType='Event',
-                Payload=event2)
+        for table_info in constants.TABLE_COLLECTION_INFO:
+            if (table_info['collection_name'] in granule_uri) & (table_info['feature_type'] in granule_uri):
+                table_name = table_info['table_name']
+                track_table = table_info['track_table']
+                break
+        else:
+            raise MissingTable(f"Error: Cannot load granule: {granule_uri}")
+
+        event2 = ('{"body": {"granule_path": "' + granule_uri
+                  + '","table_name": "' + table_name
+                  + '","track_table": "' + track_table
+                  + '","checksum": "' + checksum
+                  + '","revisionDate": "' + revision_date
+                  + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')
+
+        logging.info("Invoking granule load lambda with event json %s", str(event2))
+
+        lambda_client.invoke(
+            FunctionName=os.environ['GRANULE_LAMBDA_FUNCTION_NAME'],
+            InvocationType='Event',
+            Payload=event2)
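The payload above is assembled by string concatenation. For reference, an equivalent construction with `json.dumps` (a sketch, not part of this diff) would also escape any special characters in the values:

```python
import json

def build_event(granule_uri, table_name, track_table,
                checksum, revision_date, load_benchmarking_data):
    """Sketch: the same payload as event2 above, built from a dict."""
    return json.dumps({
        "body": {
            "granule_path": granule_uri,
            "table_name": table_name,
            "track_table": track_table,
            "checksum": checksum,
            "revisionDate": revision_date,
            "load_benchmarking_data": load_benchmarking_data,
        }
    })
```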


 def find_new_granules(collection_shortname, start_date, end_date):
@@ -337,27 +300,17 @@ def load_data(dynamo_resource, table_name, items):
raise MissingTable(f"Hydrocron table '{table_name}' does not exist.") from err
raise err

match hydrocron_table.table_name:
case constants.SWOT_REACH_TABLE_NAME:
feature_name = 'reach'
feature_id = feature_name + '_id'
case constants.SWOT_NODE_TABLE_NAME:
feature_name = 'node'
feature_id = feature_name + '_id'
case constants.SWOT_PRIOR_LAKE_TABLE_NAME:
feature_name = 'prior_lake'
feature_id = 'lake_id'
case constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME:
feature_name = 'track ingest reaches'
for table_info in constants.TABLE_COLLECTION_INFO:
if hydrocron_table.table_name in table_info['track_table']:
feature_name = 'track ingest ' + str.lower(table_info['feature_type'])
feature_id = 'granuleUR'
case constants.SWOT_NODE_TRACK_INGEST_TABLE_NAME:
feature_name = 'track ingest nodes'
feature_id = 'granuleUR'
case constants.SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME:
feature_name = 'track ingest prior lakes'
feature_id = 'granuleUR'
case _:
logging.warning('Items cannot be parsed, file reader not implemented for table %s', hydrocron_table.table_name)
break
if hydrocron_table.table_name in table_info['table_name']:
feature_name = table_info['feature_type']
feature_id = table_info['feature_id']
break
else:
raise MissingTable(f'Items cannot be parsed, file reader not implemented for table {hydrocron_table.table_name}')

if len(items) > 5:
logging.info("Batch adding %s %s items. First 5 feature ids in batch: ", len(items), feature_name)
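The loop above resolves both kinds of table from the same `TABLE_COLLECTION_INFO` entries: track-ingest tables always key on `granuleUR`, while feature tables use the entry's `feature_id`. A standalone sketch of the resolution, with entries trimmed from the `constants.py` hunk below (the `resolve` helper is illustrative, not part of the PR):

```python
# Two TABLE_COLLECTION_INFO entries, trimmed for illustration.
TABLE_COLLECTION_INFO = [
    {'table_name': 'hydrocron-swot-reach-table',
     'track_table': 'hydrocron-swot-reach-track-ingest-table',
     'feature_type': 'Reach',
     'feature_id': 'reach_id'},
    {'table_name': 'hydrocron-swot-prior-lake-table',
     'track_table': 'hydrocron-swot-prior-lake-track-ingest-table',
     'feature_type': 'LakeSP_Prior',
     'feature_id': 'lake_id'},
]

def resolve(table_name):
    for table_info in TABLE_COLLECTION_INFO:
        if table_name in table_info['track_table']:
            # Track-ingest tables are keyed by granule UR.
            return 'track ingest ' + table_info['feature_type'].lower(), 'granuleUR'
        if table_name in table_info['table_name']:
            # Feature tables are keyed by the entry's feature_id.
            return table_info['feature_type'], table_info['feature_id']
    raise KeyError(table_name)

print(resolve('hydrocron-swot-reach-track-ingest-table'))  # ('track ingest reach', 'granuleUR')
print(resolve('hydrocron-swot-prior-lake-table'))          # ('LakeSP_Prior', 'lake_id')
```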
18 changes: 7 additions & 11 deletions hydrocron/db/track_ingest.py
@@ -401,17 +401,13 @@ def track_ingest_handler(event, context):
     reprocessed_crid = event["reprocessed_crid"]
     temporal = "temporal" in event.keys()

-    if ("reach" in collection_shortname) and ((hydrocron_table != constants.SWOT_REACH_TABLE_NAME)
-                                              or (hydrocron_track_table != constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME)):
-        raise TableMisMatch(f"Error: Cannot query reach data for tables: '{hydrocron_table}' and '{hydrocron_track_table}'")
-
-    if ("node" in collection_shortname) and ((hydrocron_table != constants.SWOT_NODE_TABLE_NAME)
-                                             or (hydrocron_track_table != constants.SWOT_NODE_TRACK_INGEST_TABLE_NAME)):
-        raise TableMisMatch(f"Error: Cannot query node data for tables: '{hydrocron_table}' and '{hydrocron_track_table}'")
-
-    if ("prior" in collection_shortname) and ((hydrocron_table != constants.SWOT_PRIOR_LAKE_TABLE_NAME)
-                                              or (hydrocron_track_table != constants.SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME)):
-        raise TableMisMatch(f"Error: Cannot query prior lake data for tables: '{hydrocron_table}' and '{hydrocron_track_table}'")
+    for table_info in constants.TABLE_COLLECTION_INFO:
+        if (table_info['collection_name'] in collection_shortname) & (str.lower(table_info['feature_type']) in collection_shortname):
+            hydrocron_table = table_info['table_name']
+            hydrocron_track_table = table_info['track_table']
+            break
+    else:
+        raise TableMisMatch(f"Error: Cannot query data for tables: '{hydrocron_table}' and '{hydrocron_track_table}'")

     if temporal:
         query_start = datetime.datetime.strptime(event["query_start"], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
62 changes: 50 additions & 12 deletions hydrocron/utils/constants.py
@@ -14,6 +14,8 @@
     '../..', 'tests', 'data',
     'SWOT_L2_HR_RiverSP_Reach_548_011_NA_20230610T193337_20230610T193344_PIA1_01.zip'  # noqa E501
 ))
+TEST_REACH_PATHNAME = (
+    "SWOT_L2_HR_RiverSP_2.0/SWOT_L2_HR_RiverSP_Reach_548_011_NA_20230610T193337_20230610T193344_PIA1_01.zip")

 TEST_REACH_FILENAME = (
     "SWOT_L2_HR_RiverSP_Reach_548_011_NA_"
@@ -31,6 +33,9 @@

 DB_TEST_TABLE_NAME = "hydrocron-swot-test-table"
 API_TEST_REACH_TABLE_NAME = "hydrocron-swot-reach-table"
+API_TEST_NODE_TABLE_NAME = "hydrocron-swot-node-table"
+TEST_REACH_COLLECTION_NAME = "SWOT_L2_HR_RiverSP_2.0"
+TEST_REACH_TRACK_INGEST_TABLE_NAME = "hydrocron-swot-reach-track-ingest-table"
 TEST_REACH_PARTITION_KEY_NAME = 'reach_id'
 TEST_REACH_SORT_KEY_NAME = 'range_start_time'
 TEST_REACH_ID_VALUE = '71224100223'
@@ -47,6 +52,9 @@
     'SWOT_L2_HR_LakeSP_Prior_018_100_GR_20240713T111741_20240713T112027_PIC0_01.zip'  # noqa E501
 ))

+TEST_PLAKE_PATHNAME = (
+    "SWOT_L2_HR_LakeSP_2.0/SWOT_L2_HR_LakeSP_Prior_018_100_GR_20240713T111741_20240713T112027_PIC0_01.zip")
+
 TEST_PLAKE_FILENAME = (
     "SWOT_L2_HR_LakeSP_Prior_018_100_GR_20240713T111741_20240713T112027_PIC0_01.zip")

@@ -113,6 +121,7 @@

 DB_TEST_PLAKE_TABLE_NAME = "hydrocron-swot-testlake-table"
 API_TEST_PLAKE_TABLE_NAME = "hydrocron-swot-prior-lake-table"
+TEST_PLAKE_COLLECTION_NAME = "SWOT_L2_HR_LakeSP_2.0"
 TEST_PLAKE_PARTITION_KEY_NAME = 'lake_id'
 TEST_PLAKE_SORT_KEY_NAME = 'range_start_time'
 TEST_PLAKE_ID_VALUE = '9130047472'
@@ -125,19 +134,48 @@
 # ------------ #
 # PROD CONSTANTS #
 # ------------ #
-SWOT_REACH_TABLE_NAME = "hydrocron-swot-reach-table"
-SWOT_NODE_TABLE_NAME = "hydrocron-swot-node-table"
-SWOT_PRIOR_LAKE_TABLE_NAME = "hydrocron-swot-prior-lake-table"
-SWOT_REACH_TRACK_INGEST_TABLE_NAME = "hydrocron-swot-reach-track-ingest-table"
-SWOT_NODE_TRACK_INGEST_TABLE_NAME = "hydrocron-swot-node-track-ingest-table"
-SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME = "hydrocron-swot-prior-lake-track-ingest-table"
+TABLE_COLLECTION_INFO = [
+    {'collection_name': 'SWOT_L2_HR_RiverSP_2.0',
+     'table_name': 'hydrocron-swot-reach-table',
+     'track_table': 'hydrocron-swot-reach-track-ingest-table',
+     'feature_type': 'Reach',
+     'feature_id': 'reach_id'
+     },
+    {'collection_name': 'SWOT_L2_HR_RiverSP_2.0',
+     'table_name': 'hydrocron-swot-node-table',
+     'track_table': 'hydrocron-swot-node-track-ingest-table',
+     'feature_type': 'Node',
+     'feature_id': 'node_id'
+     },
+    {'collection_name': 'SWOT_L2_HR_LakeSP_2.0',
+     'table_name': 'hydrocron-swot-prior-lake-table',
+     'track_table': 'hydrocron-swot-prior-lake-track-ingest-table',
+     'feature_type': 'LakeSP_Prior',
+     'feature_id': 'lake_id'
+     },
+    {'collection_name': 'SWOT_L2_HR_RiverSP_D',
+     'table_name': 'hydrocron-SWOT_L2_HR_RiverSP_D-reach-table',
+     'track_table': 'hydrocron-SWOT_L2_HR_RiverSP_D-reach-track-ingest',
+     'feature_type': 'Reach',
+     'feature_id': 'reach_id'
+     },
+    {'collection_name': 'SWOT_L2_HR_RiverSP_D',
+     'table_name': 'hydrocron-SWOT_L2_HR_RiverSP_D-node-table',
+     'track_table': 'hydrocron-SWOT_L2_HR_RiverSP_D-node-track-ingest',
+     'feature_type': 'Node',
+     'feature_id': 'node_id'
+     },
+    {'collection_name': 'SWOT_L2_HR_LakeSP_D',
+     'table_name': 'hydrocron-SWOT_L2_HR_LakeSP_D-prior-lake-table',
+     'track_table': 'hydrocron-SWOT_L2_HR_LakeSP_D-prior-lake-track-ingest',
+     'feature_type': 'LakeSP_Prior',
+     'feature_id': 'lake_id'
+     }
+]
+SWOT_REACH_TABLE_NAME = 'hydrocron-swot-reach-table'
+SWOT_NODE_TABLE_NAME = 'hydrocron-swot-node-table'
+SWOT_PRIOR_LAKE_TABLE_NAME = 'hydrocron-swot-prior-lake-table'

-SWOT_REACH_COLLECTION_NAME = "SWOT_L2_HR_RiverSP_2.0"
-SWOT_NODE_COLLECTION_NAME = "SWOT_L2_HR_RiverSP_2.0"
-SWOT_PRIOR_LAKE_COLLECTION_NAME = "SWOT_L2_HR_LakeSP_2.0"
-SWOT_REACH_COLLECTION_VERSION = SWOT_REACH_COLLECTION_NAME[19:]
-SWOT_NODE_COLLECTION_VERSION = SWOT_NODE_COLLECTION_NAME[19:]
-SWOT_PRIOR_LAKE_COLLECTION_VERSION = SWOT_PRIOR_LAKE_COLLECTION_NAME[18:]
 SWOT_PRIOR_LAKE_FILL_GEOMETRY_COORDS = (
     (-31.286028054129474, -27.207309600925463),
     (-22.19117572552625, -28.812946226841383),
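With one entry per collection-and-feature pair, version C (`*_2.0`) and version D granules now resolve to separate DynamoDB tables. A quick sketch of the lookup the handlers above perform, against a hypothetical version D granule URI (bucket and filename invented for illustration):

```python
# Trimmed to the two reach collections for brevity.
TABLE_COLLECTION_INFO = [
    {'collection_name': 'SWOT_L2_HR_RiverSP_2.0',
     'table_name': 'hydrocron-swot-reach-table',
     'feature_type': 'Reach'},
    {'collection_name': 'SWOT_L2_HR_RiverSP_D',
     'table_name': 'hydrocron-SWOT_L2_HR_RiverSP_D-reach-table',
     'feature_type': 'Reach'},
]

# Hypothetical version D granule URI.
granule_uri = ("s3://podaac-bucket/SWOT_L2_HR_RiverSP_D/"
               "SWOT_L2_HR_RiverSP_Reach_600_012_NA_"
               "20250101T000000_20250101T000100_PGC0_01.zip")

for table_info in TABLE_COLLECTION_INFO:
    if (table_info['collection_name'] in granule_uri) and (table_info['feature_type'] in granule_uri):
        # The version C entry does not match a version D path, so the
        # lookup lands on the D-specific table.
        print(table_info['table_name'])  # hydrocron-SWOT_L2_HR_RiverSP_D-reach-table
        break
```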