Commit

changes required to successfully deploy Cloud Run service
hernandezc1 committed Mar 10, 2025
1 parent 440264a commit 2429668
Showing 5 changed files with 45 additions and 48 deletions.
2 changes: 1 addition & 1 deletion broker/cloud_run/lsst/classify_snn/Dockerfile
@@ -1,6 +1,6 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim
FROM python:3.11-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True
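
As an aside (not from the commit itself): a quick local check that the image still builds after the base-image change to python:3.11-slim. This is a sketch run from the repository root; the production image is built by Cloud Build via cloudbuild.yaml, so this is only a sanity check, and the tag name is arbitrary.
# sanity-check the build locally (sketch)
docker build -t classify-snn:dev broker/cloud_run/lsst/classify_snn
# optionally run it; the service may still need model files and credentials at runtime
docker run --rm -p 8080:8080 -e PORT=8080 classify-snn:dev
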
42 changes: 32 additions & 10 deletions broker/cloud_run/lsst/classify_snn/deploy.sh
100644 → 100755
@@ -1,6 +1,6 @@
#! /bin/bash
# Deploys or deletes broker Cloud Functions
# This script will not delete Cloud Functions that are in production
# Deploys or deletes broker Cloud Run service
# This script will not delete a Cloud Run service that is in production

# "False" uses production resources
# any other string will be appended to the names of all resources
@@ -10,7 +10,9 @@ teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-lsst}"
region="${4:-us-central1}"
PROJECT_ID=$GOOGLE_CLOUD_PROJECT # get the environment variable
# get the project ID from the environment and look up the project number
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
PROJECT_NUMBER=$(gcloud projects describe "$PROJECT_ID" --format="value(projectNumber)")

MODULE_NAME="supernnova" # lower case required by cloud run
ROUTE_RUN="/" # url route that will trigger main.run()
@@ -32,20 +34,19 @@ define_GCP_resources() {
}

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "cloud-run-services")
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
deadletter_topic_bigquery_import=$(define_GCP_resources "${survey}-bigquery-import-SuperNNova-deadletter")
deadletter_subscription_bigquery_import="${deadletter_topic_bigquery_import}"
ps_input_subscrip=$(define_GCP_resources "${survey}-alerts") # pub/sub subscription used to trigger cloud run module
# should we keep ps_output_topic?
ps_output_topic="${survey}-SuperNNova" # desc is using this. leave camel case to avoid a breaking change
ps_output_topic=$(define_GCP_resources "${survey}-SuperNNova")
subscription_bigquery_import=$(define_GCP_resources "${survey}-bigquery-import-SuperNNova") # BigQuery subscription
topic_bigquery_import=$(define_GCP_resources "${survey}-bigquery-import-SuperNNova")
trigger_topic=$(define_GCP_resources "${survey}-alerts")
deadletter_topic_bigquery_import=$(define_GCP_resources "${survey}-bigquery-import-SuperNNova-deadletter")
deadletter_subscription_bigquery_import="${deadletter_topic_bigquery_import}"

# additional GCP resources & variables used in this script
bq_dataset=$(define_GCP_resources "${survey}")
supernnova_classifications_table="SuperNNova"
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by cloud run
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by Cloud Run
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"


@@ -54,8 +55,10 @@ if [ "${teardown}" = "True" ]; then
if [ "${testid}" != "False" ]; then
gcloud pubsub topics delete "${ps_output_topic}"
gcloud pubsub topics delete "${topic_bigquery_import}"
gcloud pubsub topics delete "${deadletter_topic_bigquery_import}"
gcloud pubsub subscriptions delete "${ps_input_subscrip}"
gcloud pubsub subscriptions delete "${subscription_bigquery_import}"
gcloud pubsub subscriptions delete "${deadletter_subscription_bigquery_import}"
gcloud run services delete "${cr_module_name}" --region "${region}"
fi

@@ -67,6 +70,15 @@ else # Deploy the Cloud Run service
gcloud pubsub topics create "${topic_bigquery_import}"
gcloud pubsub topics create "${deadletter_topic_bigquery_import}"
gcloud pubsub subscriptions create "${deadletter_subscription_bigquery_import}" --topic="${deadletter_topic_bigquery_import}"
# in order to create BigQuery subscriptions, ensure that the following service account:
# service-<project number>@gcp-sa-pubsub.iam.gserviceaccount.com has the
# bigquery.dataEditor role for each table
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
roleid="roles/bigquery.dataEditor"
bq add-iam-policy-binding \
--member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
--role="${roleid}" \
--table=true "${PROJECT_ID}:${bq_dataset}.${supernnova_classifications_table}"
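
A read-only way (not part of the commit) to confirm the grant landed; the --table flag here mirrors the add-iam-policy-binding call above and should be checked against your bq CLI version.
# verify the Pub/Sub service agent holds bigquery.dataEditor on the table (sketch)
bq get-iam-policy --table=true --format=prettyjson "${PROJECT_ID}:${bq_dataset}.${supernnova_classifications_table}"
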
gcloud pubsub subscriptions create "${subscription_bigquery_import}" \
--topic="${topic_bigquery_import}" \
--bigquery-table="${PROJECT_ID}:${bq_dataset}.${supernnova_classifications_table}" \
@@ -75,11 +87,21 @@ else # Deploy the Cloud Run service
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}"
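
A hedged follow-up, not in the diff, to confirm the BigQuery routing and dead-letter settings were applied; the field names are assumed to follow the Pub/Sub API (bigqueryConfig, deadLetterPolicy).
# read back the subscription's delivery settings (sketch, read-only)
gcloud pubsub subscriptions describe "${subscription_bigquery_import}" --format="yaml(bigqueryConfig,deadLetterPolicy)"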

# grant the Pub/Sub service account permission to publish dead-lettered messages to the dead letter topic
# (pubsub.publisher) and to forward/acknowledge them from the BigQuery subscription (pubsub.subscriber)
gcloud pubsub topics add-iam-policy-binding "${deadletter_topic_bigquery_import}" \
--member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT" \
--role="roles/pubsub.publisher"
gcloud pubsub subscriptions add-iam-policy-binding "${subscription_bigquery_import}" \
--member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT" \
--role="roles/pubsub.subscriber"

echo "Creating container image and deploying to Cloud Run..."
moduledir="." # assumes deploying what's in our current directory
moduledir="." # deploys what's in our current directory
config="${moduledir}/cloudbuild.yaml"
url=$(gcloud builds submit --config="${config}" \
--substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo}" \
--region="${region}" \
"${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')

# ensure the Cloud Run service has the necessary permissions
23 changes: 7 additions & 16 deletions broker/cloud_run/lsst/classify_snn/main.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Classify an alert using SuperNNova (M¨oller & de Boissi`ere 2019).
"""Classify an alert using SuperNNova (Möller & de Boissière 2019).
This code is intended to be containerized and deployed to Google Cloud Run.
Once deployed, individual alerts in the "trigger" stream will be delivered to the container as HTTP requests.
@@ -57,7 +57,6 @@
# Variables for outgoing data
HTTP_204 = 204 # HTTP code: Success
HTTP_400 = 400 # HTTP code: Bad Request
SCHEMA_OUT = "elasticc.v0_9_1.brokerClassification" # View the schema: pittgoogle.Schemas.get(SCHEMA_OUT).avsc

# define a binary data structure for packing and unpacking bytes
_ConfluentWireFormatHeader = struct.Struct(">bi")
@@ -105,9 +104,9 @@ def run():
return str(exc), HTTP_400

# publish
classification_alert = _create_outgoing_alert(alert, classifications)
TOPIC.publish(classification_alert)
TOPIC_BIGQUERY_IMPORT.publish(classification_alert)
classified_alert = _create_outgoing_alert(alert, classifications)
TOPIC.publish(classified_alert)
TOPIC_BIGQUERY_IMPORT.publish(classifications)

return "", HTTP_204

@@ -184,30 +183,22 @@ def _format_for_classifier(alert: pittgoogle.Alert) -> pd.DataFrame:

def _create_outgoing_alert(alert_in: pittgoogle.Alert, results: dict) -> pittgoogle.Alert:
"""Combine the incoming alert with the classification results to create the outgoing alert."""
# write down the mappings between our classifications and the ELAsTiCC taxonomy
# https://github.com/LSSTDESC/elasticc/blob/main/taxonomy/taxonomy.ipynb
classifications = [
{"classId": 2222, "probability": results["prob_class0"]},
]

# construct a dict that conforms to SCHEMA_OUT
outgoing_dict = {
"alertId": alert_in.alertid,
"diaSourceId": alert_in.sourceid,
# multiply by 1000 to switch microsecond -> millisecond precision for elasticc schema
# multiply by 1000 to switch microsecond -> millisecond precision
"LSSTPublishTimestamp": int(results["LSSTPublishTimestamp"] * 1000),
"brokerIngestTimestamp": results["brokerIngestTimestamp"],
"brokerName": BROKER_NAME,
"brokerVersion": results["brokerVersion"],
"classifierName": CLASSIFIER_NAME,
"classifierParams": str(MODEL_PATH), # record the training file
"classifications": classifications,
"probability": results["prob_class0"],
}

# create the outgoing Alert
alert_out = pittgoogle.Alert.from_dict(
payload=outgoing_dict, attributes=alert_in.attributes, schema_name=SCHEMA_OUT
)
alert_out = pittgoogle.Alert.from_dict(payload=outgoing_dict, attributes=alert_in.attributes)
# add the predicted class to the attributes. may help downstream users filter messages.
alert_out.attributes[MODULE_NAME] = results["predicted_class"]

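For downstream users, a hypothetical example of filtering on that attribute (the subscription name and class value are placeholders, and the topic name assumes the production lsst deployment of ps_output_topic):
# deliver only alerts whose SuperNNova predicted class is "0" (sketch; names hypothetical)
gcloud pubsub subscriptions create my-snn-class0 --topic="lsst-SuperNNova" --message-filter='attributes.supernnova = "0"'
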
4 changes: 3 additions & 1 deletion broker/cloud_run/lsst/classify_snn/requirements.txt
@@ -1,6 +1,8 @@
google-cloud-functions
google-cloud-logging
pittgoogle-client>=0.3.1
pittgoogle-client>=0.3.12
confluent-kafka==2.6.0
fastavro

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
22 changes: 2 additions & 20 deletions broker/setup_broker/lsst/setup_broker.sh
@@ -64,10 +64,6 @@ topic_bigquery_import=$(define_GCP_resources "${survey}-bigquery-import")
subscription_bigquery_import=$(define_GCP_resources "${survey}-bigquery-import-${versiontag}") # BigQuery subscription
deadletter_topic_bigquery_import=$(define_GCP_resources "${survey}-bigquery-import-deadletter-${versiontag}")
deadletter_subscription_bigquery_import="${deadletter_topic_bigquery_import}"
topic_bigquery_import_supernnova=$(define_GCP_resources "${survey}-bigquery-import-supernnova")
subscription_bigquery_import_supernnova="${topic_bigquery_import_supernnova}"
deadletter_topic_bigquery_import_supernnova=$(define_GCP_resources "${survey}-bigquery-import-supernnova-deadletter")
deadletter_subscription_bigquery_import_supernnova="${deadletter_topic_bigquery_import_supernnova}"

alerts_table="alerts_${versiontag}"
supernnova_classifications_table="SuperNNova"
@@ -113,11 +109,8 @@ manage_resources() {
gcloud pubsub topics create "${topic_alerts}"
gcloud pubsub topics create "${topic_bigquery_import}"
gcloud pubsub topics create "${deadletter_topic_bigquery_import}"
gcloud pubsub topics create "${topic_bigquery_import_supernnova}"
gcloud pubsub topics create "${deadletter_topic_bigquery_import_supernnova}"
gcloud pubsub subscriptions create "${subscription_reservoir}" --topic="${topic_alerts}"
gcloud pubsub subscriptions create "${deadletter_subscription_bigquery_import}" --topic="${deadletter_topic_bigquery_import}"
gcloud pubsub subscriptions create "${deadletter_subscription_bigquery_import_supernnova}" --topic="${deadletter_topic_bigquery_import_supernnova}"
gcloud pubsub subscriptions create "${subscription_reservoir}" \
--topic="${topic_alerts}"

# in order to create BigQuery subscriptions, ensure that the following service account:
# service-<project number>@gcp-sa-pubsub.iam.gserviceaccount.com" has the
@@ -136,13 +129,6 @@ manage_resources() {
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}" \
--message-filter='attributes.schema_version = "'"${versiontag}"'"'
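
Not part of the commit: in a testid deployment, the schema_version filter above can be exercised by publishing throwaway messages with and without the attribute; only the first should reach the filtered subscription (and, since the empty payload is unlikely to match the table schema, it may end up on the dead letter topic, which also exercises that path).
# matches the filter and is delivered to the BigQuery subscription (sketch; test deployments only)
gcloud pubsub topics publish "${topic_bigquery_import}" --message='{}' --attribute=schema_version="${versiontag}"
# no schema_version attribute, so the filtered subscription drops it
gcloud pubsub topics publish "${topic_bigquery_import}" --message='{}'
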
gcloud pubsub subscriptions create "${subscription_bigquery_import_supernnova}" \
--topic="${topic_bigquery_import_supernnova}" \
--bigquery-table="${PROJECT_ID}:${bq_dataset}.${supernnova_classifications_table}" \
--use-table-schema \
--dead-letter-topic="${deadletter_topic_bigquery_import_supernnova}" \
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}" \

# set IAM policies on resources
if [ "$testid" = "False" ]; then
@@ -175,13 +161,9 @@ manage_resources() {
gcloud pubsub topics delete "${topic_alerts}"
gcloud pubsub topics delete "${topic_bigquery_import}"
gcloud pubsub topics delete "${deadletter_topic_bigquery_import}"
gcloud pubsub topics delete "${topic_bigquery_import_supernnova}"
gcloud pubsub topics delete "${deadletter_topic_bigquery_import_supernnova}"
gcloud pubsub subscriptions delete "${subscription_reservoir}"
gcloud pubsub subscriptions delete "${deadletter_subscription_bigquery_import}"
gcloud pubsub subscriptions delete "${subscription_bigquery_import}"
gcloud pubsub subscriptions delete "${deadletter_subscription_bigquery_import_supernnova}"
gcloud pubsub subscriptions delete "${subscription_bigquery_import_supernnova}"
gcloud artifacts repositories delete "${artifact_registry_repo}" --location="${region}"
else
echo 'ERROR: No testid supplied.'
