Commits (changes shown from 12 of 30 commits)
8638b5b
update script to create or delete new GCP resources
hernandezc1 Jul 21, 2025
6d7fae4
create bq_lvk_policy.json
hernandezc1 Jul 21, 2025
1024ee7
create file for `lvk-alerts-to-storage` module
hernandezc1 Jul 21, 2025
5648b1d
address codacy issue
hernandezc1 Jul 21, 2025
e967712
update script
hernandezc1 Jul 21, 2025
268d8f7
add versiontag as an environment variable
hernandezc1 Jul 21, 2025
30bd436
update default Pub/Sub topic
hernandezc1 Jul 21, 2025
06c21cf
update date value
hernandezc1 Jul 21, 2025
b4f241d
remove unused GCP resources
hernandezc1 Jul 21, 2025
0076075
unpin requirements
hernandezc1 Jul 21, 2025
a754bc3
add Pub/Sub topic
hernandezc1 Jul 21, 2025
d379e16
update Pub/Sub resource name
hernandezc1 Jul 21, 2025
304e56c
incorporate requested changes
hernandezc1 Aug 5, 2025
05b9f9a
make the alerts bucket public for production instances only
hernandezc1 Aug 6, 2025
c65aa9c
update `schema_version` value for outgoing Pub/Sub msg attributes (cu…
hernandezc1 Aug 8, 2025
5f9f563
update version requirements for pittgoogle-client
hernandezc1 Aug 19, 2025
16a1da0
add `schema_version`
hernandezc1 Aug 19, 2025
24fde97
add the field `schema_version` to the message data
hernandezc1 Aug 19, 2025
a817ac3
Merge branch 'develop' into u/ch/lvk/ps_to_storage
hernandezc1 Sep 24, 2025
ceded5a
Merge branch 'develop' into u/ch/lvk/ps_to_storage
hernandezc1 Sep 26, 2025
8e8b5a7
add `kafkaPublishTimestamp` to the table schema
hernandezc1 Sep 26, 2025
ce17c69
add `kafka.timestamp` to the file metadata
hernandezc1 Sep 26, 2025
173aa5b
create SMT to add the `kafkaPublishTimestamp` field to outgoing message
hernandezc1 Sep 26, 2025
b7d590b
partition BigQuery table
hernandezc1 Oct 3, 2025
359db16
Merge branch 'develop' into u/ch/lvk/ps_to_storage
hernandezc1 Nov 8, 2025
c05e02f
update IAM policies on production resources
hernandezc1 Nov 8, 2025
18b13eb
update user-defined function and rename file
hernandezc1 Nov 8, 2025
f2d2985
update filepath to SMT
hernandezc1 Nov 8, 2025
38875e1
update documentation and IAM policies
hernandezc1 Nov 8, 2025
63eb4df
define $PROJECT_NUMBER
hernandezc1 Nov 8, 2025
21 changes: 21 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/Dockerfile
@@ -0,0 +1,21 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
29 changes: 29 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/cloudbuild.yaml
@@ -0,0 +1,29 @@
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
# Build the image
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
# Push the image to Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
  args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
# Deploy image to Cloud Run
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
  entrypoint: gcloud
  args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
- '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
  _SURVEY: 'lvk'
  _TESTID: 'testid'
  _MODULE_NAME: '${_SURVEY}-alerts-to-storage-${_TESTID}'
  _MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}'
  _REPOSITORY: 'cloud-run-services'
  # cloud functions automatically sets the projectid env var using the name "GCP_PROJECT"
  # use the same name here for consistency
  # [TODO] PROJECT_ID is set in setup.sh. this is confusing and we should revisit the decision.
  # i (Raen) think i didn't make it a substitution because i didn't want to set a default for it.
  _ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID},VERSIONTAG=${_VERSIONTAG}'
  _REGION: 'us-central1'
options:
  dynamic_substitutions: true
93 changes: 93 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/deploy.sh
@@ -0,0 +1,93 @@
#! /bin/bash
# Deploys or deletes broker Cloud Run service
# This script will not delete Cloud Run services that are in production

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tearsdown/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-lvk}"
region="${4:-us-central1}"
versiontag="${5:-v1_0}"
# get the environment variable
PROJECT_ID=$GOOGLE_CLOUD_PROJECT

MODULE_NAME="alerts-to-storage" # lower case required by cloud run
ROUTE_RUN="/" # url route that will trigger main.run()

define_GCP_resources() {
    local base_name="$1"
    local separator="${2:--}"
    local testid_suffix=""

    if [ "$testid" != "False" ] && [ -n "$testid" ]; then
        testid_suffix="${separator}${testid}"
    fi
    echo "${base_name}${testid_suffix}"
}

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by cloud run
gcs_json_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts")
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")
ps_input_subscrip=$(define_GCP_resources "${survey}-alerts_raw") # pub/sub subscription used to trigger cloud run module
ps_topic_alert_in_bucket=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alert_in_bucket")
ps_trigger_topic=$(define_GCP_resources "${survey}-alerts_raw")
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"

if [ "${teardown}" = "True" ]; then
    # ensure that we do not tear down production resources
    if [ "${testid}" != "False" ]; then
        echo
        echo "Deleting resources for ${MODULE_NAME} module..."
        gsutil rm -r "gs://${gcs_json_bucket}"
        gcloud pubsub subscriptions delete "${ps_input_subscrip}"
        gcloud pubsub topics delete "${ps_topic_alert_in_bucket}"
        gcloud run services delete "${cr_module_name}" --region "${region}"
    fi
Comment on lines 44 to 58
Collaborator:
Add an else statement that prints a message explaining why nothing happens and explaining what to do instead. You can reuse one you already added to a different script.

else
    echo
    echo "Creating gcs_json_bucket and uploading files..."
    if ! gsutil ls -b "gs://${gcs_json_bucket}" >/dev/null 2>&1; then
        gsutil mb -b on -l "${region}" "gs://${gcs_json_bucket}"
        gsutil uniformbucketlevelaccess set on "gs://${gcs_json_bucket}"
        gsutil requesterpays set on "gs://${gcs_json_bucket}"
        gcloud storage buckets add-iam-policy-binding "gs://${gcs_json_bucket}" \
            --member="allUsers" \
            --role="roles/storage.objectViewer"
    else
        echo "${gcs_json_bucket} already exists."
    fi

    echo
    echo "Configuring Pub/Sub notifications on GCS bucket..."
    trigger_event=OBJECT_FINALIZE
    format=json # json or none; if json, file metadata sent in message body
    gsutil notification create \
        -t "$ps_topic_alert_in_bucket" \
        -e "$trigger_event" \
        -f "$format" \
        "gs://${gcs_json_bucket}"

    #--- Deploy Cloud Run service
    echo
    echo "Creating container image for ${MODULE_NAME} module and deploying to Cloud Run..."
    moduledir="." # assumes deploying what's in our current directory
    config="${moduledir}/cloudbuild.yaml"
    url=$(gcloud builds submit --config="${config}" \
        --substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo},_VERSIONTAG=${versiontag}" \
        "${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')
    echo
    echo "Creating trigger subscription for ${MODULE_NAME} Cloud Run service..."
    gcloud pubsub subscriptions create "${ps_input_subscrip}" \
        --topic "${ps_trigger_topic}" \
        --topic-project "${PROJECT_ID}" \
        --ack-deadline=600 \
        --push-endpoint="${url}${ROUTE_RUN}" \
        --push-auth-service-account="${runinvoker_svcact}" \
        --dead-letter-topic="${ps_deadletter_topic}" \
        --max-delivery-attempts=5
fi
104 changes: 104 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/main.py
@@ -0,0 +1,104 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores LVK alert data as a JSON file in Cloud Storage."""

import os
import flask
import pittgoogle
from google.cloud import logging, storage
from google.cloud.exceptions import PreconditionFailed

# [FIXME] Make this helpful or else delete it.
# Connect the python logger to the google cloud logger.
# By default, this captures INFO level and above.
# pittgoogle uses the python logger.
# We don't currently use the python logger directly in this script, but we could.
logging.Client().setup_logging()

PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")
VERSIONTAG = os.getenv("VERSIONTAG")

# Variables for incoming data
# A url route is used in setup.sh when the trigger subscription is created.
# It is possible to define multiple routes in a single module and trigger them using different subscriptions.
ROUTE_RUN = "/" # HTTP route that will trigger run(). Must match deploy.sh

# Variables for outgoing data
HTTP_204 = 204 # HTTP code: Success
HTTP_400 = 400 # HTTP code: Bad Request

# GCP resources used in this module
TOPIC_ALERTS = pittgoogle.Topic.from_cloud(
    "alerts", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)
bucket_name = f"{PROJECT_ID}-{SURVEY}_alerts"
if TESTID != "False":
    bucket_name = f"{bucket_name}-{TESTID}"

client = storage.Client()
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))

app = flask.Flask(__name__)


@app.route(ROUTE_RUN, methods=["POST"])
def run():
"""Uploads alert data to a GCS bucket. Publishes a de-duplicated JSON-serialized "alerts" stream
(${survey}-alerts) containing the original alert bytes. A BigQuery subscription is used to write alert data to
the appropriate BigQuery table.

This module is intended to be deployed as a Cloud Run service. It will operate as an HTTP endpoint
triggered by Pub/Sub messages. This function will be called once for every message sent to this route.
It should accept the incoming HTTP request and return a response.

Returns
-------
response : tuple(str, int)
Tuple containing the response body (string) and HTTP status code (int). Flask will convert the
tuple into a proper HTTP response. Note that the response is a status message for the web server.
"""
# extract the envelope from the request that triggered the endpoint
# this contains a single Pub/Sub message with the alert to be processed
envelope = flask.request.get_json()
try:
alert = pittgoogle.Alert.from_cloud_run(envelope, "lvk")
except pittgoogle.exceptions.BadRequest as exc:
return str(exc), HTTP_400

blob = bucket.blob(_name_in_bucket(alert))
blob.metadata = _create_file_metadata(alert, event_id=envelope["message"]["messageId"])

# raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
try:
blob.upload_from_string(alert.msg.data, if_generation_match=0)
except PreconditionFailed:
# this alert is a duplicate. drop it.
return "", HTTP_204

# publish the same alert as JSON
TOPIC_ALERTS.publish(alert)

return "", HTTP_204


def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
"""Return key/value pairs to be attached to the file as metadata."""
# https://git.ligo.org/emfollow/igwn-gwalert-schema/-/blob/main/igwn.alerts.v1_0.Alert.schema.json
metadata = {"file_origin_message_id": event_id}
metadata["_".join("time_created")] = alert.dict["time_created"]
metadata["_".join("alert_type")] = alert.dict["alert_type"]
metadata["_".join("id")] = alert.dict["superevent_id"]
Collaborator:
Suggested change:
- metadata["_".join("time_created")] = alert.dict["time_created"]
- metadata["_".join("alert_type")] = alert.dict["alert_type"]
- metadata["_".join("id")] = alert.dict["superevent_id"]
+ metadata["time_created"] = alert.dict["time_created"]
+ metadata["alert_type"] = alert.dict["alert_type"]
+ metadata["superevent_id"] = alert.dict["superevent_id"]
  1. "_".join("time_created") == "t_i_m_e___c_r_e_a_t_e_d", obviously not what's intended.
  2. In general, use the survey's field name. In particular, "id" is way too vague for a name.

In terms of other metadata we could add, the only other option I see is their "urls" field and I am not inclined to add that here. All of their other fields depend on the type of alert so we can't rely on them being available for this. I suppose we could add a try/except to deal with that but I think it's not worth the effort right now.
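
A quick sanity check of the join pitfall described above (a minimal illustration, not part of the PR): str.join iterates over the characters of the string it is given, so the keys come out mangled.

>>> "_".join("time_created")
't_i_m_e___c_r_e_a_t_e_d'
>>> "_".join("alert_type")
'a_l_e_r_t___t_y_p_e'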


    return metadata


def _name_in_bucket(alert: pittgoogle.Alert) -> str:
"""Return the name of the file in the bucket."""
_date = alert.dict["time_created"][0:10]
_alert_type = alert.dict["alert_type"]
_id = alert.dict["superevent_id"]

return f"{VERSIONTAG}/{_date}/{_alert_type}/{_id}.json"
Collaborator:
Thanks for pointing this out. What we decided on zoom was:

f"{VERSIONTAG}/{_id}/{_alert_type}-{_date}.json"

BUT, I went to patch our client Alert.name_in_bucket and realized that would be impossible. These alert packets do not contain the version, so there is no way to construct the name_in_bucket using only the alert data. I don't know the best solution here. Let's think and discuss. Options off the top of my head are:

  1. Our pipeline knows the schema version, so this module could simply add it to the alert packet. We'd need to propagate that all the way through in pubsub messages, bucket objects, and bigquery schemas. LVK alerts from us would be different than from LVK itself or any other broker. That may occasionally be a pain but not a huge problem, I think. I haven't fully thought things through but don't see any other problems with this at the moment. So this is seeming like the best option.
  2. Maybe there's a way to get the consumer VM to add the schema version to the metadata (it does not open the alert packets, so it cannot add to the actual data). A downside is that it's relatively easy for data and metadata to get disconnected. Also, I am very reluctant to mess with the consumer. It performs so well. I don't want to interfere.
  3. If our pipeline never drops fields from LVK alerts (so, no lite module, etc.) maybe it's not crucial for the client to be able to construct the name_in_bucket. I can still think of edge cases where we'd regret that and users would hate us for it.
  4. We could drop VERSIONTAG from the name in bucket. I really don't like the idea of making superevent_id the top level folder so maybe we use the LVK run for that, like "O4". Problem is that bigquery tables MUST be separated by schema version. There's no guarantee of a clean mapping between run and schema version. So this would make it really hard to regenerate bigquery tables from the bucket.

Collaborator:
Also, this reminds me that it would be much better to remove this function altogether and just use Alert.name_in_bucket so that we don't have to keep the two in sync.
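
For reference, the call site in run() would then reduce to something like the following (a sketch only; it assumes pittgoogle's Alert exposes name_in_bucket as a property, as the comment above suggests):

blob = bucket.blob(alert.name_in_bucket)  # assumed client property; would replace the local _name_in_bucket()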

Collaborator:
We decided yesterday on zoom to implement option 1. I'll wait for that before reviewing again.

Collaborator Author (@hernandezc1, Aug 8, 2025):
@troyraen I've implemented option 1! Ready for your review.

def run():
     ...
    # the schema version is not defined in the schema
    # add it manually using the environment variable defined in this script
    alert.attributes["schema_version"] = VERSIONTAG
    # publish the same alert as JSON
    TOPIC_ALERTS.publish(alert)

    return "", HTTP_204

def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
    ...
    metadata["schema_version"] = VERSIONTAG

    return metadata

def _name_in_bucket(alert: pittgoogle.Alert) -> str:
    ...
    _id = alert.sourceid

    return f"{VERSIONTAG}/{_id}/{_alert_type}-{_date}.json"

The schema version is already propagated for BigQuery schemas
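
Folding those fragments into the helpers from the diff (together with the two-line schema_version addition to run() above), the updated functions would look roughly like this. A sketch only, assuming the rest of main.py is unchanged and that alert.sourceid carries the superevent ID as the snippet suggests:

def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
    """Return key/value pairs to be attached to the file as metadata."""
    metadata = {"file_origin_message_id": event_id}
    metadata["time_created"] = alert.dict["time_created"]
    metadata["alert_type"] = alert.dict["alert_type"]
    metadata["superevent_id"] = alert.dict["superevent_id"]
    # the schema version is not part of the alert packet; add it from the env var
    metadata["schema_version"] = VERSIONTAG
    return metadata

def _name_in_bucket(alert: pittgoogle.Alert) -> str:
    """Return the name of the file in the bucket."""
    _date = alert.dict["time_created"][0:10]
    _alert_type = alert.dict["alert_type"]
    _id = alert.sourceid
    return f"{VERSIONTAG}/{_id}/{_alert_type}-{_date}.json"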

14 changes: 14 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/requirements.txt
@@ -0,0 +1,14 @@
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
google-cloud-storage
pittgoogle-client>=0.3.15

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
Flask
gunicorn
Werkzeug
2 changes: 1 addition & 1 deletion broker/consumer/lvk/vm_startup.sh
@@ -23,7 +23,7 @@ fi

#--- GCP resources used in this script
broker_bucket="${PROJECT_ID}-${survey}-broker_files"
-PS_TOPIC_DEFAULT="${survey}-alerts"
+PS_TOPIC_DEFAULT="${survey}-alerts_raw"
# use test resources, if requested
if [ "$testid" != "False" ]; then
    broker_bucket="${broker_bucket}-${testid}"