# Ingest Swift/BAT-GUANO Alert Stream #277
base: `develop`
**Dockerfile** (new file):

```dockerfile
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs.
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to equal the number of cores available.
# Timeout is set to 0 to disable worker timeouts and let Cloud Run handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
```
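A quick way to sanity-check this image before wiring it into Cloud Build is to build and run it locally. A hedged example (the tag name is arbitrary, and `PORT` is supplied manually because Cloud Run normally injects it):

```bash
# Build and smoke-test the container locally (illustrative tag name).
docker build -t swift-alerts-to-storage .
docker run --rm -e PORT=8080 -p 8080:8080 swift-alerts-to-storage
# In another shell: POST a test Pub/Sub envelope to localhost:8080/ with curl.
```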
**cloudbuild.yaml** (new file):

```yaml
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
  # Build the image
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
  # Push the image to Artifact Registry
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
  # Deploy the image to Cloud Run
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: gcloud
    args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
  - '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
  _SURVEY: 'swift'
  _TESTID: 'testid'
  _MODULE_NAME: '${_SURVEY}-alerts-to-storage-${_TESTID}'
  _MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}'
  _REPOSITORY: 'cloud-run-services'
  # Cloud Functions automatically sets the project id env var using the name "GCP_PROJECT";
  # use the same name here for consistency.
  # [TODO] PROJECT_ID is set in setup.sh. This is confusing and we should revisit the decision.
  # I (Raen) think I didn't make it a substitution because I didn't want to set a default for it.
  _ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID}'
  _REGION: 'us-central1'
options:
  dynamic_substitutions: true
```
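For reference, this config can also be submitted by hand with substitution overrides; the deploy script in the next file wraps essentially this call (the `_SURVEY`/`_TESTID` values here are illustrative):

```bash
# Manual build-and-deploy, assuming gcloud is authenticated to the right project.
gcloud builds submit --config=cloudbuild.yaml \
    --substitutions="_SURVEY=swift,_TESTID=mytest" \
    .
```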
**deploy.sh** (new file; `main.py` refers to it by this name):

```bash
#! /bin/bash
# Deploys or deletes the broker Cloud Run service.
# This script will not delete Cloud Run services that are in production.

# "False" uses production resources;
# any other string will be appended to the names of all resources.
testid="${1:-test}"
# "True" tears down/deletes resources; anything else runs the setup.
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-swift}"
region="${4:-us-central1}"
PROJECT_ID=$GOOGLE_CLOUD_PROJECT  # get the environment variable

MODULE_NAME="alerts-to-storage"  # lower case required by Cloud Run
ROUTE_RUN="/"  # url route that will trigger main.run()

# function used to define GCP resources; appends testid if needed
define_GCP_resources() {
    local base_name="$1"
    local testid_suffix=""

    if [ "$testid" != "False" ]; then
        testid_suffix="-${testid}"
    fi

    echo "${base_name}${testid_suffix}"
}

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
avro_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts")
avro_topic=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alert_avros")
avro_subscription=$(define_GCP_resources "${survey}-alert_avros-counter")
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}")  # lower case required by Cloud Run
ps_input_subscrip=$(define_GCP_resources "${survey}-alerts_raw")  # Pub/Sub subscription used to trigger the Cloud Run module
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
trigger_topic=$(define_GCP_resources "${survey}-alerts_raw")

if [ "${teardown}" = "True" ]; then
    # ensure that we do not tear down production resources
    if [ "${testid}" != "False" ]; then
        gsutil rm -r "gs://${avro_bucket}"
        gcloud pubsub topics delete "${avro_topic}"
        gcloud pubsub subscriptions delete "${avro_subscription}"
        gcloud pubsub subscriptions delete "${ps_input_subscrip}"
        gcloud run services delete "${cr_module_name}" --region "${region}"
    fi
else
    echo
    echo "Creating avro_bucket..."
    if ! gsutil ls -b "gs://${avro_bucket}" >/dev/null 2>&1; then
        #--- Create the bucket that will store the alerts
        gsutil mb -l "${region}" "gs://${avro_bucket}"
        gsutil ubla set on "gs://${avro_bucket}"
        gsutil requesterpays set on "gs://${avro_bucket}"
        gcloud storage buckets add-iam-policy-binding "gs://${avro_bucket}" \
            --member="allUsers" \
            --role="roles/storage.objectViewer"
    else
        echo "${avro_bucket} already exists."
    fi

    #--- Set up the Pub/Sub notifications on the Avro storage bucket
    echo
    echo "Configuring Pub/Sub notifications on GCS bucket..."
    trigger_event=OBJECT_FINALIZE
    format=json  # json or none; if json, file metadata is sent in the message body
    gsutil notification create \
        -t "$avro_topic" \
        -e "$trigger_event" \
        -f "$format" \
        "gs://${avro_bucket}"
    gcloud pubsub subscriptions create "${avro_subscription}" --topic="${avro_topic}"

    #--- Deploy the Cloud Run service
    echo "Creating container image and deploying to Cloud Run..."
    moduledir="."  # assumes deploying what's in the current directory
    config="${moduledir}/cloudbuild.yaml"
    url=$(gcloud builds submit --config="${config}" \
        --substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo}" \
        "${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')

    echo "Creating trigger subscription for Cloud Run..."
    # WARNING: This subscription is set to retry failed deliveries. If there is a bug
    # in main.py, it will retry indefinitely until the message is deleted manually.
    gcloud pubsub subscriptions create "${ps_input_subscrip}" \
        --topic "${trigger_topic}" \
        --topic-project "${PROJECT_ID}" \
        --ack-deadline=600 \
        --push-endpoint="${url}${ROUTE_RUN}" \
        --push-auth-service-account="${runinvoker_svcact}"
fi
```
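If the retry-forever behavior flagged in the WARNING ever becomes a problem, one option would be a dead-letter topic on the trigger subscription. A hedged sketch, not part of this PR (the dead-letter topic name is hypothetical, and the Pub/Sub service account would need publisher/subscriber permissions on it):

```bash
# Hypothetical: cap redelivery attempts instead of retrying indefinitely.
gcloud pubsub topics create "${survey}-alerts_raw-deadletter"
gcloud pubsub subscriptions update "${ps_input_subscrip}" \
    --dead-letter-topic="${survey}-alerts_raw-deadletter" \
    --max-delivery-attempts=5
```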
**main.py** (new file; the diff excerpt ends at the `run()` signature):

```python
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores Swift/BAT-GUANO alert data as an Avro file in Cloud Storage."""

import os
import flask
import pittgoogle
from google.cloud import logging, storage
from google.cloud.exceptions import PreconditionFailed

# [FIXME] Make this helpful or else delete it.
# Connect the python logger to the google cloud logger.
# By default, this captures INFO level and above.
# pittgoogle uses the python logger.
# We don't currently use the python logger directly in this script, but we could.
logging.Client().setup_logging()

PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")

# Variables for incoming data
# A url route is used in setup.sh when the trigger subscription is created.
# It is possible to define multiple routes in a single module and trigger them using different subscriptions.
ROUTE_RUN = "/"  # HTTP route that will trigger run(). Must match deploy.sh

# Variables for outgoing data
HTTP_204 = 204  # HTTP code: Success
HTTP_400 = 400  # HTTP code: Bad Request

# GCP resources used in this module
TOPIC_ALERTS_JSON = pittgoogle.Topic.from_cloud(
    "alerts-json", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)

bucket_name = f"{PROJECT_ID}-{SURVEY}_alerts"
if TESTID != "False":
    bucket_name = f"{bucket_name}-{TESTID}"

client = storage.Client()
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))

app = flask.Flask(__name__)


@app.route(ROUTE_RUN, methods=["POST"])
def run():
    ...  # remainder of the function is not shown in this excerpt
```

Review thread on the `TOPIC_ALERTS_JSON` definition (diff lines 34 to 36):

> **Collaborator:** Do these alerts come from Swift as json or avro? If json, I think the name of this topic can just be […]. If that seems confusing in comparison with our LSST streams where […]
>
> **Author:** @troyraen the alerts from Swift are JSON serialized. I named the Pub/Sub resource that way because at the time it seemed more appropriate and descriptive, but I agree that having a […]
Two further review notes on `run()`:

> **Suggested change** (marked outdated): `def run():` becomes `def run() -> tuple[str, int]:`
>
> **Author**, on the inline comment `# let it raise. the message will be dropped.`: This comment is a holdover from my original code but no longer makes sense because we've now implemented the try/except right here.
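The body of `run()` is not included in this excerpt. For orientation only, here is a minimal sketch of a Cloud Run push-endpoint handler consistent with the module's globals (`bucket`, `HTTP_204`, `HTTP_400`, `PreconditionFailed`); the envelope parsing and object naming are assumptions, not the PR's actual implementation:

```python
import base64  # flask, bucket, HTTP_204, HTTP_400, PreconditionFailed come from the module above


@app.route(ROUTE_RUN, methods=["POST"])
def run() -> tuple[str, int]:
    """Store one alert in Cloud Storage. Sketch only; field and object names are assumptions."""
    envelope = flask.request.get_json(silent=True)
    if envelope is None or "message" not in envelope:
        # Returning 400 tells Pub/Sub the delivery failed; see the retry WARNING in deploy.sh.
        return "Bad Request: invalid Pub/Sub message format", HTTP_400

    # Pub/Sub push delivers the payload base64-encoded under message.data.
    alert_bytes = base64.b64decode(envelope["message"]["data"])

    # Hypothetical object name; the real scheme presumably uses alert metadata.
    blob = bucket.blob(f"{envelope['message']['messageId']}.json")
    try:
        # if_generation_match=0 fails if the object already exists, which is
        # presumably why this module imports PreconditionFailed.
        blob.upload_from_string(alert_bytes, if_generation_match=0)
    except PreconditionFailed:
        pass  # duplicate delivery; the alert is already in storage

    return "", HTTP_204
```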
**requirements.txt** (new file):

```text
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
google-cloud-storage
pittgoogle-client>=0.3.15

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
# pinned following quickstart example. [TODO] consider un-pinning
Flask==3.0.3
gunicorn==23.0.0
Werkzeug==3.0.6
```
||||||||||||||||||||||||
| # for Cloud Run | |
| # https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service | |
| # pinned following quickstart example. [TODO] consider un-pinning | |
| Flask==3.0.3 | |
| gunicorn==23.0.0 | |
| Werkzeug==3.0.6 | |
| # for Cloud Run | |
| # https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service | |
| Flask | |
| gunicorn | |
| Werkzeug |
**Consumer VM README** (new file):

# Start the Swift/BAT-GUANO consumer VM

See `broker/setup_broker/swift/README.md` for setup instructions.

To start the consumer VM:

```bash
survey="swift"
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"

# Set the VM metadata
KAFKA_TOPIC="enter Kafka topic"
PS_TOPIC="${survey}-alerts-${testid}"
gcloud compute instances add-metadata ${consumerVM} --zone=${zone} \
    --metadata KAFKA_TOPIC=${KAFKA_TOPIC},PS_TOPIC=${PS_TOPIC}

# Start the VM. This launches the startup script, which configures
# and starts the Kafka -> Pub/Sub connector.
gcloud compute instances start ${consumerVM} --zone ${zone}
```
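The startup script itself is not part of this diff. Presumably it reads these values back from the GCE metadata server, which from inside the VM looks something like this (illustrative, not the PR's actual script):

```bash
# Standard pattern for reading custom instance metadata from within a GCE VM.
KAFKA_TOPIC=$(curl -s -H "Metadata-Flavor: Google" \
    "http://metadata.google.internal/computeMetadata/v1/instance/attributes/KAFKA_TOPIC")
PS_TOPIC=$(curl -s -H "Metadata-Flavor: Google" \
    "http://metadata.google.internal/computeMetadata/v1/instance/attributes/PS_TOPIC")
```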
To stop the consumer VM:

```bash
survey="swift"
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"

# Stop the VM
gcloud compute instances stop ${consumerVM} --zone ${zone}
```
**Kafka admin client properties** (new file):

```properties
# Kafka Admin client configs
# This file is part of a workflow that creates an authenticated connection to the Kafka broker.
# In cases where we can connect without authentication (e.g., ZTF), this file is not used.
# For config options, see https://kafka.apache.org/documentation/#adminclientconfigs
# For Swift-specific options, see https://gcn.nasa.gov/docs/client#java

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://auth.gcn.nasa.gov/oauth2/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="CLIENT_ID" \
  clientSecret="CLIENT_SECRET";
```
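`CLIENT_ID` and `CLIENT_SECRET` read as placeholders to be filled in at deploy time. How that happens is not shown in this diff; a hypothetical substitution step (the variable and file names here are assumptions) might look like:

```bash
# Hypothetical: inject GCN credentials into the properties file at deploy time.
sed -i "s|CLIENT_ID|${gcn_client_id}|g; s|CLIENT_SECRET|${gcn_client_secret}|g" \
    admin.properties
```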
**Pub/Sub sink connector properties** (new file):

```properties
# Kafka Connect sink connector configs
# For config options, see https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html
# For additional Pub/Sub-specific options, see https://github.com/googleapis/java-pubsub-group-kafka-connector?tab=readme-ov-file#sink-connector
#
# --------------------------------------------------------------------------
# This file is adapted from:
# https://github.com/googleapis/java-pubsub-group-kafka-connector/blob/main/config/cps-sink-connector.properties
# The original copyright and license are reproduced below.
#
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------

# Unique name for the Pub/Sub sink connector.
name=CPSSinkConnector
# The Java class for the Pub/Sub sink connector.
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
# The maximum number of tasks that should be created for this connector.
tasks.max=1

# Set the key converter for the Pub/Sub sink connector.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Set the value converter for the Pub/Sub sink connector.
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Set the Kafka topic.
topics=KAFKA_TOPIC
# Set the Pub/Sub configs.
cps.project=PROJECT_ID
cps.topic=PS_TOPIC
# Include Kafka topic, partition, offset, and timestamp as message attributes.
metadata.publish=true
```

Review thread on `tasks.max=1`:

> **Collaborator:** What is the alert rate you expect from this stream? I'm guessing that one task is probably fine, but it's worth checking.
>
> **Collaborator:** Now I see in the data listings PR, "less than 1 alert per week". So 1 task will be fine.
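With `metadata.publish=true`, the connector attaches the Kafka provenance as Pub/Sub message attributes (named `kafka.topic`, `kafka.partition`, `kafka.offset`, and `kafka.timestamp` per the connector's README, though worth verifying). A sketch of inspecting them on a pulled message (the subscription name is hypothetical):

```python
# Sketch: read the Kafka provenance attributes published by the sink connector.
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path("PROJECT_ID", "swift-alerts-mytest")  # hypothetical

response = subscriber.pull(subscription=sub_path, max_messages=1)
for received in response.received_messages:
    attrs = received.message.attributes
    print(attrs.get("kafka.topic"), attrs.get("kafka.offset"))
    subscriber.acknowledge(subscription=sub_path, ack_ids=[received.ack_id])
```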
**Kafka Connect worker properties** (new file):

```properties
# Kafka Connect worker configuration
# This file is part of a workflow that creates an authenticated connection to the Kafka broker.
# For config options, see https://docs.confluent.io/platform/current/connect/references/allconfigs.html#worker-configuration-properties
# See also: https://kafka.apache.org/documentation/#adminclientconfigs

bootstrap.servers=kafka.gcn.nasa.gov:9092
plugin.path=/usr/local/share/kafka/plugins
offset.storage.file.filename=/tmp/connect.offsets
connections.max.idle.ms=5400000

# ByteArrayConverter provides a "pass-through" option that does no conversion.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# workers need to use SASL
sasl.mechanism=OAUTHBEARER
sasl.kerberos.service.name=kafka
security.protocol=SASL_SSL
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://auth.gcn.nasa.gov/oauth2/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="CLIENT_ID" \
  clientSecret="CLIENT_SECRET";

# settings with `consumer.` prefixes are passed through to the Kafka consumer
consumer.group.id=GROUP_ID
consumer.auto.offset.reset=earliest
consumer.sasl.mechanism=OAUTHBEARER
consumer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_SSL
consumer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
consumer.sasl.oauthbearer.token.endpoint.url=https://auth.gcn.nasa.gov/oauth2/token
consumer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="CLIENT_ID" \
  clientSecret="CLIENT_SECRET";
```
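In standalone mode, Kafka Connect would be launched with the worker file plus the sink connector file; presumably the consumer VM's startup script does something along these lines (the paths and `.properties` file names are assumptions):

```bash
# Hypothetical launch command; the actual mechanism lives in the VM startup
# script, which is not part of this diff.
/opt/kafka/bin/connect-standalone.sh \
    psconnect-worker.properties \
    cps-sink-connector.properties
```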
> **Reviewer comment** (file-level): There are a bunch of variables in here with names that include "avro" -- I'm sure this is a holdover from the ZTF and LSST modules. For reusability, we should change those names because not all surveys publish avro alerts. For Swift in particular, assuming they publish in json and not avro, having "avro" here is confusing.