Skip to content

Commit 44f5c76

Browse files
committed
add the necessary functions to create outgoing lite alerts
1 parent 4ca5dc5 commit 44f5c76

File tree

1 file changed

+80
-7
lines changed
  • broker/cloud_run/lsst/ps_to_storage

1 file changed

+80
-7
lines changed

broker/cloud_run/lsst/ps_to_storage/main.py

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
#!/usr/bin/env python3
22
# -*- coding: UTF-8 -*-
33

4-
"""This module stores LSST alert data as an Avro file in Cloud Storage."""
4+
"""This module stores LSST alert data as an Avro file in Cloud Storage and publishes it to various Pub/Sub topics."""
55

66
import os
77
import flask
88
import pittgoogle
99
from google.cloud import logging, storage
1010
from google.cloud.exceptions import PreconditionFailed
11+
from typing import Any
1112

1213
# [FIXME] Make this helpful or else delete it.
1314
# Connect the python logger to the google cloud logger.
@@ -28,6 +29,51 @@
2829
# Variables for outgoing data
2930
HTTP_204 = 204 # HTTP code: Success
3031
HTTP_400 = 400 # HTTP code: Bad Request
32+
# Fields retained for a single DIA source record in the lite alert.
# Shared by the "diaSource" and "prvDiaSources" entries below so the two
# whitelists cannot drift apart.
_DIA_SOURCE_LITE_FIELDS = {
    "diaSourceId",
    "midpointMjdTai",
    "ra",
    "raErr",
    "dec",
    "decErr",
    "psfFlux",
    "psfFluxErr",
    "band",
}

# Per-section whitelist used to build the lite alert. Each entry maps a
# top-level alert key to the set of nested fields to keep ("fields") and
# whether the section is a list of records or a single record ("is_list").
LITE_FIELDS_CONFIG = {
    "diaSource": {
        "fields": _DIA_SOURCE_LITE_FIELDS,
        "is_list": False,
    },
    "prvDiaSources": {
        "fields": _DIA_SOURCE_LITE_FIELDS,
        "is_list": True,
    },
    "diaObject": {
        "fields": {
            "diaObjectId",
            "lastDiaSourceMjdTai",
            "firstDiaSourceMjdTai",
            "nDiaSources",
            "u_psfFluxErrMean",
            "g_psfFluxErrMean",
            "r_psfFluxErrMean",
            "i_psfFluxErrMean",
            "z_psfFluxErrMean",
            "y_psfFluxErrMean",
        },
        "is_list": False,
    },
}
3177

3278
# GCP resources used in this module
3379
TOPIC_ALERTS = pittgoogle.Topic.from_cloud(
@@ -88,12 +134,8 @@ def run():
88134
TOPIC_ALERTS.publish(alert)
89135
# publish the same alert as JSON. Data will be coerced to valid JSON by pittgoogle.
90136
TOPIC_ALERTS_JSON.publish(alert, serializer="json")
91-
# add top-level key for lite stream
92-
alert_lite = pittgoogle.Alert.from_dict(
93-
payload={"alert_lite": alert.dict},
94-
attributes={**alert.attributes},
95-
)
96-
TOPIC_LITE.publish(alert_lite, serializer="json")
137+
# publish a lite version of the alert as JSON
138+
TOPIC_LITE.publish(_create_lite_alert(alert), serializer="json")
97139

98140
return "", HTTP_204
99141

@@ -109,3 +151,34 @@ def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
109151
metadata["kafka.timestamp"] = alert.attributes["kafka.timestamp"]
110152

111153
return metadata
154+
155+
156+
def _create_lite_alert(alert: pittgoogle.Alert) -> pittgoogle.Alert:
    """Create a lite Alert by filtering nested fields from the original Alert dictionary.

    Cutout images are dropped, then each section named in
    ``LITE_FIELDS_CONFIG`` that is present in the alert is replaced by a copy
    containing only its whitelisted fields. Returns a new
    ``pittgoogle.Alert`` whose payload nests the filtered dict under the
    top-level "alert_lite" key and which carries the original attributes.
    """
    # NOTE(review): if drop_cutouts() returns the alert's own dict rather
    # than a copy, the in-place assignments below also mutate `alert.dict`
    # — confirm against pittgoogle's implementation.
    alert_lite_dict = alert.drop_cutouts()
    for key, config in LITE_FIELDS_CONFIG.items():
        # Replace the original nested object with its filtered version.
        # (Single lookup; no redundant .get() after the membership test.)
        if key in alert_lite_dict:
            alert_lite_dict[key] = _process_field(alert_lite_dict[key], config)

    return pittgoogle.Alert.from_dict(
        payload={"alert_lite": alert_lite_dict},
        attributes={**alert.attributes},
    )
169+
170+
171+
def _process_field(original_value: Any, config: dict) -> Any:
172+
"""Filters a dictionary or a list of dictionaries based on the provided configuration."""
173+
whitelisted_fields = config["fields"]
174+
175+
if config["is_list"]:
176+
return [_filter_dict(item, whitelisted_fields) for item in original_value or []]
177+
else:
178+
return _filter_dict(original_value, whitelisted_fields)
179+
180+
181+
def _filter_dict(alert_dict: dict, whitelisted_fields: set) -> dict:
182+
"""Creates a new dictionary containing only the keys specified in whitelisted_fields."""
183+
184+
return {k: v for k, v in (alert_dict or {}).items() if k in whitelisted_fields}

0 commit comments

Comments
 (0)