diff --git a/broker/cloud_run/lsst/ps_to_storage/main.py b/broker/cloud_run/lsst/ps_to_storage/main.py
index 8744aa3ef..ec80a34ba 100644
--- a/broker/cloud_run/lsst/ps_to_storage/main.py
+++ b/broker/cloud_run/lsst/ps_to_storage/main.py
@@ -95,60 +95,56 @@ def run():
 def store_alert_data(envelope) -> None:
     """Uploads the msg data bytes to a GCP storage bucket."""
+    # create an alert object from the envelope
+    alert = _unpack_alert(envelope)
+
+    blob = bucket.blob(_generate_alert_filename(alert))
+    blob.metadata = create_file_metadata(alert, event_id=envelope["message"]["messageId"])
+
+    # raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
+    # let it raise. the main function will catch it and then drop the message.
+    blob.upload_from_string(base64.b64decode(envelope["message"]["data"]), if_generation_match=0)
+
+    # publish alerts to appropriate Pub/Sub topics
+    TOPIC_ALERTS.publish(alert)  # not a duplicate, publish the broker's main "alerts" stream
+    TOPIC_BIGQUERY_IMPORT.publish(
+        _reformat_alert_data_to_valid_json(alert)
+    )  # publish the alert as a JSON message to the bigquery-import topic
+
+
+def _unpack_alert(envelope) -> pittgoogle.Alert:
+    """Unpacks an alert from a base64-encoded message envelope and deserializes it into a `pittgoogle.Alert` object.
+    Parameters
+    ----------
+    envelope : dict
+        A dictionary containing the message envelope.
+    Returns
+    -------
+    pittgoogle.Alert: The alert object.
+    """
+
     alert_bytes = base64.b64decode(envelope["message"]["data"])  # alert packet, bytes
     attributes = envelope["message"].get("attributes", {})
+    content_bytes = io.BytesIO(alert_bytes[5:])
 
-    # unpack the alert and read schema ID
+    # unpack the alert and create an alert dictionary
     header_bytes = alert_bytes[:5]
     schema_id = deserialize_confluent_wire_header(header_bytes)
-
-    # get and load schema
     schema = sr_client.get_schema(schema_id=schema_id)
     parse_schema = json.loads(schema.schema_str)
     schema_version = parse_schema["namespace"].split(".")[1]
-    content_bytes = io.BytesIO(alert_bytes[5:])
-
-    # deserialize the alert
+    schema_name = parse_schema["namespace"] + ".alert"  # returns lsst.v7_x.alert string
     alert_dict = fastavro.schemaless_reader(content_bytes, parse_schema)
 
-    # convert the MJD timestamp to "YYYY-MM-DD"
-    time_obj = Time(alert_dict["diaSource"]["midpointMjdTai"], format="mjd")
-    alert_date = time_obj.datetime.strftime("%Y-%m-%d")
-
-    filename = generate_alert_filename(
-        {
-            "schema_version": schema_version,
-            "alert_date": alert_date,
-            "objectId": alert_dict["diaObject"]["diaObjectId"],
-            "sourceId": alert_dict["diaSource"]["diaSourceId"],
-            "format": "avro",
-        }
-    )
-
-    blob = bucket.blob(filename)
-    blob.metadata = create_file_metadata(alert_dict, event_id=envelope["message"]["messageId"])
-
-    # raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
-    # let it raise. the main function will catch it and then drop the message.
-    blob.upload_from_string(alert_bytes, if_generation_match=0)
-
-    # Cloud Storage says this is not a duplicate, so now we publish the broker's main "alerts" stream
-    publish_alerts_stream(
-        topic_name=TOPIC_ALERTS.name,
-        message=alert_bytes,
+    return pittgoogle.Alert.from_dict(
+        payload=alert_dict,
         attributes={
             "diaObjectId": str(alert_dict["diaObject"]["diaObjectId"]),
             "diaSourceId": str(alert_dict["diaSource"]["diaSourceId"]),
             "schema_version": schema_version,
             **attributes,
         },
-    )
-
-    # publish the alert as a JSON message to the bigquery-import topic
-    TOPIC_BIGQUERY_IMPORT.publish(
-        _reformat_alert_data_to_valid_json(
-            alert_dict, attributes={"schema_version": schema_version}
-        )
+        schema_name=schema_name,
     )
@@ -170,73 +166,45 @@ def deserialize_confluent_wire_header(raw):
     return version
 
 
-def generate_alert_filename(aname: dict) -> str:
-    """
-    Generate the filename of an alert stored to a Cloud Storage bucket.
-
-    Args:
-        aname:
-            Components to create the filename. Required key/value pairs are those needed to create a parsed filename.
-            Extra keys are ignored.
-
-    Returns:
-        str: The formatted filename as "{schema_version}/{YYYY-MM-DD}/{objectId}/{sourceId}.{format}".
+def _generate_alert_filename(alert: pittgoogle.Alert) -> str:
+    """Generate the filename of an alert stored to a Cloud Storage bucket.
+    Parameters
+    ----------
+    alert : pittgoogle.Alert
+        The alert object.
+    Returns
+    -------
+    str: The formatted filename as "{schema_version}/{YYYY-MM-DD}/{diaObjectId}/{diaSourceId}.{format}".
     """
+    time_obj = Time(alert.get("mjd"), format="mjd")
+    alert_date = time_obj.datetime.strftime(
+        "%Y-%m-%d"
+    )  # convert the MJD timestamp to "YYYY-MM-DD"
 
-    schema_version = aname["schema_version"]
-    alert_date = aname["alert_date"]
-    object_id = aname["objectId"]
-    source_id = aname["sourceId"]
-    file_format = aname["format"]
+    return f"{alert.attributes.get('schema_version')}/{alert_date}/{alert.objectid}/{alert.sourceid}.avro"
 
-    return f"{schema_version}/{alert_date}/{object_id}/{source_id}.{file_format}"
-
-def create_file_metadata(alert_dict: dict, event_id: str) -> dict:
+def create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
     """Return key/value pairs to be attached to the file as metadata."""
     metadata = {"file_origin_message_id": event_id}
-    metadata["diaObjectId"] = alert_dict["diaObject"]["diaObjectId"]
-    metadata["diaSourceId"] = alert_dict["diaSource"]["diaSourceId"]
-    metadata["ra"] = alert_dict["diaSource"]["ra"]
-    metadata["dec"] = alert_dict["diaSource"]["dec"]
+    metadata["diaObjectId"] = alert.objectid
+    metadata["diaSourceId"] = alert.sourceid
+    metadata["ra"] = alert.get("ra")
+    metadata["dec"] = alert.get("dec")
 
     return metadata
 
 
-def publish_alerts_stream(
-    topic_name: str, message: bytes, attributes: Optional[dict] = None
-) -> str:
-    """Publish original alert bytes to a Pub/Sub topic."""
-
-    # enforce bytes type for message
-    if not isinstance(message, bytes):
-        raise TypeError("`message` must be bytes.")
-
-    topic_path = publisher.topic_path(PROJECT_ID, topic_name)
-    future = publisher.publish(topic_path, data=message, **attributes)
-
-    return future.result()
-
-
-def _reformat_alert_data_to_valid_json(
-    alert_dict: dict, attributes: dict
-) -> pittgoogle.alert.Alert:
+def _reformat_alert_data_to_valid_json(alert: pittgoogle.alert) -> pittgoogle.alert.Alert:
     """Creates an Alert object whose data will be published as a valid JSON message."""
-    # cutouts are sent as bytes; define and remove them
-    cutouts = [
-        "cutoutTemplate",
-        "cutoutScience",
-        "cutoutDifference",
-    ]
-    for key in cutouts:
-        alert_dict.pop(key, None)
-
-    # alert may contain NaN values; replace them with None
-    valid_json_dict = _reformat_nan_in_alert_dict(alert_dict)
+    cutouts_removed = alert.drop_cutouts()  # remove cutouts
+    valid_json_dict = _reformat_nan_in_alert_dict(
+        cutouts_removed.dict
+    )  # replace NaN values with None
 
-    return pittgoogle.Alert.from_dict(payload=valid_json_dict, attributes=attributes)
+    return pittgoogle.Alert.from_dict(payload=valid_json_dict, attributes=alert.attributes)
 
 
 def _reformat_nan_in_alert_dict(alert_dict: Dict[str, Any]) -> Dict[str, Any]:
diff --git a/broker/cloud_run/lsst/ps_to_storage/requirements.txt b/broker/cloud_run/lsst/ps_to_storage/requirements.txt
index 89f0010df..b510695bd 100644
--- a/broker/cloud_run/lsst/ps_to_storage/requirements.txt
+++ b/broker/cloud_run/lsst/ps_to_storage/requirements.txt
@@ -8,7 +8,7 @@ fastavro
 google-cloud-logging
 google-cloud-storage
 httpx # used by confluent-kafka
-pittgoogle-client>=0.3.11
+pittgoogle-client>=0.3.12
 # for Cloud Run
 # https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service