
Commit de74efa

update to pittgoogle-client v0.3.12 and update source code
1 parent aa3b896 commit de74efa

2 files changed: 60 additions & 92 deletions

broker/cloud_run/lsst/ps_to_storage/main.py

Lines changed: 59 additions & 91 deletions
@@ -95,60 +95,56 @@ def run():
 def store_alert_data(envelope) -> None:
     """Uploads the msg data bytes to a GCP storage bucket."""
 
+    # create an alert object from the envelope
+    alert = _unpack_alert(envelope)
+
+    blob = bucket.blob(_generate_alert_filename(alert))
+    blob.metadata = create_file_metadata(alert, event_id=envelope["message"]["messageId"])
+
+    # raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
+    # let it raise. the main function will catch it and then drop the message.
+    blob.upload_from_string(base64.b64decode(envelope["message"]["data"]), if_generation_match=0)
+
+    # publish alerts to appropriate Pub/Sub topics
+    TOPIC_ALERTS.publish(alert)  # not a duplicate, publish the broker's main "alerts" stream
+    TOPIC_BIGQUERY_IMPORT.publish(
+        _reformat_alert_data_to_valid_json(alert)
+    )  # publish the alert as a JSON message to the bigquery-import topic
+
+
+def _unpack_alert(envelope) -> pittgoogle.Alert:
+    """Unpacks an alert from a base64-encoded message envelope and deserializes it into a `pittgoogle.Alert` object.
+    Parameters
+    ----------
+    envelope : dict
+        A dictionary containing the message envelope.
+    Returns
+    -------
+    pittgoogle.Alert: The alert object.
+    """
+
     alert_bytes = base64.b64decode(envelope["message"]["data"])  # alert packet, bytes
     attributes = envelope["message"].get("attributes", {})
+    content_bytes = io.BytesIO(alert_bytes[5:])
 
-    # unpack the alert and read schema ID
+    # unpack the alert and create an alert dictionary
     header_bytes = alert_bytes[:5]
     schema_id = deserialize_confluent_wire_header(header_bytes)
-
-    # get and load schema
     schema = sr_client.get_schema(schema_id=schema_id)
     parse_schema = json.loads(schema.schema_str)
     schema_version = parse_schema["namespace"].split(".")[1]
-    content_bytes = io.BytesIO(alert_bytes[5:])
-
-    # deserialize the alert
+    schema_name = parse_schema["namespace"] + ".alert"  # returns lsst.v7_x.alert string
     alert_dict = fastavro.schemaless_reader(content_bytes, parse_schema)
 
-    # convert the MJD timestamp to "YYYY-MM-DD"
-    time_obj = Time(alert_dict["diaSource"]["midpointMjdTai"], format="mjd")
-    alert_date = time_obj.datetime.strftime("%Y-%m-%d")
-
-    filename = generate_alert_filename(
-        {
-            "schema_version": schema_version,
-            "alert_date": alert_date,
-            "objectId": alert_dict["diaObject"]["diaObjectId"],
-            "sourceId": alert_dict["diaSource"]["diaSourceId"],
-            "format": "avro",
-        }
-    )
-
-    blob = bucket.blob(filename)
-    blob.metadata = create_file_metadata(alert_dict, event_id=envelope["message"]["messageId"])
-
-    # raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
-    # let it raise. the main function will catch it and then drop the message.
-    blob.upload_from_string(alert_bytes, if_generation_match=0)
-
-    # Cloud Storage says this is not a duplicate, so now we publish the broker's main "alerts" stream
-    publish_alerts_stream(
-        topic_name=TOPIC_ALERTS.name,
-        message=alert_bytes,
+    return pittgoogle.Alert.from_dict(
+        payload=alert_dict,
         attributes={
             "diaObjectId": str(alert_dict["diaObject"]["diaObjectId"]),
             "diaSourceId": str(alert_dict["diaSource"]["diaSourceId"]),
             "schema_version": schema_version,
             **attributes,
         },
-    )
-
-    # publish the alert as a JSON message to the bigquery-import topic
-    TOPIC_BIGQUERY_IMPORT.publish(
-        _reformat_alert_data_to_valid_json(
-            alert_dict, attributes={"schema_version": schema_version}
-        )
+        schema_name=schema_name,
     )
 
 
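Note: the upload above is the commit's deduplication point. `if_generation_match=0` is a Cloud Storage precondition: the write succeeds only when no object with that name exists yet, so a redelivered Pub/Sub message raises PreconditionFailed instead of silently overwriting. A minimal sketch of the catch-and-drop pattern the comments describe (hypothetical `upload_once` helper; per the comment, the actual catching happens in the service's main function):

from google.api_core.exceptions import PreconditionFailed
from google.cloud import storage


def upload_once(bucket: storage.Bucket, filename: str, payload: bytes) -> bool:
    """Upload payload only if the object is new; return False for a duplicate delivery."""
    blob = bucket.blob(filename)
    try:
        # if_generation_match=0 asserts "no live object exists at this name"
        blob.upload_from_string(payload, if_generation_match=0)
        return True
    except PreconditionFailed:
        # already stored by an earlier delivery; the caller acks and drops the message
        return False
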
@@ -170,73 +166,45 @@ def deserialize_confluent_wire_header(raw):
     return version
 
 
-def generate_alert_filename(aname: dict) -> str:
-    """
-    Generate the filename of an alert stored to a Cloud Storage bucket.
-
-    Args:
-        aname:
-            Components to create the filename. Required key/value pairs are those needed to create a parsed filename.
-            Extra keys are ignored.
-
-    Returns:
-        str: The formatted filename as "{schema_version}/{YYYY-MM-DD}/{objectId}/{sourceId}.{format}".
+def _generate_alert_filename(alert: pittgoogle.Alert) -> str:
+    """Generate the filename of an alert stored to a Cloud Storage bucket.
+    Parameters
+    ----------
+    alert : pittgoogle.Alert
+        The alert object.
+    Returns
+    -------
+    str: The formatted filename as "{schema_version}/{YYYY-MM-DD}/{diaObjectId}/{diaSourceId}.{format}".
     """
+    time_obj = Time(alert.get("mjd"), format="mjd")
+    alert_date = time_obj.datetime.strftime(
+        "%Y-%m-%d"
+    )  # convert the MJD timestamp to "YYYY-MM-DD"
 
-    schema_version = aname["schema_version"]
-    alert_date = aname["alert_date"]
-    object_id = aname["objectId"]
-    source_id = aname["sourceId"]
-    file_format = aname["format"]
+    return f"{alert.attributes.get('schema_version')}/{alert_date}/{alert.objectid}/{alert.sourceid}.avro"
 
-    return f"{schema_version}/{alert_date}/{object_id}/{source_id}.{file_format}"
 
-
-def create_file_metadata(alert_dict: dict, event_id: str) -> dict:
+def create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
     """Return key/value pairs to be attached to the file as metadata."""
 
     metadata = {"file_origin_message_id": event_id}
-    metadata["diaObjectId"] = alert_dict["diaObject"]["diaObjectId"]
-    metadata["diaSourceId"] = alert_dict["diaSource"]["diaSourceId"]
-    metadata["ra"] = alert_dict["diaSource"]["ra"]
-    metadata["dec"] = alert_dict["diaSource"]["dec"]
+    metadata["diaObjectId"] = alert.objectid
+    metadata["diaSourceId"] = alert.sourceid
+    metadata["ra"] = alert.get("ra")
+    metadata["dec"] = alert.get("dec")
 
     return metadata
 
 
-def publish_alerts_stream(
-    topic_name: str, message: bytes, attributes: Optional[dict] = None
-) -> str:
-    """Publish original alert bytes to a Pub/Sub topic."""
-
-    # enforce bytes type for message
-    if not isinstance(message, bytes):
-        raise TypeError("`message` must be bytes.")
-
-    topic_path = publisher.topic_path(PROJECT_ID, topic_name)
-    future = publisher.publish(topic_path, data=message, **attributes)
-
-    return future.result()
-
-
-def _reformat_alert_data_to_valid_json(
-    alert_dict: dict, attributes: dict
-) -> pittgoogle.alert.Alert:
+def _reformat_alert_data_to_valid_json(alert: pittgoogle.alert) -> pittgoogle.alert.Alert:
     """Creates an Alert object whose data will be published as a valid JSON message."""
 
-    # cutouts are sent as bytes; define and remove them
-    cutouts = [
-        "cutoutTemplate",
-        "cutoutScience",
-        "cutoutDifference",
-    ]
-    for key in cutouts:
-        alert_dict.pop(key, None)
-
-    # alert may contain NaN values; replace them with None
-    valid_json_dict = _reformat_nan_in_alert_dict(alert_dict)
+    cutouts_removed = alert.drop_cutouts()  # remove cutouts
+    valid_json_dict = _reformat_nan_in_alert_dict(
+        cutouts_removed.dict
+    )  # replace NaN values with None
 
-    return pittgoogle.Alert.from_dict(payload=valid_json_dict, attributes=attributes)
+    return pittgoogle.Alert.from_dict(payload=valid_json_dict, attributes=alert.attributes)
 
 
 def _reformat_nan_in_alert_dict(alert_dict: Dict[str, Any]) -> Dict[str, Any]:
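For reference, `deserialize_confluent_wire_header` (unchanged by this commit) is why `_unpack_alert` splits the packet at byte 5: Confluent Schema Registry framing prepends one magic byte and a 4-byte big-endian schema ID to the Avro payload. A sketch of that parsing under the standard wire format (illustrative name and body, not the file's exact implementation):

import struct


def parse_confluent_wire_header(raw: bytes) -> int:
    """Return the schema ID from the 5-byte Confluent wire-format header."""
    magic, schema_id = struct.unpack(">bI", raw[:5])  # magic byte + big-endian uint32
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id
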

broker/cloud_run/lsst/ps_to_storage/requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ fastavro
 google-cloud-logging
 google-cloud-storage
 httpx  # used by confluent-kafka
-pittgoogle-client>=0.3.11
+pittgoogle-client>=0.3.12
 
 # for Cloud Run
 # https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
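Since main.py now relies on `Alert.from_dict(..., schema_name=...)` and `Alert.drop_cutouts()`, deployments should confirm they actually pick up the raised floor. A quick sanity check (a sketch using the stdlib `importlib.metadata` plus the `packaging` library, which is an assumption of this example rather than a dependency listed above):

from importlib.metadata import version
from packaging.version import Version

installed = Version(version("pittgoogle-client"))
assert installed >= Version("0.3.12"), f"need pittgoogle-client>=0.3.12, found {installed}"
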
