Skip to content

Commit

Permalink
update comment and fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
hernandezc1 committed Mar 10, 2025
1 parent de74efa commit c615ba7
Showing 1 changed file with 9 additions and 11 deletions.
20 changes: 9 additions & 11 deletions broker/cloud_run/lsst/ps_to_storage/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import math
import os
import struct
from typing import Any, Dict, Optional
from typing import Any, Dict
from astropy.time import Time

import flask
Expand Down Expand Up @@ -52,7 +52,6 @@

client = storage.Client()
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))
publisher = TOPIC_ALERTS.client

# define a binary data structure for packing and unpacking bytes
_ConfluentWireFormatHeader = struct.Struct(">bi")
def store_alert_data(envelope) -> None:
    """Store the alert payload in Cloud Storage, then publish it downstream.

    Args:
        envelope: Decoded Pub/Sub push envelope; ``envelope["message"]["data"]``
            holds the base64-encoded Avro alert bytes and
            ``envelope["message"]["messageId"]`` the originating message id.

    Raises:
        google.api_core.exceptions.PreconditionFailed: If a file with this
            alert's name already exists in the bucket (duplicate delivery).
    """
    alert = _unpack_alert(envelope)

    blob = bucket.blob(_generate_alert_filename(alert))
    blob.metadata = _create_file_metadata(alert, event_id=envelope["message"]["messageId"])

    # raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
    # let it raise. the main function will catch it and then drop the message.
    blob.upload_from_string(base64.b64decode(envelope["message"]["data"]), if_generation_match=0)

    # publish alerts to appropriate Pub/Sub topics
    TOPIC_ALERTS.publish(alert)  # deduplicated "alerts" stream
    TOPIC_BIGQUERY_IMPORT.publish(
        _reformat_alert_data_to_valid_json(alert)
    )  # alert as valid JSON message


def _unpack_alert(envelope) -> pittgoogle.Alert:
Expand All @@ -133,7 +132,7 @@ def _unpack_alert(envelope) -> pittgoogle.Alert:
schema = sr_client.get_schema(schema_id=schema_id)
parse_schema = json.loads(schema.schema_str)
schema_version = parse_schema["namespace"].split(".")[1]
schema_name = parse_schema["namespace"] + ".alert" # returns lsst.v7_x.alert string
schema_name = parse_schema["namespace"] + ".alert" # returns "lsst.v7_x.alert"
alert_dict = fastavro.schemaless_reader(content_bytes, parse_schema)

return pittgoogle.Alert.from_dict(
Expand Down Expand Up @@ -184,7 +183,7 @@ def _generate_alert_filename(alert: pittgoogle.Alert) -> str:
return f"{alert.attributes.get('schema_version')}/{alert_date}/{alert.objectid}/{alert.sourceid}.avro"


def create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
"""Return key/value pairs to be attached to the file as metadata."""

metadata = {"file_origin_message_id": event_id}
Expand All @@ -199,10 +198,9 @@ def create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
def _reformat_alert_data_to_valid_json(alert: pittgoogle.Alert) -> pittgoogle.Alert:
    """Return an Alert whose data can be published as a valid JSON message.

    Cutouts are dropped and NaN values are reformatted (NaN is not valid
    JSON), while the original alert's attributes are preserved.

    Args:
        alert: The alert to reformat. The input object is not modified.

    Returns:
        A new ``pittgoogle.Alert`` carrying the JSON-safe payload.
    """
    # transform data
    cutouts_removed = alert.drop_cutouts()
    valid_json_dict = _reformat_nan_in_alert_dict(cutouts_removed)

    return pittgoogle.Alert.from_dict(payload=valid_json_dict, attributes=alert.attributes)

Expand Down

0 comments on commit c615ba7

Please sign in to comment.