Skip to content

Commit 6ee6eec

Browse files
authored
Merge pull request #299 from mwvgroup/u/ch/lsst/main.py
Update LSST Cloud Run services
2 parents c83eebd + 26a1a80 commit 6ee6eec

File tree

11 files changed

+40
-9
lines changed

11 files changed

+40
-9
lines changed

broker/cloud_run/lsst/classify_snn/deploy.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,7 @@ else
9393
--push-auth-service-account="${runinvoker_svcact}" \
9494
--dead-letter-topic="${ps_deadletter_topic}" \
9595
--max-delivery-attempts=5
96+
gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
97+
--member="serviceAccount:${service_account}" \
98+
--role="roles/pubsub.subscriber"
9699
fi

broker/cloud_run/lsst/classify_snn/main.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,15 @@ def run():
9999

100100
def _classify(alert_lite: pittgoogle.Alert) -> dict:
101101
"""Classify the alert using SuperNNova."""
102+
103+
# check to see if the alert has a ssObjectId
104+
if alert_lite.attributes.get("ssSource_ssObjectId"):
105+
return {
106+
"prob_class0": None,
107+
"prob_class1": None,
108+
"predicted_class": None,
109+
}
110+
102111
# init
103112
snn_df = _format_for_classifier(alert_lite)
104113
device = "cpu"

broker/cloud_run/lsst/classify_snn/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ pandas==1.5.1
22
numpy==1.23.3
33
google-cloud-functions
44
google-cloud-logging
5-
pittgoogle-client>=0.3.17
5+
pittgoogle-client>=0.3.18
66

77
# for Cloud Run
88
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service

broker/cloud_run/lsst/classify_upsilon/deploy.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,7 @@ else
9595
--push-auth-service-account="${runinvoker_svcact}" \
9696
--dead-letter-topic="${ps_deadletter_topic}" \
9797
--max-delivery-attempts=5
98+
gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
99+
--member="serviceAccount:${service_account}" \
100+
--role="roles/pubsub.subscriber"
98101
fi

broker/cloud_run/lsst/classify_upsilon/main.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,20 @@ def run() -> tuple[str, int]:
5656
# extract the envelope from the request that triggered the endpoint
5757
# this contains a single Pub/Sub message with the alert to be processed
5858
envelope = flask.request.get_json()
59+
60+
# unpack the alert. raises a `BadRequest` if the envelope does not contain a valid message
5961
try:
6062
alert_lite = pittgoogle.Alert.from_cloud_run(envelope, "default")
6163
except pittgoogle.exceptions.BadRequest as exc:
6264
return str(exc), HTTP_400
6365

64-
alert_lite_df = _create_lite_dataframe(alert_lite.dict["alert_lite"])
65-
upsilon_dict = _classify_with_upsilon(alert_lite_df)
66+
# classify
67+
upsilon_dict = _classify(alert_lite)
6668
has_min_detections_in_any_band = any(
6769
upsilon_dict.get(f"n_data_points_{band}_band") >= 80 for band in SURVEY_BANDS
6870
)
71+
72+
# publish
6973
TOPIC.publish(
7074
pittgoogle.Alert.from_dict(
7175
{"alert_lite": alert_lite.dict["alert_lite"], "upsilon": upsilon_dict},
@@ -92,17 +96,21 @@ def run() -> tuple[str, int]:
9296
return "", HTTP_204
9397

9498

95-
def _classify_with_upsilon(alert_lite_df: pd.DataFrame) -> dict:
99+
def _classify(alert_lite: pittgoogle.Alert) -> dict:
96100
upsilon_dict = {}
101+
alert_lite_df = _create_lite_dataframe(alert_lite.dict["alert_lite"])
102+
is_ssobject = bool(alert_lite.attributes.get("ssSource_ssObjectId"))
103+
97104
for band in SURVEY_BANDS:
98105
# ---Extract data
99106
filter_diaSources = alert_lite_df[alert_lite_df["band"] == band]
100107
flux_gt_zero = filter_diaSources["psfFlux"].to_numpy() > 0
101108
upsilon_dict[f"n_data_points_{band}_band"] = flux_gt_zero.sum().item()
109+
# skip classification if the object has a ssObjectId
102110
# skip band if no detections or too few valid data points.
103111
# to avoid scipy's leastsq error: ("input vector length N=7 must not exceed output length M"), we require
104112
# that flux_gt_zero.sum() > 7
105-
if filter_diaSources.empty or flux_gt_zero.sum() <= 7:
113+
if is_ssobject or (filter_diaSources.empty or flux_gt_zero.sum() <= 7):
106114
upsilon_dict[f"{band}_label"] = None
107115
upsilon_dict[f"{band}_probability"] = None
108116
upsilon_dict[f"{band}_flag"] = None

broker/cloud_run/lsst/classify_upsilon/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# file (or packaged with the function) in the same directory as `main.py`
55

66
google-cloud-logging
7-
pittgoogle-client>=0.3.17
7+
pittgoogle-client>=0.3.18
88
upsilon
99
# required by upsilon
1010
# https://github.com/dwkim78/upsilon?tab=readme-ov-file#1-dependency

broker/cloud_run/lsst/ps_to_storage/deploy.sh

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ survey="${3:-lsst}"
1212
region="${4:-us-central1}"
1313
# get the environment variable
1414
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
15+
PROJECT_NUMBER=$(gcloud projects describe "$PROJECT_ID" --format="value(projectNumber)")
1516

1617
MODULE_NAME="alerts-to-storage" # lower case required by cloud run
1718
ROUTE_RUN="/" # url route that will trigger main.run()
@@ -37,7 +38,7 @@ ps_subscription_avro=$(define_GCP_resources "${survey}-alert_avros-counter")
3738
ps_topic_avro=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alert_avros")
3839
ps_trigger_topic=$(define_GCP_resources "${survey}-alerts_raw")
3940
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
40-
41+
service_account="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
4142
if [ "${teardown}" = "True" ]; then
4243
# ensure that we do not teardown production resources
4344
if [ "${testid}" != "False" ]; then
@@ -92,4 +93,7 @@ else
9293
--push-auth-service-account="${runinvoker_svcact}" \
9394
--dead-letter-topic="${ps_deadletter_topic}" \
9495
--max-delivery-attempts=5
96+
gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
97+
--member="serviceAccount:${service_account}" \
98+
--role="roles/pubsub.subscriber"
9599
fi

broker/cloud_run/lsst/ps_to_storage/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,6 @@ def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
106106
metadata["_".join(alert.get_key("sourceid"))] = alert.sourceid
107107
metadata["_".join(alert.get_key("ra"))] = alert.ra
108108
metadata["_".join(alert.get_key("dec"))] = alert.dec
109+
metadata["kafka.timestamp"] = alert.attributes["kafka.timestamp"]
109110

110111
return metadata

broker/cloud_run/lsst/ps_to_storage/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
google-cloud-logging
77
google-cloud-storage
8-
pittgoogle-client>=0.3.17
8+
pittgoogle-client>=0.3.18
99

1010
# for Cloud Run
1111
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service

broker/cloud_run/lsst/variability/deploy.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,7 @@ else
9292
--push-auth-service-account="${runinvoker_svcact}" \
9393
--dead-letter-topic="${ps_deadletter_topic}" \
9494
--max-delivery-attempts=5
95+
gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
96+
--member="serviceAccount:${service_account}" \
97+
--role="roles/pubsub.subscriber"
9598
fi

0 commit comments

Comments
 (0)