Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions broker/cloud_run/lsst/classify_snn/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ else
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}" \
--message-transforms-file="${BASE_DIR%%/cloud_run/*}/setup_broker/lsst/templates/ps_lsst_flatten_schema_smt.yaml"
--message-transforms-file="${BASE_DIR%%/cloud_run/*}/setup_broker/lsst/templates/smt_flatten_schema.yaml"
# set IAM policies on public Pub/Sub resources
if [ "$testid" = "False" ]; then
user="allUsers"
Expand All @@ -92,7 +92,9 @@ else
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
--max-delivery-attempts=5 \
--min-retry-delay=10 \
--max-retry-delay=600
Comment on lines +95 to +97
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point, double check that the combination of these three flags does what you expect.

gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
--member="serviceAccount:${service_account}" \
--role="roles/pubsub.subscriber"
Expand Down
2 changes: 1 addition & 1 deletion broker/cloud_run/lsst/classify_snn/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pandas==1.5.1
numpy==1.23.3
google-cloud-functions
google-cloud-logging
pittgoogle-client>=0.3.18
pittgoogle-client>=0.3.19

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
Expand Down
6 changes: 4 additions & 2 deletions broker/cloud_run/lsst/classify_upsilon/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ else
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}" \
--message-transforms-file="${BASE_DIR%%/cloud_run/*}/setup_broker/lsst/templates/ps_lsst_flatten_schema_smt.yaml"
--message-transforms-file="${BASE_DIR%%/cloud_run/*}/setup_broker/lsst/templates/smt_flatten_schema.yaml"
# set IAM policies on public Pub/Sub resources
if [ "$testid" = "False" ]; then
user="allUsers"
Expand Down Expand Up @@ -94,7 +94,9 @@ else
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
--max-delivery-attempts=5 \
--min-retry-delay=10 \
--max-retry-delay=600
gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
--member="serviceAccount:${service_account}" \
--role="roles/pubsub.subscriber"
Expand Down
2 changes: 1 addition & 1 deletion broker/cloud_run/lsst/classify_upsilon/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
pittgoogle-client>=0.3.18
pittgoogle-client>=0.3.19
upsilon
# required by upsilon
# https://github.com/dwkim78/upsilon?tab=readme-ov-file#1-dependency
Expand Down
86 changes: 79 additions & 7 deletions broker/cloud_run/lsst/ps_to_storage/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores LSST alert data as an Avro file in Cloud Storage."""
"""This module stores LSST alert data as an Avro file in Cloud Storage and publishes it to various Pub/Sub topics."""

import os
from typing import Any
import flask
import pittgoogle
from google.cloud import logging, storage
Expand All @@ -28,6 +29,51 @@
# Variables for outgoing data
HTTP_204 = 204 # HTTP code: Success
HTTP_400 = 400 # HTTP code: Bad Request
LITE_FIELDS_CONFIG = {
"diaSource": {
"fields": {
"diaSourceId",
Comment on lines +32 to +35
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine for now, but it would be great to define this in a yaml file in pittgoogle-client instead. _create_lite_alert() could move there too.

"midpointMjdTai",
"ra",
"raErr",
"dec",
"decErr",
"psfFlux",
"psfFluxErr",
"band",
},
"is_list": False,
},
"prvDiaSources": {
"fields": {
"diaSourceId",
"midpointMjdTai",
"ra",
"raErr",
"dec",
"decErr",
"psfFlux",
"psfFluxErr",
"band",
},
"is_list": True,
},
"diaObject": {
"fields": {
"diaObjectId",
"lastDiaSourceMjdTai",
"firstDiaSourceMjdTai",
"nDiaSources",
"u_psfFluxErrMean",
"g_psfFluxErrMean",
"r_psfFluxErrMean",
"i_psfFluxErrMean",
"z_psfFluxErrMean",
"y_psfFluxErrMean",
},
"is_list": False,
},
}

# GCP resources used in this module
TOPIC_ALERTS = pittgoogle.Topic.from_cloud(
Expand Down Expand Up @@ -88,12 +134,8 @@ def run():
TOPIC_ALERTS.publish(alert)
# publish the same alert as JSON. Data will be coerced to valid JSON by pittgoogle.
TOPIC_ALERTS_JSON.publish(alert, serializer="json")
# add top-level key for lite stream
alert_lite = pittgoogle.Alert.from_dict(
payload={"alert_lite": alert.dict},
attributes={**alert.attributes},
)
TOPIC_LITE.publish(alert_lite, serializer="json")
# publish a lite version of the alert as JSON
TOPIC_LITE.publish(_create_lite_alert(alert), serializer="json")

return "", HTTP_204

Expand All @@ -109,3 +151,33 @@ def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
metadata["kafka.timestamp"] = alert.attributes["kafka.timestamp"]

return metadata


def _create_lite_alert(alert: pittgoogle.Alert) -> pittgoogle.Alert:
"""Creates a lite Alert object by filtering nested fields from the original Alert dictionary."""

alert_lite_dict = alert.drop_cutouts()
for key, config in LITE_FIELDS_CONFIG.items():
if key in alert_lite_dict:
# replace the original nested object with its filtered version
alert_lite_dict[key] = _process_field(alert_lite_dict.get(key), config)

return pittgoogle.Alert.from_dict(
payload={"alert_lite": alert_lite_dict},
attributes={**alert.attributes},
)


def _process_field(original_value: Any, config: dict) -> Any:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this really accept and return Any type? Looks to me like it only handles dict | (list[dict] | None).

"""Filters a dictionary or a list of dictionaries based on the provided configuration."""
whitelisted_fields = config["fields"]

if config["is_list"]:
return [_filter_dict(item, whitelisted_fields) for item in original_value or []]
return _filter_dict(original_value, whitelisted_fields)


def _filter_dict(alert_dict: dict, whitelisted_fields: set) -> dict:
"""Creates a new dictionary containing only the keys specified in whitelisted_fields."""

return {k: v for k, v in (alert_dict or {}).items() if k in whitelisted_fields}
2 changes: 1 addition & 1 deletion broker/cloud_run/lsst/ps_to_storage/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

google-cloud-logging
google-cloud-storage
pittgoogle-client>=0.3.18
pittgoogle-client>=0.3.19

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
Expand Down
6 changes: 4 additions & 2 deletions broker/cloud_run/lsst/variability/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ else
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}" \
--message-transforms-file="${BASE_DIR%%/cloud_run/*}/setup_broker/lsst/templates/ps_lsst_flatten_schema_smt.yaml"
--message-transforms-file="${BASE_DIR%%/cloud_run/*}/setup_broker/lsst/templates/smt_flatten_schema.yaml"
# set IAM policies on public Pub/Sub resources
if [ "$testid" = "False" ]; then
user="allUsers"
Expand All @@ -91,7 +91,9 @@ else
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
--max-delivery-attempts=5 \
--min-retry-delay=10 \
--max-retry-delay=600
gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
--member="serviceAccount:${service_account}" \
--role="roles/pubsub.subscriber"
Expand Down
2 changes: 1 addition & 1 deletion broker/cloud_run/lsst/variability/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
pittgoogle-client>=0.3.18
pittgoogle-client>=0.3.19

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
Expand Down
5 changes: 2 additions & 3 deletions broker/setup_broker/lsst/setup_broker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ manage_resources() {
gcloud pubsub topics create "${ps_topic_alerts_raw}"
gcloud pubsub topics create "${ps_topic_alerts}"
gcloud pubsub topics create "${ps_topic_alerts_json}"
gcloud pubsub topics create "${ps_topic_alerts_lite}" \
--message-transforms-file=templates/ps_lsst_lite_smt.yaml
gcloud pubsub topics create "${ps_topic_alerts_lite}"
gcloud pubsub topics create "${ps_deadletter_topic}"
gcloud pubsub subscriptions create "${ps_deadletter_subscription}" \
--topic="${ps_deadletter_topic}"
Expand All @@ -144,7 +143,7 @@ manage_resources() {
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}" \
--message-filter='attributes.schema_version = "'"${versiontag}"'"' \
--message-transforms-file=templates/ps_lsst_add_top_level_fields_smt.yaml
--message-transforms-file=templates/smt_add_top_level_fields.yaml
# set IAM policies on public Pub/Sub resources
if [ "$testid" = "False" ]; then
user="allUsers"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{
"description": "Identifier of the triggering DiaSource.",
"description": "Unique identifier of this DiaSource.",
"mode": "REQUIRED",
"name": "diaSourceId",
"type": "INTEGER"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{
"description": "Identifier of the triggering DiaSource.",
"description": "Unique identifier of this DiaSource.",
"mode": "REQUIRED",
"name": "diaSourceId",
"type": "INTEGER"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{
"description": "Identifier of the triggering DiaSource.",
"description": "Unique identifier of this DiaSource.",
"mode": "REQUIRED",
"name": "diaSourceId",
"type": "INTEGER"
Expand Down

This file was deleted.

49 changes: 0 additions & 49 deletions broker/setup_broker/lsst/templates/ps_lsst_flatten_schema_smt.yaml

This file was deleted.

54 changes: 0 additions & 54 deletions broker/setup_broker/lsst/templates/ps_lsst_lite_smt.yaml

This file was deleted.

Loading