Commit c4fdb27

Merge pull request #232 from mwvgroup/u/ch/store_BigQuery
Store LVK alerts in BigQuery
2 parents d1e5a6c + 4282062 commit c4fdb27

File tree

10 files changed: +421, -17 lines changed
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
# Store alerts in BigQuery

This Cloud Function stores alert data in a BigQuery table.
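
The function reads its configuration from environment variables set at deploy time by the deployment script shown next. A minimal sketch of the values a test instance would receive, assuming the defaults used elsewhere in this commit:

```bash
# environment variables consumed by the Cloud Function (set via --set-env-vars at deploy time)
TESTID="mytest"                           # or "False" for a production instance
SURVEY="lvk"                              # survey this broker instance ingests
GCP_PROJECT="${GOOGLE_CLOUD_PROJECT}"     # GCP project id taken from the deploying shell
VERSIONTAG="v1_0"                         # selects the alerts_<versiontag> BigQuery table
```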
Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
#! /bin/bash
# Deploys or deletes broker Cloud Function
# This script will not delete Cloud Functions that are in production

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tears down/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-lvk}"
# schema version
versiontag="${4:-v1_0}"

#--- GCP resources used in this script
store_bq_trigger_topic="${survey}-alerts"
store_bq_CF_name="${survey}-store_in_bigquery"

# use test resources, if requested
if [ "${testid}" != "False" ]; then
    store_bq_trigger_topic="${store_bq_trigger_topic}-${testid}"
    store_bq_CF_name="${store_bq_CF_name}-${testid}"
fi

if [ "${teardown}" = "True" ]; then
    # ensure that we do not tear down production resources
    if [ "${testid}" != "False" ]; then
        gcloud functions delete "${store_bq_CF_name}"
    fi

else # Deploy the Cloud Functions
    #--- BigQuery storage cloud function
    echo "Deploying Cloud Function: ${store_bq_CF_name}"
    store_bq_entry_point="run"
    memory=512MB

    gcloud functions deploy "${store_bq_CF_name}" \
        --entry-point "${store_bq_entry_point}" \
        --runtime python312 \
        --memory "${memory}" \
        --trigger-topic "${store_bq_trigger_topic}" \
        --set-env-vars TESTID="${testid}",SURVEY="${survey}",GCP_PROJECT="${GOOGLE_CLOUD_PROJECT}",VERSIONTAG="${versiontag}"
fi
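
For reference, a hypothetical test deployment and later teardown with this script might look like the following (argument values are illustrative; the script is invoked as `./deploy.sh` from `setup_broker.sh`, and `GOOGLE_CLOUD_PROJECT` must already be set in the calling shell):

```bash
# deploy a test instance of the BigQuery-storage Cloud Function
# positional arguments: testid, teardown, survey, versiontag
./deploy.sh "mytest" "False" "lvk" "v1_0"

# delete the same test instance later
./deploy.sh "mytest" "True" "lvk" "v1_0"
```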
Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores alert data in BigQuery tables."""

import base64
import os
import pittgoogle
from google.cloud import functions_v1, pubsub_v1, logging

PROJECT_ID = os.getenv("GCP_PROJECT")
SURVEY = os.getenv("SURVEY")
TESTID = os.getenv("TESTID")
VERSIONTAG = os.getenv("VERSIONTAG")

# connect to the cloud logger
log_name = "store-bigquery-cloudfnc"  # same log for all broker instances
logging_client = logging.Client()
logger = logging_client.logger(log_name)

# GCP resources used in this module
TABLE = pittgoogle.Table.from_cloud(f"alerts_{VERSIONTAG}", survey=SURVEY, testid=TESTID)
TOPIC = pittgoogle.Topic.from_cloud("bigquery", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID)


def run(event: dict, _context: functions_v1.context.Context) -> None:
    """Send alert data to various BigQuery tables.

    Args:
        event: Pub/Sub message data and attributes.
            The `data` field contains the message data as a base64-encoded string.
            The `attributes` field contains the message's custom attributes as a dict.

        _context: Metadata describing the Cloud Function's triggering event.
            This argument is unused but required by the background-function signature; see
            https://cloud.google.com/functions/1stgendocs/writing/write-event-driven-functions#background-functions
    """
    # decode the base64-encoded message data
    decoded_data = base64.b64decode(event["data"])

    # create a PubsubMessage-like object with the existing event dictionary
    pubsub_message = pubsub_v1.types.PubsubMessage(
        data=decoded_data, attributes=event.get("attributes", {})
    )

    # unpack the alert
    alert = pittgoogle.Alert.from_msg(msg=pubsub_message, schema_name="default_schema")

    # send the alert to the BigQuery table
    alert_table = insert_rows_alerts(alert)

    # announce what's been done
    TOPIC.publish(_create_outgoing_alert(alert, alert_table))


def insert_rows_alerts(alert: pittgoogle.alert.Alert):
    """Insert rows into the `alerts` table via the streaming API."""
    # send to bigquery
    errors = TABLE.insert_rows([alert])

    # handle errors; if none, save the table id for the Pub/Sub message
    if len(errors) == 0:
        table_dict = {"alerts_table": f"{TABLE.id}"}
    else:
        msg = f"Error inserting to alerts table: {errors}"
        logger.log_text(msg, severity="WARNING")
        table_dict = {"alerts_table": None}

    return table_dict


def _create_outgoing_alert(
    alert: pittgoogle.alert.Alert, table_dict: dict
) -> pittgoogle.alert.Alert:
    """Create an announcement of the table storage operation to Pub/Sub."""
    # collect attributes
    attrs = {
        **alert.attributes,
        "alerts_table": table_dict["alerts_table"],
        "alert_type": alert.dict["alert_type"],
        "superevent_id": alert.dict["superevent_id"],
    }

    # set empty message body; everything is in the attributes
    msg = {}

    # create outgoing alert
    alert_out = pittgoogle.Alert.from_dict(
        payload=msg, attributes=attrs, schema_name="default_schema"
    )

    return alert_out
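
Once the function is deployed and alerts are flowing, a quick way to confirm that rows are landing is to query the table directly. A sketch, assuming a production instance where `survey=lvk` and `versiontag=v1_0` (so the table created by `setup_broker.sh` is `lvk.alerts_v1_0`); the `superevent_id` and `alert_type` columns mirror the top-level LVK alert fields used above:

```bash
# spot-check recent rows in the alerts table
bq query --use_legacy_sql=false \
  "SELECT superevent_id, alert_type FROM \`${GOOGLE_CLOUD_PROJECT}.lvk.alerts_v1_0\` LIMIT 10"
```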
Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-functions
google-cloud-logging
pittgoogle-client>=0.3.11

broker/consumer/lvk/ps-connector.properties

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ name=CPSSinkConnector
# The Java class for the Pub/Sub sink connector.
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
# The maximum number of tasks that should be created for this connector.
-tasks.max=10
+tasks.max=1
# Set the key converter for the Pub/Sub sink connector.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Set the value converter for the Pub/Sub sink connector.

broker/consumer/lvk/psconnect-worker-authenticated.properties

Lines changed: 2 additions & 1 deletion
@@ -6,6 +6,7 @@
bootstrap.servers=kafka.gcn.nasa.gov:9092
plugin.path=/usr/local/share/kafka/plugins
offset.storage.file.filename=/tmp/connect.offsets
+connections.max.idle.ms=5400000

# ByteArrayConverter provides a “pass-through” option that does no conversion.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
@@ -22,7 +23,7 @@ sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginMo
clientSecret="CLIENT_SECRET";

# settings with `consumer.` prefixes are passed through to the Kafka consumer
-consumer.group.id=pitt-google-broker-test
+consumer.group.id=GROUP_ID
consumer.auto.offset.reset=earliest
consumer.sasl.mechanism=OAUTHBEARER
consumer.sasl.kerberos.service.name=kafka

broker/consumer/lvk/vm_startup.sh

Lines changed: 9 additions & 8 deletions
@@ -23,7 +23,7 @@ fi

#--- GCP resources used in this script
broker_bucket="${PROJECT_ID}-${survey}-broker_files"
-PS_TOPIC_DEFAULT="${survey}-alerts_raw"
+PS_TOPIC_DEFAULT="${survey}-alerts"
# use test resources, if requested
if [ "$testid" != "False" ]; then
    broker_bucket="${broker_bucket}-${testid}"
@@ -60,21 +60,22 @@ client_id="${survey}-${PROJECT_ID}-client-id"
client_secret="${survey}-${PROJECT_ID}-client-secret"
CLIENT_ID=$(gcloud secrets versions access latest --secret="${client_id}")
CLIENT_SECRET=$(gcloud secrets versions access latest --secret="${client_secret}")
+group_id="pittgooglebroker"
+# use test resources, if requested
+if [ "$testid" != "False" ]; then
+    group_id="${group_id}-${testid}"
+fi

cd "${workingdir}" || exit

fconfig=admin.properties
-sed -i "s/CLIENT_ID/${CLIENT_ID}/g" ${fconfig}
-sed -i "s/CLIENT_SECRET/${CLIENT_SECRET}/g" ${fconfig}
+sed -i "s/CLIENT_ID/${CLIENT_ID}/g" ${fconfig} && sed -i "s/CLIENT_SECRET/${CLIENT_SECRET}/g" ${fconfig}

fconfig=psconnect-worker-authenticated.properties
-sed -i "s/CLIENT_ID/${CLIENT_ID}/g" ${fconfig}
-sed -i "s/CLIENT_SECRET/${CLIENT_SECRET}/g" ${fconfig}
+sed -i "s/CLIENT_ID/${CLIENT_ID}/g" ${fconfig} && sed -i "s/CLIENT_SECRET/${CLIENT_SECRET}/g" ${fconfig} && sed -i "s/GROUP_ID/${group_id}/g" ${fconfig}

fconfig=ps-connector.properties
-sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig}
-sed -i "s/PS_TOPIC/${PS_TOPIC}/g" ${fconfig}
-sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig}
+sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig} && sed -i "s/PS_TOPIC/${PS_TOPIC}/g" ${fconfig} && sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig}

#--- Check until alerts start streaming into the topic
alerts_flowing=false

broker/setup_broker/lvk/README.md

Lines changed: 4 additions & 2 deletions
@@ -99,9 +99,10 @@ Initialize parameters and call the deployment script:
testid="mytest"
teardown="False"
survey="lvk"
+schema_version="1.0"
region="us-central1"

-./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${region}"
+./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${schema_version}" "${region}"
```

This will create all of the necessary GCP resources. Allow the consumer VM to finish its installation process. Once
@@ -141,7 +142,8 @@ Similar to [deploy broker instance](#deploy-broker-instance). Initialize paramet
testid="mytest"
teardown="True"
survey="lvk"
+schema_version="1.0"
region="us-central1"

-./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${region}"
+./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${schema_version}" "${region}"
```
broker/setup_broker/lvk/setup_broker.sh

Lines changed: 40 additions & 5 deletions
@@ -1,14 +1,16 @@
#! /bin/bash
# Create and configure GCP resources needed to run the nightly broker.

-testid="${1:-test}"
# "False" uses production resources
# any other string will be appended to the names of all resources
-teardown="${2:-False}"
+testid="${1:-test}"
# "True" tears down/deletes resources, else setup
-survey="${3:-lvk}"
+teardown="${2:-False}"
# name of the survey this broker instance will ingest
-region="${4:-us-central1}"
+survey="${3:-lvk}"
+schema_version="${4:-1.0}"
+versiontag=v$(echo "${schema_version}" | tr . _) # 1.0 -> v1_0
+region="${5:-us-central1}"
zone="${region}-a" # just use zone "a" instead of adding another script arg

PROJECT_ID=$GOOGLE_CLOUD_PROJECT # get the environment variable
@@ -20,6 +22,7 @@ echo
echo "GOOGLE_CLOUD_PROJECT = ${PROJECT_ID}"
echo "survey = ${survey}"
echo "testid = ${testid}"
+echo "schema_version = ${schema_version}"
echo "teardown = ${teardown}"
echo
echo "Continue? [y/(n)]: "
@@ -34,15 +37,32 @@ fi

#--- GCP resources used directly in this script
broker_bucket="${PROJECT_ID}-${survey}-broker_files"
+bq_dataset="${survey}"
topic_alerts="${survey}-alerts"
+topic_storebigquery="${survey}-bigquery"
+
# use test resources, if requested
if [ "$testid" != "False" ]; then
    broker_bucket="${broker_bucket}-${testid}"
+    bq_dataset="${bq_dataset}_${testid}"
    topic_alerts="${topic_alerts}-${testid}"
+    topic_storebigquery="${topic_storebigquery}-${testid}"
fi

-#--- Create (or delete) GCS, Pub/Sub resources
+alerts_table="alerts_${versiontag}"
+
+#--- Create (or delete) BigQuery, GCS, Pub/Sub resources
+echo
+echo "Configuring BigQuery, GCS, Pub/Sub resources..."
if [ "${teardown}" != "True" ]; then
+    # create bigquery dataset and table
+    bq --location="${region}" mk --dataset "${bq_dataset}"
+
+    cd templates || exit 5
+    bq mk --table "${PROJECT_ID}:${bq_dataset}.${alerts_table}" "bq_${survey}_${alerts_table}_schema.json" || exit 5
+    bq update --description "Alert data from LIGO/Virgo/KAGRA. This table is an archive of the lvk-alerts Pub/Sub stream. It has the same schema as the original alert bytes, including nested and repeated fields." "${PROJECT_ID}:${bq_dataset}.${alerts_table}"
+    cd .. || exit 5
+
    # create broker bucket and upload files
    echo "Creating broker_bucket and uploading files..."
    gsutil mb -b on -l "${region}" "gs://${broker_bucket}"
@@ -51,22 +71,37 @@ if [ "${teardown}" != "True" ]; then
    # create pubsub
    echo "Configuring Pub/Sub resources..."
    gcloud pubsub topics create "${topic_alerts}"
+    gcloud pubsub topics create "${topic_storebigquery}"

    # Set IAM policies on resources
    user="allUsers"
    roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
    gcloud pubsub topics add-iam-policy-binding "${topic_alerts}" --member="${user}" --role="${roleid}"
+    gcloud pubsub topics add-iam-policy-binding "${topic_storebigquery}" --member="${user}" --role="${roleid}"

else
    # ensure that we do not tear down production resources
    if [ "${testid}" != "False" ]; then
        o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs
        gsutil -m -o "${o}" rm -r "gs://${broker_bucket}"
+        bq rm -r -f "${PROJECT_ID}:${bq_dataset}"
        gcloud pubsub topics delete "${topic_alerts}"
+        gcloud pubsub topics delete "${topic_storebigquery}"
    fi
fi

#--- Create VM instances
echo
echo "Configuring VMs..."
./create_vms.sh "${broker_bucket}" "${testid}" "${teardown}" "${survey}" "${zone}"
+
+#--- Deploy Cloud Functions
+echo
+echo "Configuring Cloud Functions..."
+cd .. && cd .. && cd cloud_functions && cd lvk || exit 5
+
+#--- BigQuery storage cloud function
+cd store_BigQuery && ./deploy.sh "$testid" "$teardown" "$survey" "$versiontag" || exit 5
+
+#--- return to setup_broker/lvk directory
+cd .. && cd .. && cd .. && cd setup_broker && cd lvk || exit 5
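
After a test deployment completes, a quick sanity check that the new resources exist might look like the following (a sketch assuming `testid="mytest"`, `survey="lvk"`, and `schema_version="1.0"`; the resource names follow the patterns defined in the scripts above):

```bash
# Pub/Sub topics created by setup_broker.sh
gcloud pubsub topics describe "lvk-alerts-mytest"
gcloud pubsub topics describe "lvk-bigquery-mytest"

# BigQuery alerts table in the test dataset
bq show "${GOOGLE_CLOUD_PROJECT}:lvk_mytest.alerts_v1_0"

# Cloud Function deployed via store_BigQuery/deploy.sh
gcloud functions describe "lvk-store_in_bigquery-mytest"
```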
