Ingest Swift/BAT-GUANO Alert Stream #277
base: develop
@@ -0,0 +1,21 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
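For local testing, the container built from this Dockerfile can also be run outside Cloud Run. The sketch below is a hedged example, not part of this PR: the image tag and the environment-variable values are placeholders.

```bash
# Build the image locally (the tag is arbitrary / hypothetical).
docker build -t swift-alerts-to-storage .

# Run it the way Cloud Run would: gunicorn binds to $PORT (see CMD above).
# GCP_PROJECT, SURVEY, TESTID, and VERSIONTAG are read by main.py;
# the values here are placeholders for illustration only.
# Note: main.py creates Cloud Logging/Storage clients at import time, so
# valid Google credentials (e.g. mounted ADC) would also be needed.
docker run --rm -p 8080:8080 \
  -e PORT=8080 \
  -e GCP_PROJECT=my-project \
  -e SURVEY=swift \
  -e TESTID=mytest \
  -e VERSIONTAG=v4_5_0 \
  swift-alerts-to-storage
```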
@@ -0,0 +1,29 @@
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
# Build the image
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
# Push the image to Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
  args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
# Deploy the image to Cloud Run
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
  entrypoint: gcloud
  args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
- '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
  _SURVEY: 'swift'
  _TESTID: 'testid'
  _MODULE_NAME: '${_SURVEY}-alerts-to-storage-${_TESTID}'
  _MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}'
  _REPOSITORY: 'cloud-run-services'
  # Cloud Functions automatically sets the project id env var using the name "GCP_PROJECT";
  # use the same name here for consistency.
  # [TODO] PROJECT_ID is set in setup.sh. This is confusing and we should revisit the decision.
  # I (Raen) think I didn't make it a substitution because I didn't want to set a default for it.
  _ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID},VERSIONTAG=${_VERSIONTAG}'
  _REGION: 'us-central1'
options:
  dynamic_substitutions: true
@@ -0,0 +1,105 @@
#!/bin/bash
# Deploys or deletes the broker's Cloud Run service.
# This script will not delete Cloud Run services that are in production.

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tears down/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-swift}"
region="${4:-us-central1}"
versiontag="${5:-v4_5_0}"
# get the environment variable
PROJECT_ID=$GOOGLE_CLOUD_PROJECT

MODULE_NAME="alerts-to-storage"  # lower case required by Cloud Run
ROUTE_RUN="/"  # url route that will trigger main.run()

define_GCP_resources() {
    local base_name="$1"
    local separator="${2:--}"
    local testid_suffix=""

    if [ "$testid" != "False" ] && [ -n "$testid" ]; then
        testid_suffix="${separator}${testid}"
    fi
    echo "${base_name}${testid_suffix}"
}

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}")  # lower case required by Cloud Run
gcs_alerts_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts")
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")
ps_input_subscrip=$(define_GCP_resources "${survey}-alerts_raw")  # Pub/Sub subscription used to trigger the Cloud Run module
ps_topic_alerts_in_bucket=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alerts_in_bucket")
ps_trigger_topic=$(define_GCP_resources "${survey}-alerts_raw")
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"

if [ "${teardown}" = "True" ]; then
    # ensure that we do not tear down production resources
    if [ "${testid}" != "False" ]; then
        echo
        echo "Deleting resources for ${MODULE_NAME} module..."
        gsutil rm -r "gs://${gcs_alerts_bucket}"
        gcloud pubsub topics delete "${ps_topic_alerts_in_bucket}"
        gcloud pubsub subscriptions delete "${ps_input_subscrip}"
        gcloud run services delete "${cr_module_name}" --region "${region}"
    else
        echo 'ERROR: No testid supplied.'
        echo 'To avoid accidents, this script will not delete production resources.'
        echo 'If that is your intention, you must delete them manually.'
        echo 'Otherwise, please supply a testid.'
        exit 1
    fi
else
    echo
    echo "Creating gcs_alerts_bucket and setting permissions..."
    if ! gsutil ls -b "gs://${gcs_alerts_bucket}" >/dev/null 2>&1; then
        #--- Create the bucket that will store the alerts
        gsutil mb -l "${region}" "gs://${gcs_alerts_bucket}"
        gsutil ubla set on "gs://${gcs_alerts_bucket}"  # uniform bucket-level access
        gsutil requesterpays set on "gs://${gcs_alerts_bucket}"
        # set IAM policies on public GCP resources
        if [ "$testid" = "False" ]; then
            gcloud storage buckets add-iam-policy-binding "gs://${gcs_alerts_bucket}" \
                --member="allUsers" \
                --role="roles/storage.objectViewer"
        fi
    else
        echo "${gcs_alerts_bucket} already exists."
    fi

    #--- Set up the Pub/Sub notifications on the JSON storage bucket
    echo
    echo "Configuring Pub/Sub notifications on GCS bucket..."
    trigger_event=OBJECT_FINALIZE
    format=json  # json or none; if json, file metadata is sent in the message body
    gsutil notification create \
        -t "$ps_topic_alerts_in_bucket" \
        -e "$trigger_event" \
        -f "$format" \
        "gs://${gcs_alerts_bucket}"

    #--- Deploy the Cloud Run service
    echo
    echo "Creating container image for ${MODULE_NAME} module and deploying to Cloud Run..."
    moduledir="."  # assumes we are deploying what's in the current directory
    config="${moduledir}/cloudbuild.yaml"
    # deploy the service and capture the endpoint's URL
    url=$(gcloud builds submit --config="${config}" \
        --substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo},_VERSIONTAG=${versiontag}" \
        "${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')
    echo
    echo "Creating trigger subscription for ${MODULE_NAME} Cloud Run service..."
    gcloud pubsub subscriptions create "${ps_input_subscrip}" \
        --topic "${ps_trigger_topic}" \
        --topic-project "${PROJECT_ID}" \
        --ack-deadline=600 \
        --push-endpoint="${url}${ROUTE_RUN}" \
        --push-auth-service-account="${runinvoker_svcact}" \
        --dead-letter-topic="${ps_deadletter_topic}" \
        --max-delivery-attempts=5
fi
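For orientation, here is a hedged sketch of how this deployment script might be invoked. The filename deploy.sh is an assumption (it is the name referenced in main.py's route comment); the argument order follows the positional parameters defined at the top of the script.

```bash
# Hypothetical invocations; "deploy.sh" is an assumed filename.
# Positional args: testid  teardown  survey  region  versiontag

# Deploy a test instance of the module under the "mytest" testid
./deploy.sh mytest False swift us-central1 v4_5_0

# Tear the same test instance back down
./deploy.sh mytest True swift us-central1 v4_5_0
```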
@@ -0,0 +1,107 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores Swift/BAT-GUANO alert data as a JSON file in Cloud Storage."""

import os

import flask
import pittgoogle
from google.cloud import logging, storage
from google.cloud.exceptions import PreconditionFailed

# [FIXME] Make this helpful or else delete it.
# Connect the python logger to the google cloud logger.
# By default, this captures INFO level and above.
# pittgoogle uses the python logger.
# We don't currently use the python logger directly in this script, but we could.
logging.Client().setup_logging()

PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")
VERSIONTAG = os.getenv("VERSIONTAG")

# Variables for incoming data
# A url route is used in setup.sh when the trigger subscription is created.
# It is possible to define multiple routes in a single module and trigger them using different subscriptions.
ROUTE_RUN = "/"  # HTTP route that will trigger run(). Must match deploy.sh

# Variables for outgoing data
HTTP_204 = 204  # HTTP code: Success
HTTP_400 = 400  # HTTP code: Bad Request

# GCP resources used in this module
TOPIC_ALERTS_JSON = pittgoogle.Topic.from_cloud(
    "alerts", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)
bucket_name = f"{PROJECT_ID}-{SURVEY}_alerts"
if TESTID != "False":
    bucket_name = f"{bucket_name}-{TESTID}"

client = storage.Client()
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))

app = flask.Flask(__name__)


@app.route(ROUTE_RUN, methods=["POST"])
def run() -> tuple[str, int]:
    """Upload alert data to a GCS bucket and publish a de-duplicated, JSON-serialized "alerts" stream
    (${survey}-alerts) containing the original alert bytes. A BigQuery subscription is used to write
    alert data to the appropriate BigQuery table.

    This module is intended to be deployed as a Cloud Run service. It operates as an HTTP endpoint
    triggered by Pub/Sub messages. This function is called once for every message sent to this route.
    It accepts the incoming HTTP request and returns a response.

    Returns
    -------
    response : tuple(str, int)
        Tuple containing the response body (string) and HTTP status code (int). Flask will convert the
        tuple into a proper HTTP response. Note that the response is a status message for the web server.
    """
    # extract the envelope from the request that triggered the endpoint
    # this contains a single Pub/Sub message with the alert to be processed
    envelope = flask.request.get_json()
    try:
        alert = pittgoogle.Alert.from_cloud_run(envelope, schema_name="default")
    except pittgoogle.exceptions.BadRequest as exc:
        return str(exc), HTTP_400

    blob = bucket.blob(_name_in_bucket(alert))
    blob.metadata = _create_file_metadata(alert, event_id=envelope["message"]["messageId"])

    # "if_generation_match=0" raises a PreconditionFailed exception if the filename already exists in the bucket
    try:
        blob.upload_from_string(alert.msg.data, if_generation_match=0)
    except PreconditionFailed:
        # this alert is a duplicate. drop it.
        return "", HTTP_204

    # publish the same alert as JSON
    TOPIC_ALERTS_JSON.publish(alert)

    return "", HTTP_204


def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
    """Return key/value pairs to be attached to the file as metadata."""
    # https://github.com/nasa-gcn/gcn-schema/blob/main/gcn/notices/swift/bat/Guano.example.json
    metadata = {"file_origin_message_id": event_id}
    metadata["alert_datetime"] = alert.dict["alert_datetime"]
    metadata["alert_type"] = alert.dict["alert_type"]
    metadata["classification"] = alert.dict["classification"]
    metadata["id"] = alert.dict["id"]

    return metadata


def _name_in_bucket(alert: pittgoogle.Alert) -> str:
    """Return the name of the file in the bucket."""
    # not easily able to extract the schema version, see:
    # https://github.com/nasa-gcn/gcn-schema/blob/main/gcn/notices/swift/bat/Guano.example.json
    _date = alert.dict["alert_datetime"][0:10]
    _alert_type = alert.dict["alert_type"]
    _id = alert.dict["id"][0]

    return f"{VERSIONTAG}/{_date}/{_alert_type}/{_id}.json"
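To illustrate the shape of the request that run() receives, below is a hedged curl sketch of a Pub/Sub-style push envelope sent to a locally running instance of the service. The port, the sample file name, and the subscription path are placeholders, and the exact fields pittgoogle's Alert.from_cloud_run requires may differ from this minimal shape; the code above only shows that it reads envelope["message"]["messageId"].

```bash
# Hypothetical local smoke test; assumes the service is running locally,
# e.g. via `PORT=8080 gunicorn main:app`, and that sample_guano_alert.json
# (a placeholder name) holds one raw alert payload.
DATA=$(base64 < sample_guano_alert.json | tr -d '\n')

curl -X POST "http://localhost:8080/" \
  -H "Content-Type: application/json" \
  -d "{
        \"message\": {
          \"data\": \"${DATA}\",
          \"messageId\": \"1234567890\",
          \"attributes\": {}
        },
        \"subscription\": \"projects/PROJECT_ID/subscriptions/swift-alerts_raw-mytest\"
      }"
```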
@@ -0,0 +1,14 @@
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
google-cloud-storage
pittgoogle-client>=0.3.15

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
Flask
gunicorn
Werkzeug
@@ -0,0 +1,35 @@
# Start the Swift/BAT-GUANO consumer VM

See `broker/setup_broker/swift/README.md` for setup instructions.

To start the consumer VM:

```bash
survey="swift"
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"

# Set the VM metadata
KAFKA_TOPIC="enter Kafka topic"
PS_TOPIC="${survey}-alerts-${testid}"
gcloud compute instances add-metadata ${consumerVM} --zone=${zone} \
    --metadata KAFKA_TOPIC=${KAFKA_TOPIC},PS_TOPIC=${PS_TOPIC}

# Start the VM
gcloud compute instances start ${consumerVM} --zone ${zone}
# this launches the startup script, which configures and starts the
# Kafka -> Pub/Sub connector
```

To stop the consumer VM:

```bash
survey="swift"
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"

# Stop the VM
gcloud compute instances stop ${consumerVM} --zone ${zone}
```
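To verify that the startup script actually configured and launched the connector, something like the following can help. This is a generic sketch using standard gcloud commands, not a procedure documented by this PR.

```bash
survey="swift"
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"

# Check the VM's status and tail its serial-port output to see whether the
# startup script configured and started the Kafka -> Pub/Sub connector.
gcloud compute instances describe ${consumerVM} --zone ${zone} --format="value(status)"
gcloud compute instances get-serial-port-output ${consumerVM} --zone ${zone} | tail -n 50
```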
@@ -0,0 +1,13 @@
# Kafka Admin client configs
# This file is part of a workflow that creates an authenticated connection to the Kafka broker.
# In cases where we can connect without authentication (e.g., ZTF), this file is not used.
# For config options, see https://kafka.apache.org/documentation/#adminclientconfigs
# For Swift-specific options, see https://gcn.nasa.gov/docs/client#java

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://auth.gcn.nasa.gov/oauth2/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="CLIENT_ID" \
  clientSecret="CLIENT_SECRET";
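The CLIENT_ID and CLIENT_SECRET placeholders above must be replaced with GCN credentials before the connector starts. Below is a hedged sketch of one way that might be done; the properties filename, the Secret Manager secret names, and the use of Secret Manager at all are assumptions, not something this PR defines.

```bash
# Hypothetical placeholder substitution on the consumer VM.
# "admin.properties" and the secret names are assumed, not defined in this PR.
client_id=$(gcloud secrets versions access latest --secret="swift-gcn-client-id")
client_secret=$(gcloud secrets versions access latest --secret="swift-gcn-client-secret")
sed -i "s|CLIENT_ID|${client_id}|; s|CLIENT_SECRET|${client_secret}|" admin.properties
```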
@@ -0,0 +1,41 @@
# Kafka Connect sink connector configs
# For config options, see https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html
# For additional Pub/Sub-specific options, see https://github.com/googleapis/java-pubsub-group-kafka-connector?tab=readme-ov-file#sink-connector
#
# --------------------------------------------------------------------------
# This file is adapted from:
# https://github.com/googleapis/java-pubsub-group-kafka-connector/blob/main/config/cps-sink-connector.properties
# The original copyright and license are reproduced below.
#
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------

# Unique name for the Pub/Sub sink connector.
name=CPSSinkConnector
# The Java class for the Pub/Sub sink connector.
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
# The maximum number of tasks that should be created for this connector.
tasks.max=1

Collaborator
What is the alert rate you expect from this stream? I'm guessing that one task is probably fine, but it's worth checking.

Collaborator
Now I see in the data listings PR, "less than 1 alert per week". So 1 task will be fine.

# Set the key converter for the Pub/Sub sink connector.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Set the value converter for the Pub/Sub sink connector.
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Set the Kafka topic.
topics=KAFKA_TOPIC
# Set the Pub/Sub configs.
cps.project=PROJECT_ID
cps.topic=PS_TOPIC
# include Kafka topic, partition, offset, timestamp as msg attributes
metadata.publish=true
There are a bunch of variables in here with names that include "avro" -- I'm sure this is a holdover from the ZTF and LSST modules. For reusability, we should change those names because not all surveys publish avro alerts. For Swift in particular, assuming they publish in json and not avro, having "avro" here is confusing.
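The KAFKA_TOPIC, PROJECT_ID, and PS_TOPIC placeholders in the sink-connector config above correspond to the VM metadata set in the consumer-VM README. Below is a hedged sketch of how the (not shown) VM startup script might fill them in; the properties filename and the use of instance metadata here are assumptions, not part of this PR.

```bash
# Hypothetical substitution at consumer-VM startup.
# The metadata keys match those set in the README; the filename is assumed.
metadata_url="http://metadata.google.internal/computeMetadata/v1"
kafka_topic=$(curl -s -H "Metadata-Flavor: Google" "${metadata_url}/instance/attributes/KAFKA_TOPIC")
ps_topic=$(curl -s -H "Metadata-Flavor: Google" "${metadata_url}/instance/attributes/PS_TOPIC")
project_id=$(curl -s -H "Metadata-Flavor: Google" "${metadata_url}/project/project-id")
sed -i "s|KAFKA_TOPIC|${kafka_topic}|; s|PROJECT_ID|${project_id}|; s|PS_TOPIC|${ps_topic}|" \
    cps-sink-connector.properties
```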