Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
hernandezc1 committed Mar 10, 2025
1 parent c615ba7 commit 4973e23
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions broker/cloud_run/lsst/ps_to_storage/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import math
import os
import struct
from typing import Any, Dict
from typing import Any, Dict, Optional
from astropy.time import Time

import flask
Expand Down Expand Up @@ -52,6 +52,7 @@

client = storage.Client()
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))
publisher = TOPIC_ALERTS.client

# define a binary data structure for packing and unpacking bytes
_ConfluentWireFormatHeader = struct.Struct(">bi")
Expand Down Expand Up @@ -104,11 +105,17 @@ def store_alert_data(envelope) -> None:
# let it raise. the main function will catch it and then drop the message.
blob.upload_from_string(base64.b64decode(envelope["message"]["data"]), if_generation_match=0)

json_dict = _reformat_alert_data_to_valid_json(alert)

# publish alerts to appropriate Pub/Sub topics
TOPIC_ALERTS.publish(alert) # deduplicated "alerts" stream
TOPIC_BIGQUERY_IMPORT.publish(
_reformat_alert_data_to_valid_json(alert)
) # alert as valid JSON message
publish_valid_json_stream(
topic_name=TOPIC_BIGQUERY_IMPORT.name,
message=json_dict,
attributes={
"schema_version": alert.attributes.get("schema_version"),
},
)


def _unpack_alert(envelope) -> pittgoogle.Alert:
Expand Down Expand Up @@ -195,14 +202,9 @@ def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
return metadata


def _reformat_alert_data_to_valid_json(alert: pittgoogle.alert) -> pittgoogle.alert.Alert:
def _reformat_alert_data_to_valid_json(alert: pittgoogle.Alert) -> dict:
"""Creates an Alert object whose data will be published as a valid JSON message."""

# transform data
cutouts_removed = alert.drop_cutouts()
valid_json_dict = _reformat_nan_in_alert_dict(cutouts_removed)

return pittgoogle.Alert.from_dict(payload=valid_json_dict, attributes=alert.attributes)
return _reformat_nan_in_alert_dict(alert.drop_cutouts())


def _reformat_nan_in_alert_dict(alert_dict: Dict[str, Any]) -> Dict[str, Any]:
Expand All @@ -219,3 +221,17 @@ def _replace_nan_values_with_none(value: Any) -> Any:
if isinstance(value, float) and math.isnan(value):
return None
return value


def publish_valid_json_stream(
topic_name: str, message: dict, attributes: Optional[dict] = None
) -> str:
"""Publish alert data to a Pub/Sub topic as a valid JSON message."""

message_json = json.dumps(message, default=str)
message_bytes = message_json.encode("utf-8")

topic_path = publisher.topic_path(PROJECT_ID, topic_name)
future = publisher.publish(topic_path, data=message_bytes, **attributes)

return future.result()

0 comments on commit 4973e23

Please sign in to comment.