Ingest Swift/BAT-GUANO Alert Stream #277

base: develop
Changes from 21 commits
Dockerfile (new file):

```dockerfile
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
```
cloudbuild.yaml (new file):

```yaml
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
  # Build the image
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
  # Push the image to Artifact Registry
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
  # Deploy image to Cloud Run
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: gcloud
    args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
  - '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
  _SURVEY: 'swift'
  _TESTID: 'testid'
  _MODULE_NAME: '${_SURVEY}-alerts-to-storage-${_TESTID}'
  _MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}'
  _REPOSITORY: 'cloud-run-services'
  # Cloud Functions automatically sets the project id env var using the name "GCP_PROJECT";
  # use the same name here for consistency.
  # [TODO] PROJECT_ID is set in setup.sh. This is confusing and we should revisit the decision.
  # I (Raen) think I didn't make it a substitution because I didn't want to set a default for it.
  _ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID}'
  _REGION: 'us-central1'
options:
  dynamic_substitutions: true
```
deploy.sh (new file):

```bash
#! /bin/bash
# Deploys or deletes broker Cloud Run service.
# This script will not delete Cloud Run services that are in production.

# "False" uses production resources;
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tears down/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-swift}"
region="${4:-us-central1}"
# get the environment variable
PROJECT_ID=$GOOGLE_CLOUD_PROJECT

MODULE_NAME="alerts-to-storage"  # lower case required by Cloud Run
ROUTE_RUN="/"  # url route that will trigger main.run()

define_GCP_resources() {
    local base_name="$1"
    local testid_suffix=""

    if [ "$testid" != "False" ]; then
        testid_suffix="-${testid}"
    fi
    echo "${base_name}${testid_suffix}"
}

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}")  # lower case required by Cloud Run
gcs_avro_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts")
ps_input_subscrip=$(define_GCP_resources "${survey}-alerts_raw")  # Pub/Sub subscription used to trigger the Cloud Run module
ps_subscription_avro=$(define_GCP_resources "${survey}-alert_avros-counter")
ps_topic_avro=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alert_avros")
ps_trigger_topic=$(define_GCP_resources "${survey}-alerts_raw")
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"

if [ "${teardown}" = "True" ]; then
    # ensure that we do not tear down production resources
    if [ "${testid}" != "False" ]; then
        echo
        echo "Deleting resources for ${MODULE_NAME} module..."
        gsutil rm -r "gs://${gcs_avro_bucket}"
        gcloud pubsub topics delete "${ps_topic_avro}"
        gcloud pubsub subscriptions delete "${ps_subscription_avro}"
        gcloud pubsub subscriptions delete "${ps_input_subscrip}"
        gcloud run services delete "${cr_module_name}" --region "${region}"
    fi
else
    echo
    echo "Creating avro_bucket..."
    if ! gsutil ls -b "gs://${gcs_avro_bucket}" >/dev/null 2>&1; then
        #--- Create the bucket that will store the alerts
        gsutil mb -l "${region}" "gs://${gcs_avro_bucket}"
        gsutil uniformbucketlevelaccess set on "gs://${gcs_avro_bucket}"
        gsutil requesterpays set on "gs://${gcs_avro_bucket}"
        gcloud storage buckets add-iam-policy-binding "gs://${gcs_avro_bucket}" \
            --member="allUsers" \
            --role="roles/storage.objectViewer"
    else
        echo "${gcs_avro_bucket} already exists."
    fi

    #--- Set up the Pub/Sub notifications on the Avro storage bucket
    echo
    echo "Configuring Pub/Sub notifications on GCS bucket..."
    trigger_event=OBJECT_FINALIZE
    format=json  # json or none; if json, file metadata is sent in the message body
    gsutil notification create \
        -t "$ps_topic_avro" \
        -e "$trigger_event" \
        -f "$format" \
        "gs://${gcs_avro_bucket}"
    gcloud pubsub subscriptions create "${ps_subscription_avro}" --topic="${ps_topic_avro}"
```
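Because the notification above is created with `-f json`, messages on the counter subscription carry the event details as attributes and the full object metadata as a JSON body. The following is a minimal sketch of a consumer using the `google-cloud-pubsub` client; the project and subscription names are hypothetical stand-ins for the ones `define_GCP_resources` produces:

```python
import json

from google.cloud import pubsub_v1

PROJECT_ID = "my-project"  # hypothetical
SUBSCRIPTION = "swift-alert_avros-counter-mytestid"  # hypothetical

subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)

# Synchronously pull a few bucket-notification messages.
response = subscriber.pull(request={"subscription": sub_path, "max_messages": 5})
for received in response.received_messages:
    msg = received.message
    # GCS notifications put the event type and object id in the message attributes.
    print(msg.attributes["eventType"], msg.attributes["objectId"])
    # With -f json, the message body is the object's metadata as JSON.
    metadata = json.loads(msg.data.decode("utf-8"))
    print(metadata["bucket"], metadata["name"], metadata.get("size"))

# Acknowledge so the messages are not redelivered.
ack_ids = [r.ack_id for r in response.received_messages]
if ack_ids:
    subscriber.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})
```

The script then builds the container image and deploys the Cloud Run service: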
```bash
    #--- Deploy the Cloud Run service
    echo
    echo "Creating container image for ${MODULE_NAME} module and deploying to Cloud Run..."
    moduledir="."  # assumes deploying what's in our current directory
    config="${moduledir}/cloudbuild.yaml"
    url=$(gcloud builds submit --config="${config}" \
        --substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo}" \
        "${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')
    echo
    echo "Creating trigger subscription for ${MODULE_NAME} Cloud Run service..."
    # WARNING: This is set to retry failed deliveries. If there is a bug in main.py, this will
    # retry indefinitely, until the message is deleted manually.
    gcloud pubsub subscriptions create "${ps_input_subscrip}" \
        --topic "${ps_trigger_topic}" \
        --topic-project "${PROJECT_ID}" \
        --ack-deadline=600 \
        --push-endpoint="${url}${ROUTE_RUN}" \
        --push-auth-service-account="${runinvoker_svcact}"
fi
```
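On the retry warning above: if indefinite redelivery ever becomes a problem, a dead-letter policy on the trigger subscription would cap it. Below is a sketch following Google's documented update pattern for the `google-cloud-pubsub` client; every resource name here is hypothetical, and the dead-letter topic must already exist with the Pub/Sub service agent granted publish rights on it:

```python
from google.cloud import pubsub_v1
from google.protobuf import field_mask_pb2

PROJECT_ID = "my-project"  # hypothetical
SUBSCRIPTION = "swift-alerts_raw-mytestid"  # hypothetical trigger subscription
DEAD_LETTER_TOPIC = "swift-alerts_raw-deadletter"  # hypothetical; create it first

subscriber = pubsub_v1.SubscriberClient()
publisher = pubsub_v1.PublisherClient()
sub_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)
dlq_path = publisher.topic_path(PROJECT_ID, DEAD_LETTER_TOPIC)

# After 5 failed deliveries, forward the message to the dead-letter topic
# instead of retrying forever.
subscription = pubsub_v1.types.Subscription(
    name=sub_path,
    dead_letter_policy=pubsub_v1.types.DeadLetterPolicy(
        dead_letter_topic=dlq_path,
        max_delivery_attempts=5,
    ),
)
update_mask = field_mask_pb2.FieldMask(paths=["dead_letter_policy"])
subscriber.update_subscription(
    request={"subscription": subscription, "update_mask": update_mask}
)
```

The same policy can also be set at creation time with gcloud's `--dead-letter-topic` and `--max-delivery-attempts` flags.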
main.py (new file):

```python
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores Swift/BAT-GUANO alert data as a JSON file in Cloud Storage."""

import os
import flask
import pittgoogle
from google.cloud import logging, storage
from google.cloud.exceptions import PreconditionFailed

# [FIXME] Make this helpful or else delete it.
# Connect the python logger to the google cloud logger.
# By default, this captures INFO level and above.
# pittgoogle uses the python logger.
# We don't currently use the python logger directly in this script, but we could.
logging.Client().setup_logging()

PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")

# Variables for incoming data.
# A url route is used in setup.sh when the trigger subscription is created.
# It is possible to define multiple routes in a single module and trigger them
# using different subscriptions.
ROUTE_RUN = "/"  # HTTP route that will trigger run(). Must match deploy.sh.

# Variables for outgoing data
HTTP_204 = 204  # HTTP code: Success
HTTP_400 = 400  # HTTP code: Bad Request

# GCP resources used in this module
TOPIC_ALERTS_JSON = pittgoogle.Topic.from_cloud(
    "alerts-json", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)
```
Comment on lines 34 to 36 (troyraen, Collaborator): Do these alerts come from Swift as json or avro? If json, I think the name of this topic can just be […] If that seems confusing in comparison with our LSST streams where […]

Reply (Author, Collaborator): @troyraen the alerts from Swift are JSON serialized. I named the Pub/Sub resource that way because at the time it seemed more appropriate and descriptive, but I agree that having a […]
```python
bucket_name = f"{PROJECT_ID}-{SURVEY}_alerts"
if TESTID != "False":
    bucket_name = f"{bucket_name}-{TESTID}"

client = storage.Client()
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))

app = flask.Flask(__name__)


@app.route(ROUTE_RUN, methods=["POST"])
def run():
    # (remainder of the function is collapsed in this diff view)
```

Suggested change:

```diff
-def run():
+def run() -> tuple[str, int]:
```
Outdated comment, on the line `# let it raise. the message will be dropped.`:

This comment is a holdover from my original code but no longer makes sense because we've now implemented the try/except right here.
Outdated comment:

Let's figure out how to add the schema version. Otherwise it will be difficult to do things like figure out how the bucket organization maps to the dataset/table organization. The pipeline must know the version for things like naming the BigQuery table, so perhaps the easiest solution is to add it as an env var to this module.
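The body of `run()` is collapsed in this diff view. For orientation only, here is a minimal sketch of what a Cloud Run push handler along these lines could look like, built from the imports and constants visible above (and using the suggested `tuple[str, int]` return type). The envelope handling, blob naming, and idempotency logic are assumptions, not the PR's actual code:

```python
import base64
import json

# Sketch only: assumes the module-level names from the diff above
# (app, bucket, flask, ROUTE_RUN, HTTP_204, HTTP_400, PreconditionFailed).


@app.route(ROUTE_RUN, methods=["POST"])
def run() -> tuple[str, int]:
    """Store one Swift/BAT-GUANO alert as a JSON file in Cloud Storage."""
    # Pub/Sub push deliveries wrap the message in a JSON envelope.
    envelope = flask.request.get_json(silent=True)
    if not envelope or "message" not in envelope:
        return "Bad Request: invalid Pub/Sub envelope", HTTP_400

    alert_bytes = base64.b64decode(envelope["message"]["data"])

    # Hypothetical blob naming; the real scheme (and where the schema version
    # from the review comment above would fit) is not visible in this diff.
    blob = bucket.blob(f"{envelope['message']['messageId']}.json")
    try:
        # if_generation_match=0 fails if the object already exists, so a
        # redelivered message cannot overwrite a previously stored alert.
        blob.upload_from_string(alert_bytes, if_generation_match=0)
    except PreconditionFailed:
        pass  # already stored; fall through and ack the message

    # Publishing the alert to TOPIC_ALERTS_JSON would presumably happen here too.
    return "", HTTP_204
```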
requirements.txt (new file):

```text
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
google-cloud-storage
pittgoogle-client>=0.3.15

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
# pinned following quickstart example. [TODO] consider un-pinning
Flask==3.0.3
gunicorn==23.0.0
Werkzeug==3.0.6
```
Suggested change:

```diff
 # for Cloud Run
 # https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
-# pinned following quickstart example. [TODO] consider un-pinning
-Flask==3.0.3
-gunicorn==23.0.0
-Werkzeug==3.0.6
+Flask
+gunicorn
+Werkzeug
```
Modified: VM creation script (file name not shown in this view):

```diff
@@ -2,15 +2,17 @@
 # Creates or deletes the GCP VM instances needed by the broker.
 # This script will not delete VMs that are in production

-broker_bucket=$1 # name of GCS bucket where broker files are staged
+# name of GCS bucket where broker files are staged
+gcs_broker_bucket=$1
 # "False" uses production resources
 # any other string will be appended to the names of all resources
 testid="${2:-test}"
-teardown="${3:-False}" # "True" tearsdown/deletes resources, else setup
-survey="${4:-swift}"
+# "True" tears down/deletes resources, else setup
+teardown="${3:-False}"
+# name of the survey this broker instance will ingest
+survey="${4:-swift}"
 zone="${5:-us-central1-a}"
 project_id="${6:-PROJECT_ID}"

 #--- GCP resources used in this script
 consumerVM="${survey}-consumer"
@@ -25,19 +27,22 @@ if [ "$teardown" = "True" ]; then
     if [ "$testid" != "False" ]; then
         gcloud compute instances delete "$consumerVM" --zone="$zone"
     fi

-#--- Create resources
+#--- Setup resources if they do not exist
 else
     #--- Consumer VM
-    # create VM
-    machinetype=e2-custom-1-5632
-    # metadata
-    googlelogging="google-logging-enabled=true"
-    startupscript="startup-script-url=gs://${broker_bucket}/${survey}/vm_install.sh"
-    shutdownscript="shutdown-script-url=gs://${broker_bucket}/${survey}/vm_shutdown.sh"
-    gcloud compute instances create "$consumerVM" \
-        --zone="$zone" \
-        --machine-type="$machinetype" \
-        --scopes=cloud-platform \
-        --metadata="${googlelogging},${startupscript},${shutdownscript}"
+    if ! gcloud compute instances describe "${consumerVM}" --zone="${zone}" --project="${project_id}" >/dev/null 2>&1; then
+        machinetype=e2-custom-1-5632
```
Comment on the `machinetype` line (Collaborator): Do the installs actually succeed with a machine type this small? I recall needing a bigger machine for the setup and then making the machine type smaller for normal operations.

Reply (Author, Collaborator): It worked when I tested it!
```diff
+        # metadata
+        googlelogging="google-logging-enabled=true"
+        startupscript="startup-script-url=gs://${gcs_broker_bucket}/${survey}/vm_install.sh"
+        shutdownscript="shutdown-script-url=gs://${gcs_broker_bucket}/${survey}/vm_shutdown.sh"
+        #--- Create VM
+        gcloud compute instances create "$consumerVM" \
+            --zone="$zone" \
+            --machine-type="$machinetype" \
+            --scopes=cloud-platform \
+            --metadata="${googlelogging},${startupscript},${shutdownscript}"
+    else
+        echo
+        echo "VM instance ${consumerVM} already exists in zone ${zone}."
+    fi
 fi
```
Review comment:

There are a bunch of variables in here with names that include "avro" -- I'm sure this is a holdover from the ZTF and LSST modules. For reusability, we should change those names because not all surveys publish avro alerts. For Swift in particular, assuming they publish in json and not avro, having "avro" here is confusing.