
Conversation

@hernandezc1 hernandezc1 commented Jun 17, 2025

This PR creates the resources needed to ingest the JSON-serialized GCN Notice Types generated by the Neil Gehrels Swift Observatory. Alerts are published to a Pub/Sub topic, swift-alerts. A BigQuery subscription attached to the topic writes the alert data to a BigQuery table (alerts_v4_5_0) in the dataset swift_alerts.

@hernandezc1 hernandezc1 self-assigned this Jun 17, 2025
@hernandezc1 hernandezc1 added Enhancement New feature or request Pipeline: Conductor Components that orchestrate the live pipeline labels Jun 17, 2025
@troyraen troyraen commented

Yay!

One thought: We already listen to one GCN stream (LVK) and may want to add more in the future. Consider how we can make this easy for ourselves. For example, maybe all of our GCN consumers can use the same base container image and ingest different streams based only on an environment variable (and maybe separate credentials)? I don't know the answer or even all the possibilities, just putting a bug in your ear.
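(For illustration only — a minimal sketch of that idea, in which a shared entrypoint baked into a common base image picks the stream from an environment variable; the GCN_STREAM/GCN_CREDS names and the generic consumer script are hypothetical:)

#! /bin/bash
# Hypothetical shared entrypoint for all GCN consumers: the same base image runs
# any stream, selected only by environment variables set at deploy time.
set -euo pipefail
GCN_STREAM="${GCN_STREAM:?set to e.g. 'swift' or 'lvk'}"
GCN_CREDS="${GCN_CREDS:-/secrets/${GCN_STREAM}-gcn-credentials.json}"
exec ./run_gcn_consumer.sh --stream "${GCN_STREAM}" --credentials "${GCN_CREDS}"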

@hernandezc1 hernandezc1 requested a review from troyraen June 19, 2025 15:43
@troyraen troyraen left a comment

Thanks, this will be great! A few things to clarify below.

I assume this consumer will be one that we'll leave running 24/7?

I strongly encourage you to include the storage of alerts in a bucket with this PR. A bucket is more valuable to us than a bigquery table. We can create or recreate a table from alerts in a bucket but it's difficult to create a bucket of alerts from a bigquery table. It would also be easy to "replay" a pubsub stream using alerts in a bucket but that would be difficult if trying to use a table. We've already run into this problem with LSST (and/or LVK?) when we wanted to change the table but had trouble because we didn't have a bucket to draw from.
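(For concreteness, replaying from a bucket could be as simple as the sketch below — the replay topic name is hypothetical and the loop assumes small JSON alerts:)

# Hypothetical sketch: republish every archived alert in the bucket to a replay topic.
for uri in $(gsutil ls "gs://${PROJECT_ID}-${survey}_alerts/**"); do
    gcloud pubsub topics publish "${survey}-alerts-replay" --message="$(gsutil cat "${uri}")"
done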

# The Java class for the Pub/Sub sink connector.
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
# The maximum number of tasks that should be created for this connector.
tasks.max=1
@troyraen troyraen commented

What is the alert rate you expect from this stream? I'm guessing that one task is probably fine, but it's worth checking.

@troyraen troyraen commented

Now I see in the data listings PR, "less than 1 alert per week". So 1 task will be fine.

@@ -0,0 +1,77 @@
#! /bin/bash
# Installs the software required to run the Kafka Consumer.
# Assumes a Debian 10 OS.
@troyraen troyraen commented

Double check this. I think you're using Debian 11.

@hernandezc1 hernandezc1 commented Jun 27, 2025

@troyraen it's actually using Debian 12! Will update the comments in this script. I was reusing code from previous vm_install scripts (e.g., LVK).

@troyraen troyraen commented

it's actually using Debian 12!

even better!

snap install yq

#--- Install Java and the dev kit
# see https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-on-debian-10
@troyraen troyraen commented

Similar to above. I think this should be a Debian 11 link.

echo "Installing the Kafka -> Pub/Sub connector"
(
plugindir=/usr/local/share/kafka/plugins
CONNECTOR_RELEASE="1.1.0"
@troyraen troyraen commented

This version is more than 2 yrs old now. Latest version is 1.3.2, released 2025-06-02. It's probably worth a quick test to see if we can upgrade to the latest version without much trouble. If it turns into a rabbit hole, ok to punt that task and continue with this version for now.


#--- GCP resources used directly in this script
broker_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}-broker_files")
bq_dataset_alerts=$(define_GCP_resources "${survey}_alerts")
@troyraen troyraen commented

Suggested change:
- bq_dataset_alerts=$(define_GCP_resources "${survey}_alerts")
+ bq_dataset_alerts=$(define_GCP_resources "${survey}")

Started this conversation on slack... Our datasets for other surveys are just named as the name of the survey. I see that we've started putting our value-added products in a separate dataset but I think that only creates more confusion than it's worth.

bq --location="${region}" mk --dataset "${bq_dataset_alerts}"

(cd templates && bq mk --table "${PROJECT_ID}:${bq_dataset_alerts}.${alerts_table}" "bq_${survey}_${alerts_table}_schema.json") || exit 5
bq update --description "Alert data from Swift/BAT-GUANO. This table is an archive of the swift-alerts Pub/Sub stream. It has the same schema as the original alert bytes, including nested and repeated fields." "${PROJECT_ID}:${bq_dataset_alerts}.${alerts_table}"
@troyraen troyraen commented

Add a link to the survey's documentation of the schema or some other page where users can find more info, if you know it.

@troyraen troyraen commented

Actually, maybe we should link to one of our pages instead. Maybe the survey page at docs/source/surveys/swift.rst being added in mwvgroup/pittgoogle-client#100?

Comment on lines 43 to 47
if [ "$base_name" = "${survey}_alerts" ]; then
testid_suffix="_${testid}" # complies with BigQuery naming conventions
else
testid_suffix="-${testid}"
fi
@troyraen troyraen commented

I recommend finding a clearer, more direct way to identify whether "-" or "_" should be used here. Maybe a boolean function arg? Using the resource name makes this code quite brittle because we have to remember to change the guts of the function if we change the resource name and/or add new resources.
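For example, something along these lines (a sketch only; the second argument and its default are hypothetical):

# Hypothetical sketch: the caller states which naming convention applies instead of
# the function inferring it from the resource name.
define_GCP_resources() {
    local base_name="$1"
    local use_bq_naming="${2:-false}"  # pass "true" for BigQuery resources
    if [ "$use_bq_naming" = "true" ]; then
        testid_suffix="_${testid}"  # complies with BigQuery naming conventions
    else
        testid_suffix="-${testid}"
    fi
    echo "${base_name}${testid_suffix}"
}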

@hernandezc1 hernandezc1 commented

The function has been updated

Comment on lines 96 to 98
# in order to create BigQuery subscriptions, ensure that the following service account:
# service-<project number>@gcp-sa-pubsub.iam.gserviceaccount.com" has the
# bigquery.dataEditor role for each table
@troyraen troyraen commented

Can the code for this be included here? Ideally, the setup script(s) should take care of all setup and the developer should not have to do things like this manually. If there is a good reason for the developer to do something manually, those instructions should be included with the other setup instructions rather than being buried in code comments.

@hernandezc1 hernandezc1 commented

Yes, I've now included the code here
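(That code isn't quoted in this thread; for reference, a grant of this kind typically looks roughly like the sketch below — whether the PR applies it at the project, dataset, or table level may differ.)

# Sketch: allow the Pub/Sub service agent to write to BigQuery so that BigQuery
# subscriptions can deliver messages; shown project-wide here for brevity.
project_number=$(gcloud projects describe "${PROJECT_ID}" --format="value(projectNumber)")
pubsub_service_agent="service-${project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
    --member="serviceAccount:${pubsub_service_agent}" \
    --role="roles/bigquery.dataEditor"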

Comment on lines 108 to 111
# set IAM policies on resources
user="allUsers"
roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
gcloud pubsub topics add-iam-policy-binding "${topic_alerts}" --member="${user}" --role="${roleid}"
@troyraen troyraen commented

Make the bigquery dataset public as well.

@hernandezc1 hernandezc1 commented

The process of making the dataset public involves a few more steps than I was expecting. The documentation I referenced can be accessed here
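(Roughly, that workflow comes down to editing the dataset's access list and pushing it back with bq update — a sketch, with an illustrative file name:)

# Sketch of the documented workflow for granting public read access to a dataset.
bq show --format=prettyjson "${PROJECT_ID}:${bq_dataset_alerts}" > dataset_policy.json
# ... add a public READER entry to the "access" list in dataset_policy.json ...
bq update --source dataset_policy.json "${PROJECT_ID}:${bq_dataset_alerts}"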

@troyraen troyraen commented

I recall it being more difficult for bigquery than pubsub as well. Looks like you worked it out, so great job!

Comment on lines 113 to 125
else
if [ "$environment_type" = "testing" ]; then
# delete testing resources
o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs
gsutil -m -o "${o}" rm -r "gs://${broker_bucket}"
bq rm -r -f "${PROJECT_ID}:${bq_dataset_alerts}"
gcloud pubsub topics delete "${topic_alerts}"
gcloud pubsub topics delete "${deadletter_topic_bigquery_import}"
gcloud pubsub subscriptions delete "${subscription_alerts_reservoir}"
gcloud pubsub subscriptions delete "${deadletter_subscription_bigquery_import}"
gcloud pubsub subscriptions delete "${subscription_bigquery_import}"
fi
fi
@troyraen troyraen commented

  1. Add a comment explaining that the consumer VM will be deleted by the create_vm.sh script.

  2. Add a print statement for the case environment_type="production" explaining why nothing happens and what the developer needs to do instead. You can copy/paste the one from the ZTF setup_broker.sh.

"description": "URL of HEALPix localization probability file."
},
{
"name": "healpix_file",
@hernandezc1 hernandezc1 commented

@troyraen do we want to keep the healpix_file field here? This information will be available in the GCS bucket. We had a similar conversation when we first started storing alert data in BigQuery for LVK

@troyraen troyraen commented

My default instinct is to not alter the survey's schema. And/but if we are serving a copy of this file from GCP, we should definitely figure out how/where to advertise that since getting it from within GCP will be more convenient for at least some users.

I'm not sure I remember the details about what's going on here... Is this the file containing the probability that the event originated from a particular sky location (ie, a probability per healpix pixel)? And are we ingesting that file and serving it from a GCS bucket in addition to the alerts? If so, maybe we leave this original schema field intact but add another field called something like "healpix_file_gcp" or "healpix_file_pg" or "healpix_file_cloud"? I'm much more comfortable adding a new field than changing the existing one. Happy to talk this through in more detail on Thursday if I'm missing the point here.

@hernandezc1 hernandezc1 requested a review from troyraen June 27, 2025 18:45
@troyraen troyraen left a comment

A few things to look at. Most important are (1) the "avro" resources and (2) the name of the alert in the bucket. I'm also noticing that the ways we assign permissions to users/resources have gotten pretty scattered. It would be good to consolidate and handle them consistently.

Comment on lines 33 to 35
TOPIC_ALERTS_JSON = pittgoogle.Topic.from_cloud(
"alerts-json", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)
@troyraen troyraen commented

Do these alerts come from Swift as json or avro? If json, I think the name of this topic can just be swift-alerts since we're not changing the serialization. Also, if this stays as swift-alerts-json does that mean we don't publish any topic that's just called swift-alerts? I think from a usability/consistency standpoint we should always publish a stream called <survey>-alerts that is just a pass through of the survey's full (but deduplicated) alert stream from Kafka (or whatever) into Pub/Sub.

If that seems confusing in comparison with our LSST streams where lsst-alerts is avro and lsst-alerts-json is json, maybe we consider changing those names so that lsst-alerts is the json version and we make the avro one called lsst-alerts-avro? Benefit of the current naming is that lsst-alerts is byte-for-byte the same as what Rubin publishes. That was my original intention for all of our <survey>-alerts streams, and in that sense using swift-alerts is consistent (assuming Swift really does publish these as json -- otherwise, sorry for this irrelevant tangent). But since all of our topics downstream of <survey>-alerts use json exclusively, I can see an argument for making all <survey>-alerts streams json as well.

@hernandezc1 hernandezc1 commented

@troyraen the alerts from Swift are JSON serialized. I named the Pub/Sub resource that way because at the time it seemed more appropriate and descriptive, but I agree that having a <survey>-alerts topic that is just a pass through of the survey's full (deduplicated) alert stream is the convention this module should adopt

@troyraen troyraen commented

There are a bunch of variables in here with names that include "avro" -- I'm sure this is a holdover from the ZTF and LSST modules. For reusability, we should change those names because not all surveys publish avro alerts. For Swift in particular, assuming they publish in json and not avro, having "avro" here is confusing.

cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by cloud run
gcs_avro_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts")
ps_input_subscrip=$(define_GCP_resources "${survey}-alerts_raw") # pub/sub subscription used to trigger cloud run module
ps_subscription_avro=$(define_GCP_resources "${survey}-alert_avros-counter")
@troyraen troyraen commented

Are you actually using these counter subscriptions? If not, get rid of this. I think ZTF is the only broker pipeline that uses them.

FWIW, ZTF uses them with an ancillary module that helps us track broker performance. The module is very useful, but the way I wrote it is complicated and expensive, and that's why we haven't added it to any other broker pipeline. We should rewrite it using bigquery subscriptions (which didn't exist when I wrote the original) -- #172.

gcs_avro_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts")
ps_input_subscrip=$(define_GCP_resources "${survey}-alerts_raw") # pub/sub subscription used to trigger cloud run module
ps_subscription_avro=$(define_GCP_resources "${survey}-alert_avros-counter")
ps_topic_avro=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alert_avros")
@troyraen troyraen commented

I know this is the original name of the topic, but using "avros" is not generically good for the aforementioned reasons, and if Swift publishes json then we definitely shouldn't use it here. Let's think of something better. <survey>-alert_in_bucket? Just off the top of my head.

Comment on lines 87 to 94
# WARNING: This is set to retry failed deliveries. If there is a bug in main.py this will
# retry indefinitely, until the message is deleted manually.
gcloud pubsub subscriptions create "${ps_input_subscrip}" \
--topic "${ps_trigger_topic}" \
--topic-project "${PROJECT_ID}" \
--ack-deadline=600 \
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}"
@troyraen troyraen commented

Did you figure out how to limit the number of retries for cloud run services? If so, implement it here so we can drop this disaster waiting to happen 🥴.

@hernandezc1 hernandezc1 commented

@troyraen the only solution I can think of is the following:

# add to line 43
ps_deadletter_topic_input_subscrip=$(define_GCP_resources "${survey}-upsilon-deadletter") 

# add to line 51
gcloud pubsub topics delete "${ps_deadletter_topic_input_subscrip}"

# add to line 61
gcloud pubsub topics create "${ps_deadletter_topic_input_subscrip}"

gcloud pubsub subscriptions create "${ps_input_subscrip}" \
  --topic="${ps_trigger_topic}" \
  --topic-project="${PROJECT_ID}" \
  --ack-deadline=600 \
  --push-endpoint="${url}${ROUTE_RUN}" \
  --push-auth-service-account="${runinvoker_svcact}" \
  --dead-letter-topic="${ps_deadletter_topic_input_subscrip}" \
  --max-delivery-attempts=5

Rather than retrying indefinitely, the message will be published to a dead letter topic after 5 delivery attempts

@troyraen troyraen commented

Ok. What do you think about having only one deadletter topic that we use for every module? I'm resistant to having to manage a different one for each module but I also don't want to create a mess there if/when we actually need to use it. I don't have a clear sense of if/how we will actually want to work with messages that get delivered there. Do you? If we only have one deadletter topic and multiple modules start failing and dumping messages there, will we actually have a need to dig through them and sort out which ones came from which module? I think I would be more inclined to look at the bigquery tables to figure out which messages did/didn't make it through a given module. What do you think?

Comment on lines 121 to 126
# set IAM policies on public Pub/Sub resources
if [ "$testid" = "False" ]; then
user="allUsers"
roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
gcloud pubsub topics add-iam-policy-binding "${ps_topic_alerts_json}" --member="${user}" --role="${roleid}"
fi
@troyraen troyraen commented

'alerts_raw' needs this same policy.
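That is, something like the following (the variable name for the 'alerts_raw' topic is assumed):

gcloud pubsub topics add-iam-policy-binding "${ps_topic_alerts_raw}" --member="${user}" --role="${roleid}"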

Comment on lines 58 to 60
gcloud storage buckets add-iam-policy-binding "gs://${gcs_avro_bucket}" \
--member="allUsers" \
--role="roles/storage.objectViewer"
@troyraen troyraen commented

Similar to the bigquery policy, we should incorporate the permissions being granted here into our custom userPublic role so that we consolidate all the permissions we intend to grant to the public and we can apply the same role/policy to every resource. Also, we should really only give public access to production resources.
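For instance, the consolidation might look roughly like this (a sketch; the added permissions and the reuse of userPublic on buckets are assumptions to verify):

# Hypothetical sketch: fold the bucket permissions into the custom userPublic role
# so the same role/policy can be bound to topics, datasets, and buckets.
gcloud iam roles update userPublic --project="${PROJECT_ID}" \
    --add-permissions="storage.objects.get,storage.objects.list"
# ...then bind the custom role instead of roles/storage.objectViewer:
gcloud storage buckets add-iam-policy-binding "gs://${gcs_avro_bucket}" \
    --member="allUsers" \
    --role="projects/${PROJECT_ID}/roles/userPublic"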

@hernandezc1 hernandezc1 commented

I'd like to standardize permissions across broker instances for all surveys in a separate PR

# name of the survey this broker instance will ingest
survey="${4:-swift}"
zone="${5:-us-central1-a}"
project_id="${6:-PROJECT_ID}"
@troyraen troyraen commented

No need to define yet another name for this variable. Just use $PROJECT_ID directly where applicable. (GCP makes this variable confusing by using different names for it in different contexts -- PROJECT_ID and GOOGLE_CLOUD_PROJECT being most common, but there's at least one more. In our scripts we try to standardize on PROJECT_ID.)

Comment on lines 78 to 80
# grant public access to the dataset; for more information, see:
# https://cloud.google.com/bigquery/docs/control-access-to-resources-iam#grant_access_to_a_dataset
(cd templates && bq update --source "bq_${survey}_policy.json" "${PROJECT_ID}:${bq_dataset}") || exit 5
@troyraen troyraen commented

Move this to the same block that grants public access to the pubsub topics so that it's only done for production resources.
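Concretely, something like the sketch below, combining this snippet with the public-topic block quoted earlier (final placement and variable names may differ between scripts):

# Sketch: grant all public access only for production resources (testid="False").
if [ "$testid" = "False" ]; then
    user="allUsers"
    roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
    gcloud pubsub topics add-iam-policy-binding "${ps_topic_alerts_json}" --member="${user}" --role="${roleid}"
    # grant public access to the dataset; for more information, see:
    # https://cloud.google.com/bigquery/docs/control-access-to-resources-iam#grant_access_to_a_dataset
    (cd templates && bq update --source "bq_${survey}_policy.json" "${PROJECT_ID}:${bq_dataset}") || exit 5
fi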

fi
else
echo
echo "Creating avro_bucket..."
@troyraen troyraen commented

Let's revisit which scripts create and delete which resources. I know we had a whole discussion about it when we first created these deploy.sh scripts for individual modules and decided to put this bucket creation here, but this is the only public resource that we handle this way and I've been confused by it more than once. Every other resource gets created by setup_broker.sh. At a minimum, our handling of this bucket and the bigquery dataset/table should be consistent.

@hernandezc1 hernandezc1 requested a review from troyraen July 17, 2025 14:30
@troyraen troyraen removed their request for review October 2, 2025 21:21