Skip to content

Commit 2f23d1d

Browse files
authored
Merge branch 'develop' into u/ch/lsst/main.py
2 parents 2226ea8 + e827d34 commit 2f23d1d

File tree

13 files changed

+430
-371
lines changed

13 files changed

+430
-371
lines changed

broker/cloud_run/lsst/classify_snn/deploy.sh

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ teardown="${2:-False}"
1111
survey="${3:-lsst}"
1212
region="${4:-us-central1}"
1313
# get the environment variable
14+
BASE_DIR=$(pwd)
1415
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
16+
PROJECT_NUMBER=$(gcloud projects describe "$PROJECT_ID" --format="value(projectNumber)")
1517

1618
MODULE_NAME="supernnova" # lower case required by cloud run
1719
ROUTE_RUN="/" # url route that will trigger main.run()
@@ -30,12 +32,13 @@ define_GCP_resources() {
3032
#--- GCP resources used in this script
3133
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
3234
bq_dataset=$(define_GCP_resources "${survey}" "_")
33-
bq_table="SuperNNova"
35+
bq_table="${MODULE_NAME}"
3436
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by cloud run
35-
ps_input_subscrip=$(define_GCP_resources "${survey}-SuperNNova") # pub/sub subscription used to trigger cloud run module
36-
ps_output_topic=$(define_GCP_resources "${survey}-SuperNNova")
37+
ps_input_subscrip=$(define_GCP_resources "${survey}-${MODULE_NAME}") # pub/sub subscription used to trigger cloud run module
38+
ps_output_topic=$(define_GCP_resources "${survey}-${MODULE_NAME}")
3739
ps_trigger_topic=$(define_GCP_resources "${survey}-lite")
3840
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
41+
service_account="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
3942
# topics and subscriptions involved in writing data to BigQuery
4043
ps_bigquery_subscription=$(define_GCP_resources "${survey}-${MODULE_NAME}-bigquery-import")
4144
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")
@@ -51,7 +54,8 @@ if [ "${teardown}" = "True" ]; then
5154
gcloud run services delete "${cr_module_name}" --region "${region}"
5255
fi
5356
else
54-
echo "Configuring Pub/Sub resources..."
57+
echo
58+
echo "Configuring Pub/Sub resources for ${MODULE_NAME} module..."
5559
gcloud pubsub topics create "${ps_output_topic}"
5660
gcloud pubsub subscriptions create "${ps_bigquery_subscription}" \
5761
--topic="${ps_output_topic}" \
@@ -60,12 +64,14 @@ else
6064
--drop-unknown-fields \
6165
--dead-letter-topic="${ps_deadletter_topic}" \
6266
--max-delivery-attempts=5 \
63-
--dead-letter-topic-project="${PROJECT_ID}"
67+
--dead-letter-topic-project="${PROJECT_ID}" \
68+
--message-transforms-file="${BASE_DIR%%/cloud_run/*}/setup_broker/lsst/templates/ps_lsst_flatten_schema_smt.yaml"
6469
# set IAM policies on public Pub/Sub resources
6570
if [ "$testid" = "False" ]; then
6671
user="allUsers"
67-
roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
72+
roleid="roles/pubsub.subscriber"
6873
gcloud pubsub topics add-iam-policy-binding "${ps_output_topic}" --member="${user}" --role="${roleid}"
74+
gcloud pubsub subscriptions add-iam-policy-binding "${ps_bigquery_subscription}" --member="serviceAccount:${service_account}" --role="${roleid}"
6975
fi
7076

7177
#--- Deploy Cloud Run service

broker/cloud_run/lsst/classify_snn/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
# GCP resources used in this module
4848
# pittgoogle will construct the full resource names from the module name, SURVEY, and TESTID
4949
TOPIC = pittgoogle.Topic.from_cloud(
50-
"SuperNNova", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
50+
"supernnova", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
5151
)
5252

5353

broker/cloud_run/lsst/classify_upsilon/deploy.sh

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ teardown="${2:-False}"
1111
survey="${3:-lsst}"
1212
region="${4:-us-central1}"
1313
# get the environment variable
14+
BASE_DIR=$(pwd)
1415
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
16+
PROJECT_NUMBER=$(gcloud projects describe "$PROJECT_ID" --format="value(projectNumber)")
1517

1618
MODULE_NAME="upsilon" # lower case required by cloud run
1719
ROUTE_RUN="/" # url route that will trigger main.run()
@@ -36,6 +38,7 @@ ps_input_subscrip=$(define_GCP_resources "${survey}-upsilon") # pub/sub subscrip
3638
ps_output_topic=$(define_GCP_resources "${survey}-upsilon")
3739
ps_trigger_topic=$(define_GCP_resources "${survey}-lite")
3840
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
41+
service_account="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
3942
# topics and subscriptions involved in writing data to BigQuery
4043
ps_bigquery_subscription=$(define_GCP_resources "${survey}-${MODULE_NAME}-bigquery-import")
4144
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")
@@ -52,7 +55,8 @@ if [ "${teardown}" = "True" ]; then
5255
fi
5356

5457
else
55-
echo "Configuring Pub/Sub resources..."
58+
echo
59+
echo "Configuring Pub/Sub resources for ${MODULE_NAME} module..."
5660
gcloud pubsub topics create "${ps_output_topic}"
5761
gcloud pubsub subscriptions create "${ps_bigquery_subscription}" \
5862
--topic="${ps_output_topic}" \
@@ -61,12 +65,14 @@ else
6165
--drop-unknown-fields \
6266
--dead-letter-topic="${ps_deadletter_topic}" \
6367
--max-delivery-attempts=5 \
64-
--dead-letter-topic-project="${PROJECT_ID}"
68+
--dead-letter-topic-project="${PROJECT_ID}" \
69+
--message-transforms-file="${BASE_DIR%%/cloud_run/*}/setup_broker/lsst/templates/ps_lsst_flatten_schema_smt.yaml"
6570
# set IAM policies on public Pub/Sub resources
6671
if [ "$testid" = "False" ]; then
6772
user="allUsers"
68-
roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
73+
roleid="roles/pubsub.subscriber"
6974
gcloud pubsub topics add-iam-policy-binding "${ps_output_topic}" --member="${user}" --role="${roleid}"
75+
gcloud pubsub subscriptions add-iam-policy-binding "${ps_bigquery_subscription}" --member="serviceAccount:${service_account}" --role="${roleid}"
7076
fi
7177

7278
#--- Deploy Cloud Run

broker/cloud_run/lsst/variability/deploy.sh

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ teardown="${2:-False}"
1111
survey="${3:-lsst}"
1212
region="${4:-us-central1}"
1313
# get the environment variable
14+
BASE_DIR=$(pwd)
1415
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
16+
PROJECT_NUMBER=$(gcloud projects describe "$PROJECT_ID" --format="value(projectNumber)")
1517

1618
MODULE_NAME="variability" # lower case required by cloud run
1719
ROUTE_RUN="/" # url route that will trigger main.run()
@@ -36,6 +38,7 @@ ps_input_subscrip=$(define_GCP_resources "${survey}-${MODULE_NAME}") # pub/sub s
3638
ps_output_topic=$(define_GCP_resources "${survey}-${MODULE_NAME}")
3739
ps_trigger_topic=$(define_GCP_resources "${survey}-lite")
3840
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
41+
service_account="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
3942
# topics and subscriptions involved in writing data to BigQuery
4043
ps_bigquery_subscription=$(define_GCP_resources "${survey}-${MODULE_NAME}-bigquery-import")
4144
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")
@@ -51,7 +54,8 @@ if [ "${teardown}" = "True" ]; then
5154
gcloud run services delete "${cr_module_name}" --region "${region}"
5255
fi
5356
else
54-
echo "Configuring Pub/Sub resources..."
57+
echo
58+
echo "Configuring Pub/Sub resources for ${MODULE_NAME} module..."
5559
gcloud pubsub topics create "${ps_output_topic}"
5660
gcloud pubsub subscriptions create "${ps_bigquery_subscription}" \
5761
--topic="${ps_output_topic}" \
@@ -60,12 +64,14 @@ else
6064
--drop-unknown-fields \
6165
--dead-letter-topic="${ps_deadletter_topic}" \
6266
--max-delivery-attempts=5 \
63-
--dead-letter-topic-project="${PROJECT_ID}"
67+
--dead-letter-topic-project="${PROJECT_ID}" \
68+
--message-transforms-file="${BASE_DIR%%/cloud_run/*}/setup_broker/lsst/templates/ps_lsst_flatten_schema_smt.yaml"
6469
# set IAM policies on public Pub/Sub resources
6570
if [ "$testid" = "False" ]; then
6671
user="allUsers"
67-
roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
72+
roleid="roles/pubsub.subscriber"
6873
gcloud pubsub topics add-iam-policy-binding "${ps_output_topic}" --member="${user}" --role="${roleid}"
74+
gcloud pubsub subscriptions add-iam-policy-binding "${ps_bigquery_subscription}" --member="serviceAccount:${service_account}" --role="${roleid}"
6975
fi
7076

7177
#--- Deploy Cloud Run service

broker/consumer/lsst/vm_startup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ fi
4444
) || exit
4545

4646
#--- Set the topic names to the "FORCE" metadata attributes if exist, else defaults
47-
KAFKA_TOPIC_DEFAULT="alerts-simulated"
47+
KAFKA_TOPIC_DEFAULT="lsst-alerts-v9.0"
4848
KAFKA_TOPIC="${KAFKA_TOPIC_FORCE:-${KAFKA_TOPIC_DEFAULT}}"
4949
PS_TOPIC="${PS_TOPIC_FORCE:-${PS_TOPIC_DEFAULT}}"
5050
# set VM metadata, just for clarity and easy viewing

broker/setup_broker/lsst/setup_broker.sh

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ define_GCP_resources() {
5151
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
5252
bq_dataset=$(define_GCP_resources "${survey}" "_")
5353
bq_table_alerts="alerts_${versiontag}"
54-
bq_table_supernnova="SuperNNova"
54+
bq_table_supernnova="supernnova"
5555
bq_table_upsilon="upsilon"
5656
bq_table_variability="variability"
5757
gcs_broker_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}-broker_files")
@@ -86,10 +86,10 @@ manage_resources() {
8686
else
8787
echo "${bq_dataset} already exists."
8888
fi
89-
(cd templates && bq mk --table --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}" "bq_${survey}_${bq_table_alerts}_schema.json") || exit 5
90-
(cd templates && bq mk --table "${PROJECT_ID}:${bq_dataset}.${bq_table_supernnova}" "bq_${survey}_${bq_table_supernnova}_schema.json") || exit 5
91-
(cd templates && bq mk --table "${PROJECT_ID}:${bq_dataset}.${bq_table_variability}" "bq_${survey}_${bq_table_variability}_schema.json") || exit 5
92-
(cd templates && bq mk --table "${PROJECT_ID}:${bq_dataset}.${bq_table_upsilon}" "bq_${survey}_${bq_table_upsilon}_schema.json") || exit 5
89+
(cd templates && bq mk --table --clustering_fields=healpix9,healpix19,healpix29 --time_partitioning_field=kafkaPublishTimestamp --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}" "bq_${survey}_${bq_table_alerts}_schema.json") || exit 5
90+
(cd templates && bq mk --table --time_partitioning_field=kafkaPublishTimestamp --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_supernnova}" "bq_${survey}_${bq_table_supernnova}_schema.json") || exit 5
91+
(cd templates && bq mk --table --time_partitioning_field=kafkaPublishTimestamp --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_variability}" "bq_${survey}_${bq_table_variability}_schema.json") || exit 5
92+
(cd templates && bq mk --table --time_partitioning_field=kafkaPublishTimestamp --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_upsilon}" "bq_${survey}_${bq_table_upsilon}_schema.json") || exit 5
9393
bq update --description "Alert data from LSST. This table is an archive of the lsst-alerts Pub/Sub stream. It has the same schema as the original alert bytes, including nested and repeated fields." "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}"
9494
bq update --description "Binary classification results from SuperNNova." "${PROJECT_ID}:${bq_dataset}.${bq_table_supernnova}"
9595

@@ -143,14 +143,21 @@ manage_resources() {
143143
--dead-letter-topic="${ps_deadletter_topic}" \
144144
--max-delivery-attempts=5 \
145145
--dead-letter-topic-project="${PROJECT_ID}" \
146-
--message-filter='attributes.schema_version = "'"${versiontag}"'"'
146+
--message-filter='attributes.schema_version = "'"${versiontag}"'"' \
147+
--message-transforms-file=templates/ps_lsst_add_top_level_fields_smt.yaml
147148
# set IAM policies on public Pub/Sub resources
148149
if [ "$testid" = "False" ]; then
149150
user="allUsers"
150-
roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
151+
roleid="roles/pubsub.subscriber"
151152
gcloud pubsub topics add-iam-policy-binding "${ps_topic_alerts}" --member="${user}" --role="${roleid}"
152153
gcloud pubsub topics add-iam-policy-binding "${ps_topic_alerts_json}" --member="${user}" --role="${roleid}"
153154
gcloud pubsub topics add-iam-policy-binding "${ps_topic_alerts_lite}" --member="${user}" --role="${roleid}"
155+
gcloud pubsub topics add-iam-policy-binding "${ps_deadletter_topic}" \
156+
--member="serviceAccount:${service_account}" \
157+
--role="roles/pubsub.publisher"
158+
gcloud pubsub subscriptions add-iam-policy-binding "${ps_bigquery_subscription}" \
159+
--member="serviceAccount:${service_account}" \
160+
--role="roles/pubsub.subscriber"
154161
fi
155162

156163
#--- Create Artifact Registry Repository
Lines changed: 36 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,44 @@
11
[
22
{
3-
"description": "Unique identifiers from the semantically compressed alert packet.",
4-
"fields":[
5-
{
6-
"description": "diaSource",
7-
"fields": [
8-
{
9-
"description": "Unique identifier of this diaSource.",
10-
"mode": "REQUIRED",
11-
"name": "diaSourceId",
12-
"type": "INTEGER"
13-
}
14-
],
15-
"mode": "REQUIRED",
16-
"name": "diaSource",
17-
"type": "RECORD"
18-
},
19-
{
20-
"description": "diaObject",
21-
"fields": [
22-
{
23-
"description": "Unique identifier of this diaObject.",
24-
"mode": "REQUIRED",
25-
"name": "diaObjectId",
26-
"type": "INTEGER"
27-
}
28-
],
29-
"mode": "REQUIRED",
30-
"name": "diaObject",
31-
"type": "RECORD"
32-
}
33-
],
3+
"description": "Identifier of the triggering DiaSource.",
344
"mode": "REQUIRED",
35-
"name": "alert_lite",
36-
"type": "RECORD"
5+
"name": "diaSourceId",
6+
"type": "INTEGER"
377
},
388
{
39-
"description": "Classification results using SuperNNova (Möller & de Boissière 2019).",
40-
"fields": [
41-
{
42-
"description": "probability object is a type Ia supernova",
43-
"mode": "NULLABLE",
44-
"name": "prob_class0",
45-
"type": "FLOAT"
46-
},
47-
{
48-
"description": "probability object is non-Ia",
49-
"mode": "NULLABLE",
50-
"name": "prob_class1",
51-
"type": "FLOAT"
52-
},
53-
{
54-
"description": "predicted class; 0 = Ia, 1 = non-Ia",
55-
"mode": "NULLABLE",
56-
"name": "predicted_class",
57-
"type": "INTEGER"
58-
}
59-
],
9+
"description": "Unique identifier of this DiaObject.",
10+
"mode": "NULLABLE",
11+
"name": "diaObjectId",
12+
"type": "INTEGER"
13+
},
14+
{
15+
"description": "Unique identifier of the object.",
16+
"mode": "NULLABLE",
17+
"name": "ssObjectId",
18+
"type": "INTEGER"
19+
},
20+
{
21+
"description": "probability object is a type Ia supernova",
22+
"mode": "NULLABLE",
23+
"name": "prob_class0",
24+
"type": "FLOAT"
25+
},
26+
{
27+
"description": "probability object is non-Ia",
28+
"mode": "NULLABLE",
29+
"name": "prob_class1",
30+
"type": "FLOAT"
31+
},
32+
{
33+
"description": "predicted class; 0 = Ia, 1 = non-Ia",
34+
"mode": "NULLABLE",
35+
"name": "predicted_class",
36+
"type": "INTEGER"
37+
},
38+
{
39+
"description": "Kafka timestamp from originating LSST alert.",
6040
"mode": "REQUIRED",
61-
"name": "SuperNNova",
62-
"type": "RECORD"
41+
"name": "kafkaPublishTimestamp",
42+
"type": "TIMESTAMP"
6343
}
6444
]

broker/setup_broker/lsst/templates/bq_lsst_alerts_v8_0_schema.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2014,5 +2014,23 @@
20142014
"type": "FLOAT"
20152015
}
20162016
]
2017+
},
2018+
{
2019+
"description": "HEALPix order 9 pixel index at the source’s right ascension (RA) and declination",
2020+
"mode": "REQUIRED",
2021+
"name": "healpix9",
2022+
"type": "INTEGER"
2023+
},
2024+
{
2025+
"description": "HEALPix order 19 pixel index at the source’s right ascension (RA) and declination",
2026+
"mode": "REQUIRED",
2027+
"name": "healpix19",
2028+
"type": "INTEGER"
2029+
},
2030+
{
2031+
"description": "HEALPix order 29 pixel index at the source’s right ascension (RA) and declination",
2032+
"mode": "REQUIRED",
2033+
"name": "healpix29",
2034+
"type": "INTEGER"
20172035
}
20182036
]

broker/setup_broker/lsst/templates/bq_lsst_alerts_v9_0_schema.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2050,5 +2050,11 @@
20502050
"mode": "REQUIRED",
20512051
"name": "healpix29",
20522052
"type": "INTEGER"
2053+
},
2054+
{
2055+
"description": "Kafka timestamp from originating LSST alert.",
2056+
"mode": "REQUIRED",
2057+
"name": "kafkaPublishTimestamp",
2058+
"type": "TIMESTAMP"
20532059
}
20542060
]

0 commit comments

Comments
 (0)