Commits (30)
8638b5b
update script to create or delete new GCP resources
hernandezc1 Jul 21, 2025
6d7fae4
create bq_lvk_policy.json
hernandezc1 Jul 21, 2025
1024ee7
create file for `lvk-alerts-to-storage` module
hernandezc1 Jul 21, 2025
5648b1d
address codacy issue
hernandezc1 Jul 21, 2025
e967712
update script
hernandezc1 Jul 21, 2025
268d8f7
add versiontag as an environment variable
hernandezc1 Jul 21, 2025
30bd436
update default Pub/Sub topic
hernandezc1 Jul 21, 2025
06c21cf
update date value
hernandezc1 Jul 21, 2025
b4f241d
remove unused GCP resources
hernandezc1 Jul 21, 2025
0076075
unpin requirements
hernandezc1 Jul 21, 2025
a754bc3
add Pub/Sub topic
hernandezc1 Jul 21, 2025
d379e16
update Pub/Sub resource name
hernandezc1 Jul 21, 2025
304e56c
incorporate requested changes
hernandezc1 Aug 5, 2025
05b9f9a
make the alerts bucket public for production instances only
hernandezc1 Aug 6, 2025
c65aa9c
update `schema_version` value for outgoing Pub/Sub msg attributes (cu…
hernandezc1 Aug 8, 2025
5f9f563
update version requirements for pittgoogle-client
hernandezc1 Aug 19, 2025
16a1da0
add `schema_version`
hernandezc1 Aug 19, 2025
24fde97
add the field `schema_version` to the message data
hernandezc1 Aug 19, 2025
a817ac3
Merge branch 'develop' into u/ch/lvk/ps_to_storage
hernandezc1 Sep 24, 2025
ceded5a
Merge branch 'develop' into u/ch/lvk/ps_to_storage
hernandezc1 Sep 26, 2025
8e8b5a7
add `kafkaPublishTimestamp` to the table schema
hernandezc1 Sep 26, 2025
ce17c69
add `kafka.timestamp` to the file metadata
hernandezc1 Sep 26, 2025
173aa5b
create SMT to add the `kafkaPublishTimestamp` field to outgoing message
hernandezc1 Sep 26, 2025
b7d590b
partition BigQuery table
hernandezc1 Oct 3, 2025
359db16
Merge branch 'develop' into u/ch/lvk/ps_to_storage
hernandezc1 Nov 8, 2025
c05e02f
update IAM policies on production resources
hernandezc1 Nov 8, 2025
18b13eb
update user-defined function and rename file
hernandezc1 Nov 8, 2025
f2d2985
update filepath to SMT
hernandezc1 Nov 8, 2025
38875e1
update documentation and IAM policies
hernandezc1 Nov 8, 2025
63eb4df
define $PROJECT_NUMBER
hernandezc1 Nov 8, 2025
21 changes: 21 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/Dockerfile
@@ -0,0 +1,21 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
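# Illustrative variant (not used here): on a 4-vCPU instance, the guidance above would suggest
# CMD exec gunicorn --bind :$PORT --workers 4 --threads 8 --timeout 0 main:app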
29 changes: 29 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/cloudbuild.yaml
@@ -0,0 +1,29 @@
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
# Build the image
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
# Push the image to Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
# Deploy image to Cloud Run
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: gcloud
args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
- '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
_SURVEY: 'lvk'
_TESTID: 'testid'
_MODULE_NAME: '${_SURVEY}-alerts-to-storage-${_TESTID}'
_MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}'
_REPOSITORY: 'cloud-run-services'
# Cloud Functions automatically sets the project id env var using the name "GCP_PROJECT";
# use the same name here for consistency.
# [TODO] PROJECT_ID is set in setup.sh. This is confusing and we should revisit the decision.
# I (Raen) think I didn't make it a substitution because I didn't want to set a default for it.
_ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID},VERSIONTAG=${_VERSIONTAG}'
_REGION: 'us-central1'
options:
dynamic_substitutions: true
105 changes: 105 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/deploy.sh
@@ -0,0 +1,105 @@
#! /bin/bash
# Deploys or deletes a broker Cloud Run service and its associated GCP resources
# This script will not delete Cloud Run services that are in production

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tearsdown/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-lvk}"
region="${4:-us-central1}"
versiontag="${5:-v1_0}"
# get the environment variable
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
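# Example invocations (illustrative):
#   ./deploy.sh mytest             # deploy a test instance; resources are suffixed with "-mytest"
#   ./deploy.sh mytest True        # tear down that test instance
#   ./deploy.sh False              # deploy using production resource names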

MODULE_NAME="alerts-to-storage" # lower case required by cloud run
ROUTE_RUN="/" # url route that will trigger main.run()

define_GCP_resources() {
local base_name="$1"
local separator="${2:--}"
local testid_suffix=""

if [ "$testid" != "False" ] && [ -n "$testid" ]; then
testid_suffix="${separator}${testid}"
fi
echo "${base_name}${testid_suffix}"
}
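# For example, with testid="mytest", define_GCP_resources "lvk-alerts_raw" returns "lvk-alerts_raw-mytest";
# with testid="False" (production) it returns "lvk-alerts_raw" unchanged.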

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by cloud run
gcs_alerts_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts")
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")
ps_input_subscrip=$(define_GCP_resources "${survey}-alerts_raw") # pub/sub subscription used to trigger cloud run module
ps_topic_alerts_in_bucket=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alerts_in_bucket")
ps_trigger_topic=$(define_GCP_resources "${survey}-alerts_raw")
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"

if [ "${teardown}" = "True" ]; then
# ensure that we do not teardown production resources
if [ "${testid}" != "False" ]; then
echo
echo "Deleting resources for ${MODULE_NAME} module..."
gsutil rm -r "gs://${gcs_alerts_bucket}"
gcloud pubsub subscriptions delete "${ps_input_subscrip}"
gcloud pubsub topics delete "${ps_topic_alerts_in_bucket}"
gcloud run services delete "${cr_module_name}" --region "${region}"
else
echo 'ERROR: No testid supplied.'
echo 'To avoid accidents, this script will not delete production resources.'
echo 'If that is your intention, you must delete them manually.'
echo 'Otherwise, please supply a testid.'
exit 1
fi
else
echo
echo "Creating gcs_alerts_bucket and setting permissions..."
if ! gsutil ls -b "gs://${gcs_alerts_bucket}" >/dev/null 2>&1; then
#--- Create the bucket that will store the alerts
gsutil mb -b on -l "${region}" "gs://${gcs_alerts_bucket}"
gsutil uniformbucketlevelaccess set on "gs://${gcs_alerts_bucket}"
gsutil requesterpays set on "gs://${gcs_alerts_bucket}"
# set IAM policies on public GCP resources
if [ "$testid" = "False" ]; then
gcloud storage buckets add-iam-policy-binding "gs://${gcs_alerts_bucket}" \
--member="allUsers" \
--role="roles/storage.objectViewer"
fi
else
echo "${gcs_alerts_bucket} already exists."
fi

#--- Setup the Pub/Sub notifications on the JSON storage bucket
echo
echo "Configuring Pub/Sub notifications on GCS bucket..."
trigger_event=OBJECT_FINALIZE
format=json # json or none; if json, file metadata sent in message body
gsutil notification create \
-t "$ps_topic_alerts_in_bucket" \
-e "$trigger_event" \
-f "$format" \
"gs://${gcs_alerts_bucket}"

#--- Deploy Cloud Run service
echo
echo "Creating container image for ${MODULE_NAME} module and deploying to Cloud Run..."
moduledir="." # assumes deploying what's in our current directory
config="${moduledir}/cloudbuild.yaml"
# deploy the service and capture the endpoint's URL
url=$(gcloud builds submit --config="${config}" \
--substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo},_VERSIONTAG=${versiontag}" \
"${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')
echo
echo "Creating trigger subscription for ${MODULE_NAME} Cloud Run service..."
gcloud pubsub subscriptions create "${ps_input_subscrip}" \
--topic "${ps_trigger_topic}" \
--topic-project "${PROJECT_ID}" \
--ack-deadline=600 \
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
fi
107 changes: 107 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/main.py
@@ -0,0 +1,107 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores LVK alert data as a JSON file in Cloud Storage."""

import os
import flask
import pittgoogle
from google.cloud import logging, storage
from google.cloud.exceptions import PreconditionFailed

# [FIXME] Make this helpful or else delete it.
# Connect the python logger to the google cloud logger.
# By default, this captures INFO level and above.
# pittgoogle uses the python logger.
# We don't currently use the python logger directly in this script, but we could.
logging.Client().setup_logging()
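# Illustrative only: after setup_logging(), records from the standard library logger are
# forwarded to Cloud Logging, e.g.
#   import logging as stdlib_logging
#   stdlib_logging.getLogger(__name__).info("processed alert")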

PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")
VERSIONTAG = os.getenv("VERSIONTAG")

# Variables for incoming data
# A URL route is used in deploy.sh when the trigger subscription is created.
# It is possible to define multiple routes in a single module and trigger them using different subscriptions.
ROUTE_RUN = "/" # HTTP route that will trigger run(). Must match deploy.sh

# Variables for outgoing data
HTTP_204 = 204 # HTTP code: Success
HTTP_400 = 400 # HTTP code: Bad Request

# GCP resources used in this module
TOPIC_ALERTS = pittgoogle.Topic.from_cloud(
"alerts", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)
bucket_name = f"{PROJECT_ID}-{SURVEY}_alerts"
if TESTID != "False":
bucket_name = f"{bucket_name}-{TESTID}"

client = storage.Client()
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))

app = flask.Flask(__name__)


@app.route(ROUTE_RUN, methods=["POST"])
def run():
"""Uploads alert data to a GCS bucket. Publishes a de-duplicated JSON-serialized "alerts" stream
(${survey}-alerts) containing the original alert bytes. A BigQuery subscription is used to write alert data to
the appropriate BigQuery table.

This module is intended to be deployed as a Cloud Run service. It will operate as an HTTP endpoint
triggered by Pub/Sub messages. This function will be called once for every message sent to this route.
It should accept the incoming HTTP request and return a response.

Returns
-------
response : tuple(str, int)
Tuple containing the response body (string) and HTTP status code (int). Flask will convert the
tuple into a proper HTTP response. Note that the response is a status message for the web server.
"""
# extract the envelope from the request that triggered the endpoint
# this contains a single Pub/Sub message with the alert to be processed
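# For reference, a Pub/Sub push request body has the standard wrapper shape (illustrative):
# {
#   "message": {"data": "<base64-encoded alert>", "attributes": {...},
#               "messageId": "...", "publishTime": "..."},
#   "subscription": "projects/<project>/subscriptions/<subscription>"
# }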
envelope = flask.request.get_json()
try:
alert = pittgoogle.Alert.from_cloud_run(envelope, "lvk")
except pittgoogle.exceptions.BadRequest as exc:
return str(exc), HTTP_400

blob = bucket.blob(_name_in_bucket(alert))
blob.metadata = _create_file_metadata(alert, event_id=envelope["message"]["messageId"])

# raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
try:
blob.upload_from_string(alert.msg.data, if_generation_match=0)
except PreconditionFailed:
# this alert is a duplicate. drop it.
return "", HTTP_204
# the schema version is not defined in the schema
# add it manually using the environment variable defined in this script
alert.attributes["schema_version"] = VERSIONTAG
Collaborator (inline review comment):
Suggested change (remove these lines):
# the schema version is not defined in the schema
# add it manually using the environment variable defined in this script
alert.attributes["schema_version"] = VERSIONTAG

Add this field directly to the alert right after line 69. Then this attribute will be added automatically by TOPIC_ALERTS.publish().
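A minimal sketch of that placement (assuming pittgoogle forwards alert attributes when publishing; not the author's code):

alert = pittgoogle.Alert.from_cloud_run(envelope, "lvk")
# attach the schema version here so TOPIC_ALERTS.publish(alert) carries it as a message attribute
alert.attributes["schema_version"] = VERSIONTAG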

# publish the same alert as JSON
TOPIC_ALERTS.publish(alert)

return "", HTTP_204


def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
"""Return key/value pairs to be attached to the file as metadata."""
# https://git.ligo.org/emfollow/igwn-gwalert-schema/-/blob/main/igwn.alerts.v1_0.Alert.schema.json
metadata = {"file_origin_message_id": event_id}
metadata["time_created"] = alert.dict["time_created"]
metadata["alert_type"] = alert.dict["alert_type"]
metadata["superevent_id"] = alert.dict["superevent_id"]
metadata["schema_version"] = VERSIONTAG

return metadata


def _name_in_bucket(alert: pittgoogle.Alert) -> str:
"""Return the name of the file in the bucket."""
_date = alert.dict["time_created"][0:10]
_alert_type = alert.dict["alert_type"]
_id = alert.sourceid

return f"{VERSIONTAG}/{_id}/{_alert_type}-{_date}.json"
14 changes: 14 additions & 0 deletions broker/cloud_run/lvk/ps_to_storage/requirements.txt
@@ -0,0 +1,14 @@
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
google-cloud-storage
pittgoogle-client>=0.3.15

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
Flask
gunicorn
Werkzeug
2 changes: 1 addition & 1 deletion broker/consumer/lvk/vm_startup.sh
@@ -23,7 +23,7 @@ fi

#--- GCP resources used in this script
broker_bucket="${PROJECT_ID}-${survey}-broker_files"
PS_TOPIC_DEFAULT="${survey}-alerts"
PS_TOPIC_DEFAULT="${survey}-alerts_raw"
# use test resources, if requested
if [ "$testid" != "False" ]; then
broker_bucket="${broker_bucket}-${testid}"