diff --git a/broker/cloud_run/lsst/ps_to_storage/main.py b/broker/cloud_run/lsst/ps_to_storage/main.py
index ec80a34b..44acbe06 100644
--- a/broker/cloud_run/lsst/ps_to_storage/main.py
+++ b/broker/cloud_run/lsst/ps_to_storage/main.py
@@ -9,7 +9,7 @@
 import math
 import os
 import struct
-from typing import Any, Dict, Optional
+from typing import Any, Dict
 
 from astropy.time import Time
 import flask
@@ -52,7 +52,6 @@
 client = storage.Client()
 bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))
 
-publisher = TOPIC_ALERTS.client
 
 # define a binary data structure for packing and unpacking bytes
 _ConfluentWireFormatHeader = struct.Struct(">bi")
@@ -99,17 +98,17 @@ def store_alert_data(envelope) -> None:
     alert = _unpack_alert(envelope)
 
     blob = bucket.blob(_generate_alert_filename(alert))
-    blob.metadata = create_file_metadata(alert, event_id=envelope["message"]["messageId"])
+    blob.metadata = _create_file_metadata(alert, event_id=envelope["message"]["messageId"])
     # raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
     # let it raise. the main function will catch it and then drop the message.
     blob.upload_from_string(base64.b64decode(envelope["message"]["data"]), if_generation_match=0)
 
 
     # publish alerts to appropriate Pub/Sub topics
-    TOPIC_ALERTS.publish(alert)  # not a duplicate, publish the broker's main "alerts" stream
+    TOPIC_ALERTS.publish(alert)  # deduplicated "alerts" stream
     TOPIC_BIGQUERY_IMPORT.publish(
         _reformat_alert_data_to_valid_json(alert)
-    )  # publish the alert as a JSON message to the bigquery-import topic
+    )  # alert as valid JSON message
 
 
 def _unpack_alert(envelope) -> pittgoogle.Alert:
@@ -133,7 +132,7 @@ def _unpack_alert(envelope) -> pittgoogle.Alert:
     schema = sr_client.get_schema(schema_id=schema_id)
     parse_schema = json.loads(schema.schema_str)
     schema_version = parse_schema["namespace"].split(".")[1]
-    schema_name = parse_schema["namespace"] + ".alert"  # returns lsst.v7_x.alert string
+    schema_name = parse_schema["namespace"] + ".alert"  # returns "lsst.v7_x.alert"
     alert_dict = fastavro.schemaless_reader(content_bytes, parse_schema)
 
     return pittgoogle.Alert.from_dict(
@@ -184,7 +183,7 @@
     return f"{alert.attributes.get('schema_version')}/{alert_date}/{alert.objectid}/{alert.sourceid}.avro"
 
 
-def create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
+def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
     """Return key/value pairs to be attached to the file as metadata."""
 
     metadata = {"file_origin_message_id": event_id}
@@ -199,10 +198,9 @@
 
 
 def _reformat_alert_data_to_valid_json(alert: pittgoogle.alert) -> pittgoogle.alert.Alert:
     """Creates an Alert object whose data will be published as a valid JSON message."""
-    cutouts_removed = alert.drop_cutouts()  # remove cutouts
-    valid_json_dict = _reformat_nan_in_alert_dict(
-        cutouts_removed.dict
-    )  # replace NaN values with None
+    # transform data
+    cutouts_removed = alert.drop_cutouts()
+    valid_json_dict = _reformat_nan_in_alert_dict(cutouts_removed)
 
     return pittgoogle.Alert.from_dict(payload=valid_json_dict, attributes=alert.attributes)