Skip to content

Commit

Permalink
Merge branch 'main' of github.com:lifemapper/bison
Browse files Browse the repository at this point in the history
  • Loading branch information
zzeppozz committed Oct 31, 2024
2 parents 7ce43fc + 809ee06 commit 4223f67
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 47 deletions.
90 changes: 61 additions & 29 deletions aws/lambda/bison_s1_load_ancillary_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
dt = datetime.now()
yr = dt.year
mo = dt.month
bison_datestr = f"{yr}_{mo:02d}_01"
prev_yr = yr
prev_mo = mo - 1
if mo == 1:
prev_mo = 12
prev_yr = yr - 1
gbif_datestr = f"{yr}-{mo:02d}-01"
bison_datestr = f"{yr}_{mo:02d}_01"
old_bison_datestr = f"{prev_yr}_{prev_mo:02d}_01"

# .............................................................................
# AWS constants
Expand All @@ -28,20 +34,12 @@
WORKFLOW_ROLE_ARN = f"arn:aws:iam::{PROJECT}:role/service-role/{WORKFLOW_ROLE_NAME}"
WORKFLOW_USER = f"project.{PROJECT}"

# EC2 launch template/version
EC2_SPOT_TEMPLATE = "bison_spot_task_template"
TASK = "test_task"

# S3 locations
S3_BUCKET = f"{PROJECT}-{AWS_ACCOUNT}-{REGION}"
S3_IN_DIR = "input"
S3_OUT_DIR = "output"
S3_LOG_DIR = "log"
S3_SUMMARY_DIR = "summary"
RIIS_BASENAME = "USRIISv2_MasterList"
riis_fname = f"{RIIS_BASENAME}_annotated_{bison_datestr}.csv"
# ancillary_data["riis"]["filename"]
annotated_riis_key = f"{S3_IN_DIR}/{riis_fname}"

# Redshift
# namespace, workgroup both = 'bison'
Expand All @@ -51,9 +49,6 @@
external_schema = "redshift_spectrum"
# Wait time for completion of Redshift command
waittime = 5
# Name the Redshift mounted gbif data and bison table to create from it
bison_tbl = f"{pub_schema}.bison_{bison_datestr}"
mounted_gbif_name = f"{external_schema}.occurrence_{bison_datestr}_parquet"

# .............................................................................
# Initialize Botocore session and clients
Expand All @@ -65,11 +60,18 @@
# Initialize Redshift client
config = Config(connect_timeout=timeout, read_timeout=timeout)
s3_client = session.client("s3", config=config, region_name=REGION)
rs_client = session.client("redshift", config=config)
rs_client = session.client("redshift-data", config=config)

# .............................................................................
# Ancillary data parameters
# .............................................................................
RIIS_BASENAME = "USRIISv2_MasterList"
riis_fname = f"{RIIS_BASENAME}_annotated_{bison_datestr}.csv"
annotated_riis_key = f"{S3_IN_DIR}/{riis_fname}"
old_annotated_riis_key = f"{S3_IN_DIR}/{RIIS_BASENAME}_annotated_{old_bison_datestr}.csv"
riis_tbl = f"riisv2_{bison_datestr}"
old_riis_tbl = f"riisv2_{old_bison_datestr}"

# Each fields tuple contains original fieldname, bison fieldname and bison fieldtype
ancillary_data = {
"aiannh": {
Expand All @@ -91,7 +93,7 @@
}
},
"riis": {
"table": f"riisv2_{bison_datestr}",
"table": riis_tbl,
"filename": riis_fname,
"fields": {
"locality": ("locality", "riis_region", "VARCHAR(3)"),
Expand All @@ -101,13 +103,16 @@
}
}

riis_tbl = ancillary_data["riis"]["table"]
aiannh_fname = ancillary_data["aiannh"]["filename"]
aiannh_tbl = ancillary_data["aiannh"]["table"]
county_fname = ancillary_data["county"]["filename"]
county_tbl = ancillary_data["county"]["table"]
tables_required = []

# .............................................................................
# Commands
# .............................................................................
query_tables_stmt = f"SHOW TABLES FROM SCHEMA {database}.{pub_schema};"

aiannh_fname = ancillary_data["aiannh"]["filename"]
create_aiannh_stmt = f"""
CREATE TABLE {aiannh_tbl} (
shape GEOMETRY,
Expand All @@ -130,7 +135,6 @@
IAM_role DEFAULT;
"""

county_fname = ancillary_data["county"]["filename"]
create_county_stmt = f"""
CREATE TABLE {county_tbl} (
shape GEOMETRY,
Expand Down Expand Up @@ -200,8 +204,11 @@ class VARCHAR(max),
FORMAT CSV
IAM_role DEFAULT;
"""
rm_old_riis_stmt = f"DROP TABLE IF EXISTS {pub_schema}.{old_riis_tbl};"

COMMANDS = [
# Before Redshift commands, check that current RIIS data exists on S3
REDSHIFT_COMMANDS = [
("remove_obsolete_riis", rm_old_riis_stmt, None),
("query_tables", query_tables_stmt, None),
("create_aiannh", create_aiannh_stmt, aiannh_tbl),
("fill_aiannh", fill_aiannh_stmt, aiannh_tbl),
Expand All @@ -210,6 +217,7 @@ class VARCHAR(max),
("create_riis", create_riis_stmt, riis_tbl),
("fill_riis", fill_riis_stmt, riis_tbl)
]
# After successful load, remove old RIIS data from S3


# --------------------------------------------------------------------------------------
Expand All @@ -226,8 +234,9 @@ def lambda_handler(event, context):
Raises:
Exception: on failure to execute Redshift command.
"""
output = []
# -------------------------------------
# FIRST: Check that current RIIS annotated data is on S3
# FIRST: Check that current RIIS annotated data exists on S3
# -------------------------------------
try:
tr_response = s3_client.list_objects_v2(
Expand All @@ -241,13 +250,17 @@ def lambda_handler(event, context):
raise Exception(
f"!!! Missing annotated RIIS data: {annotated_riis_key}")
else:
print(f"*** Found annotated RIIS data: {annotated_riis_key}")
msg = f"*** Found annotated RIIS data: {annotated_riis_key}"
output.append(msg)
print(msg)

# -------------------------------------
# NEXT: Load tables from S3 to Redshift
# NEXT: Delete obsolete table,
# list existing tables,
# load current tables from S3 to Redshift
# -------------------------------------
tables_present = []
for (cmd, stmt, tblname) in COMMANDS:
for (cmd, stmt, tblname) in REDSHIFT_COMMANDS:
if tblname is None or tblname not in tables_present:
# -------------------------------------
try:
Expand All @@ -256,9 +269,11 @@ def lambda_handler(event, context):
except Exception as e:
raise Exception(e)

print(f"*** {cmd.upper()} command submitted")
print(f"*** {stmt}")
submit_id = submit_result['Id']
msg = f"*** {cmd.upper()} command submitted with Id {submit_id}"
print(msg)
output.append(msg)
# print(f"*** {stmt}")

# -------------------------------------
# Loop til complete, then get result status
Expand Down Expand Up @@ -287,12 +302,12 @@ def lambda_handler(event, context):
elapsed_time += waittime

# -------------------------------------
# IFF query tables, create list of tables that are present
# First command queries tables and creates list of tables that are present
if cmd == "query_tables":
try:
stmt_result = rs_client.get_statement_result(Id=submit_id)
except Exception as e:
print(f"*** No get_statement_result {e}")
print(f"!!! No get_statement_result {e}")
else:
try:
records = stmt_result["Records"]
Expand All @@ -302,9 +317,26 @@ def lambda_handler(event, context):
# tablename is 2nd item in record
for rec in records:
tables_present.append(rec[2]['stringValue'])
print(f"*** Tables: {tables_present}")
msg = f"*** Tables in Redshift: {tables_present}"
output.append(msg)
print(msg)

# -------------------------------------
# Last: Remove obsolete RIIS annotated data from S3
# -------------------------------------
try:
rm_response = s3_client.delete_object(
Bucket=S3_BUCKET, Key=old_annotated_riis_key)

except Exception as e:
print(f"!!! Error deleting bucket/object {old_annotated_riis_key} ({e})")
raise e

msg = f"*** Deleted {old_annotated_riis_key} (if exists) with delete command."
output.append(msg)
print(msg)

return {
'statusCode': 200,
'body': json.dumps("Lambda result logged")
'body': json.dumps(output)
}
49 changes: 31 additions & 18 deletions aws/lambda/bison_s2_create_bison_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
if mo == 1:
prev_mo = 12
prev_yr = yr - 1
gbif_datestr = f"{yr}-{mo:02d}-01"
bison_datestr = f"{yr}_{mo:02d}_01"
old_bison_datestr = f"{prev_yr}_{prev_mo:02d}_01"
gbif_datestr = f"{yr}-{mo:02d}-01"

# .............................................................................
# AWS constants
Expand Down Expand Up @@ -53,8 +53,10 @@
external_schema = "redshift_spectrum"
# Wait time for completion of Redshift command
waittime = 5

# Name the Redshift mounted gbif data and bison table to create from it
bison_tbl = f"{pub_schema}.bison_{bison_datestr}"
old_bison_tbl = f"{pub_schema}.bison_{old_bison_datestr}"
mounted_gbif_name = f"{external_schema}.occurrence_{bison_datestr}_parquet"

# .............................................................................
Expand All @@ -66,10 +68,12 @@
session = boto3.Session(botocore_session=bc_session, region_name=REGION)
# Initialize Redshift client
config = Config(connect_timeout=timeout, read_timeout=timeout)
s3_client = session.client("s3", config=config, region_name=REGION)
rs_client = session.client("redshift", config=config)
rs_client = session.client("redshift-data", config=config)

COMMANDS = []
# .............................................................................
# Commands
# .............................................................................
query_tables_stmt = f"SHOW TABLES FROM SCHEMA {database}.{pub_schema};"

create_schema_stmt = f"""
CREATE EXTERNAL SCHEMA IF NOT EXISTS {external_schema}
Expand Down Expand Up @@ -158,9 +162,8 @@ class VARCHAR(max),
count_gbif_stmt = f"SELECT COUNT(*) from {mounted_gbif_name};"
count_bison_stmt = f"SELECT COUNT(*) FROM {bison_tbl};"
unmount_stmt = f"DROP TABLE {mounted_gbif_name};"
query_tables_stmt = f"SHOW TABLES FROM SCHEMA {database}.{pub_schema};"

COMMANDS = [
REDSHIFT_COMMANDS = [
("query_tables", query_tables_stmt),
("schema", create_schema_stmt),
# 2 secs
Expand All @@ -174,6 +177,13 @@ class VARCHAR(max),
# 1 secs
("unmount", unmount_stmt)
]
# .............................................................................
# Ancillary data parameters
# .............................................................................
RIIS_BASENAME = "USRIISv2_MasterList"
riis_fname = f"{RIIS_BASENAME}_annotated_{bison_datestr}.csv"
riis_tbl = f"riisv2_{bison_datestr}"

# Each fields tuple contains original fieldname, corresponding bison fieldname and type
ancillary_data = {
"aiannh": {
Expand All @@ -194,8 +204,8 @@ class VARCHAR(max),
}
},
"riis": {
"table": f"riisv2_{bison_datestr}",
"filename": f"USRIISv2_MasterList_annotated_{bison_datestr}.csv",
"table": riis_tbl,
"filename": riis_fname,
"fields": {
"locality": ("locality", "riis_region", "VARCHAR(3)"),
"occid": ("occurrenceid", "riis_occurrence_id", "VARCHAR(50)"),
Expand All @@ -209,7 +219,7 @@ class VARCHAR(max),
for (_orig_fld, bison_fld, bison_typ) in tbl["fields"].values():
# 1-2 secs
stmt = f"ALTER TABLE {bison_tbl} ADD COLUMN {bison_fld} {bison_typ} DEFAULT NULL;"
COMMANDS.append((f"add_{bison_fld}", stmt))
REDSHIFT_COMMANDS.append((f"add_{bison_fld}", stmt))


# --------------------------------------------------------------------------------------
Expand All @@ -226,11 +236,12 @@ def lambda_handler(event, context):
Raises:
Exception: on failure to execute Redshift command.
"""
output = []
# -------------------------------------
# No checks required
# Mount GBIF, subset to BISON table, add fields, all in Redshift
# -------------------------------------
for (cmd, stmt) in COMMANDS:
for (cmd, stmt) in REDSHIFT_COMMANDS:
# -------------------------------------
try:
submit_result = rs_client.execute_statement(
Expand All @@ -239,8 +250,9 @@ def lambda_handler(event, context):
raise Exception(e)

print("*** ......................")
print(f"*** {cmd.upper()} command submitted")
print(f"*** {stmt}")
msg = f"*** {cmd.upper()} command submitted"
output.append(msg)
print(msg)
submit_id = submit_result['Id']

# -------------------------------------
Expand All @@ -252,7 +264,7 @@ def lambda_handler(event, context):
describe_result = rs_client.describe_statement(Id=submit_id)
except Exception as e:
complete = True
print(f"Failed to describe_statement {e}")
print(f"!!! Failed to describe_statement {e}")
else:
status = describe_result["Status"]
if status in ("ABORTED", "FAILED", "FINISHED"):
Expand All @@ -263,7 +275,7 @@ def lambda_handler(event, context):
err = describe_result["Error"]
except Exception:
err = "Unknown Error"
print(f"*** FAILED: {err}")
print(f"!!! FAILED: {err}")
else:
time.sleep(waittime)
elapsed_time += waittime
Expand All @@ -286,11 +298,12 @@ def lambda_handler(event, context):
# tablename is 2nd item in record
for rec in records:
tables_present.append(rec[2]['stringValue'])
print(f"*** Tables: {tables_present}")
msg = f"*** Tables: {tables_present}"
else:
print(f"*** COUNT = {records[0][0]['longValue']}")

msg = f"*** COUNT = {records[0][0]['longValue']}"
print(msg)
output.append(msg)
return {
'statusCode': 200,
'body': json.dumps("Lambda result logged")
'body': json.dumps(output)
}

0 comments on commit 4223f67

Please sign in to comment.