4 changes: 3 additions & 1 deletion broker/cloud_run/lsst/classify_snn/deploy.sh
@@ -92,7 +92,9 @@ else
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
--max-delivery-attempts=5 \
--min-retry-delay=10 \
--max-retry-delay=600
Comment on lines +95 to +97 (Collaborator):
At some point, double check that the combination of these three flags does what you expect.
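
One quick way to double-check: fetch the subscription after deployment and inspect the policies it actually received. A minimal sketch (not part of this PR), assuming the google-cloud-pubsub package; the project and subscription IDs are hypothetical placeholders for the values used in deploy.sh:

```python
# Sketch only: verify the deployed subscription's retry/dead-letter settings.
from google.cloud import pubsub_v1

PROJECT_ID = "my-project"                  # hypothetical
SUBSCRIPTION_ID = "my-input-subscription"  # hypothetical

subscriber = pubsub_v1.SubscriberClient()
path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)
sub = subscriber.get_subscription(subscription=path)

# Expect 10s, 600s, and 5, matching the flags above.
print("min retry delay:", sub.retry_policy.minimum_backoff)
print("max retry delay:", sub.retry_policy.maximum_backoff)
print("max delivery attempts:", sub.dead_letter_policy.max_delivery_attempts)
```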

gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
--member="serviceAccount:${service_account}" \
--role="roles/pubsub.subscriber"
2 changes: 1 addition & 1 deletion broker/cloud_run/lsst/classify_snn/requirements.txt
@@ -2,7 +2,7 @@ pandas==1.5.1
numpy==1.23.3
google-cloud-functions
google-cloud-logging
pittgoogle-client>=0.3.18
pittgoogle-client>=0.3.19

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
4 changes: 3 additions & 1 deletion broker/cloud_run/lsst/classify_upsilon/deploy.sh
@@ -94,7 +94,9 @@ else
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
--max-delivery-attempts=5 \
--min-retry-delay=10 \
--max-retry-delay=600
gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
--member="serviceAccount:${service_account}" \
--role="roles/pubsub.subscriber"
2 changes: 1 addition & 1 deletion broker/cloud_run/lsst/classify_upsilon/requirements.txt
@@ -4,7 +4,7 @@
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
pittgoogle-client>=0.3.18
pittgoogle-client>=0.3.19
upsilon
# required by upsilon
# https://github.com/dwkim78/upsilon?tab=readme-ov-file#1-dependency
86 changes: 79 additions & 7 deletions broker/cloud_run/lsst/ps_to_storage/main.py
@@ -1,9 +1,10 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores LSST alert data as an Avro file in Cloud Storage."""
"""This module stores LSST alert data as an Avro file in Cloud Storage and publishes it to various Pub/Sub topics."""

import os
from typing import Any
import flask
import pittgoogle
from google.cloud import logging, storage
@@ -28,6 +29,51 @@
# Variables for outgoing data
HTTP_204 = 204 # HTTP code: Success
HTTP_400 = 400 # HTTP code: Bad Request
LITE_FIELDS_CONFIG = {
    "diaSource": {
        "fields": {
            "diaSourceId",
Comment on lines +32 to +35 (Collaborator):
Fine for now, but it would be great to define this in a yaml file in pittgoogle-client instead. _create_lite_alert() could move there too. (A sketch of this follows the config block below.)

"midpointMjdTai",
"ra",
"raErr",
"dec",
"decErr",
"psfFlux",
"psfFluxErr",
"band",
},
"is_list": False,
},
"prvDiaSources": {
"fields": {
"diaSourceId",
"midpointMjdTai",
"ra",
"raErr",
"dec",
"decErr",
"psfFlux",
"psfFluxErr",
"band",
},
"is_list": True,
},
"diaObject": {
"fields": {
"diaObjectId",
"lastDiaSourceMjdTai",
"firstDiaSourceMjdTai",
"nDiaSources",
"u_psfFluxErrMean",
"g_psfFluxErrMean",
"r_psfFluxErrMean",
"i_psfFluxErrMean",
"z_psfFluxErrMean",
"y_psfFluxErrMean",
},
"is_list": False,
},
}
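
On the review suggestion above: a hedged sketch of what loading this whitelist from a packaged YAML file might look like. The file name, its location in pittgoogle-client, and the PyYAML dependency are all assumptions, not part of this PR:

```python
# Hypothetical: load the lite-field whitelist from a YAML data file instead
# of hard-coding it here (file name and packaging are assumptions).
import yaml  # PyYAML

with open("lite_fields.yaml") as f:  # hypothetical data file
    raw = yaml.safe_load(f)

# Rebuild the structure used below, with sets for fast membership tests.
LITE_FIELDS_CONFIG = {
    key: {"fields": set(cfg["fields"]), "is_list": cfg["is_list"]}
    for key, cfg in raw.items()
}
```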

# GCP resources used in this module
TOPIC_ALERTS = pittgoogle.Topic.from_cloud(
@@ -88,12 +134,8 @@ def run():
    TOPIC_ALERTS.publish(alert)
    # publish the same alert as JSON. Data will be coerced to valid JSON by pittgoogle.
    TOPIC_ALERTS_JSON.publish(alert, serializer="json")
    # add top-level key for lite stream
    alert_lite = pittgoogle.Alert.from_dict(
        payload={"alert_lite": alert.dict},
        attributes={**alert.attributes},
    )
    TOPIC_LITE.publish(alert_lite, serializer="json")
    # publish a lite version of the alert as JSON
    TOPIC_LITE.publish(_create_lite_alert(alert), serializer="json")

    return "", HTTP_204

@@ -109,3 +151,33 @@ def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
metadata["kafka.timestamp"] = alert.attributes["kafka.timestamp"]

return metadata


def _create_lite_alert(alert: pittgoogle.Alert) -> pittgoogle.Alert:
    """Creates a lite Alert object by filtering nested fields from the original Alert dictionary."""

    alert_lite_dict = alert.drop_cutouts()
    for key, config in LITE_FIELDS_CONFIG.items():
        if key in alert_lite_dict:
            # replace the original nested object with its filtered version
            alert_lite_dict[key] = _process_field(alert_lite_dict.get(key), config)

    return pittgoogle.Alert.from_dict(
        payload={"alert_lite": alert_lite_dict},
        attributes={**alert.attributes},
    )


def _process_field(original_value: Any, config: dict) -> Any:
Comment (Collaborator):
Can this really accept and return Any type? Looks to me like it only handles dict | (list[dict] | None). (See the sketch after these helpers.)

"""Filters a dictionary or a list of dictionaries based on the provided configuration."""
whitelisted_fields = config["fields"]

if config["is_list"]:
return [_filter_dict(item, whitelisted_fields) for item in original_value or []]
return _filter_dict(original_value, whitelisted_fields)


def _filter_dict(alert_dict: dict, whitelisted_fields: set) -> dict:
    """Creates a new dictionary containing only the keys specified in whitelisted_fields."""

    return {k: v for k, v in (alert_dict or {}).items() if k in whitelisted_fields}
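
On the review comment above about Any: a sketch of the narrower annotations it suggests (PEP 604 union syntax, so Python 3.10+), followed by a quick check of the helpers with made-up values, assuming the functions above are importable from this module:

```python
# Sketch: the narrower signature the review comment suggests (Python 3.10+):
#
#     def _process_field(
#         original_value: dict | list[dict] | None, config: dict
#     ) -> dict | list[dict]:
#
# Illustrative checks of the current helpers; the field values are made up.
sample = {"diaSourceId": 1, "ra": 10.5, "cutoutScience": b"..."}
assert _filter_dict(sample, {"diaSourceId", "ra"}) == {"diaSourceId": 1, "ra": 10.5}
assert _process_field(None, {"fields": {"ra"}, "is_list": True}) == []
assert _process_field(None, {"fields": {"ra"}, "is_list": False}) == {}
```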
2 changes: 1 addition & 1 deletion broker/cloud_run/lsst/ps_to_storage/requirements.txt
@@ -5,7 +5,7 @@

google-cloud-logging
google-cloud-storage
pittgoogle-client>=0.3.18
pittgoogle-client>=0.3.19

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
4 changes: 3 additions & 1 deletion broker/cloud_run/lsst/variability/deploy.sh
@@ -91,7 +91,9 @@ else
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
--max-delivery-attempts=5 \
--min-retry-delay=10 \
--max-retry-delay=600
gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
--member="serviceAccount:${service_account}" \
--role="roles/pubsub.subscriber"
2 changes: 1 addition & 1 deletion broker/cloud_run/lsst/variability/requirements.txt
@@ -4,7 +4,7 @@
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
pittgoogle-client>=0.3.18
pittgoogle-client>=0.3.19

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
3 changes: 1 addition & 2 deletions broker/setup_broker/lsst/setup_broker.sh
@@ -128,8 +128,7 @@ manage_resources() {
gcloud pubsub topics create "${ps_topic_alerts_raw}"
gcloud pubsub topics create "${ps_topic_alerts}"
gcloud pubsub topics create "${ps_topic_alerts_json}"
gcloud pubsub topics create "${ps_topic_alerts_lite}" \
--message-transforms-file=templates/ps_lsst_lite_smt.yaml
gcloud pubsub topics create "${ps_topic_alerts_lite}"
gcloud pubsub topics create "${ps_deadletter_topic}"
gcloud pubsub subscriptions create "${ps_deadletter_subscription}" \
--topic="${ps_deadletter_topic}"
@@ -1,6 +1,6 @@
[
{
"description": "Identifier of the triggering DiaSource.",
"description": "Unique identifier of this DiaSource.",
"mode": "REQUIRED",
"name": "diaSourceId",
"type": "INTEGER"
@@ -1,6 +1,6 @@
[
{
"description": "Identifier of the triggering DiaSource.",
"description": "Unique identifier of this DiaSource.",
"mode": "REQUIRED",
"name": "diaSourceId",
"type": "INTEGER"
@@ -1,6 +1,6 @@
[
{
"description": "Identifier of the triggering DiaSource.",
"description": "Unique identifier of this DiaSource.",
"mode": "REQUIRED",
"name": "diaSourceId",
"type": "INTEGER"
@@ -1,21 +1,39 @@
# https://cloud.google.com/pubsub/docs/smts/create-topic-smt#create
- javascriptUdf:
code: >
function addTopLevelFields(message, metadata) {
const data = JSON.parse(message.data);
const attrs = message.attributes || {};
function addTopLevelFields(message, metadata) {
const attrs = message.attributes || {};
const dataStr = message.data.toString();

const payload = {
...data, // spread the original JSON fields into the root
healpix9: attrs.healpix9 ? Number(attrs.healpix9) : null,
healpix19: attrs.healpix19 ? Number(attrs.healpix19) : null,
healpix29: attrs.healpix29 ? Number(attrs.healpix29) : null,
kafkaPublishTimestamp: attrs["kafka.timestamp"] ? Number(attrs["kafka.timestamp"]) * 1000 : null
};
// Create an empty object to hold the new fields we want to inject into the JSON payload
const newFields = {};

return {
data: JSON.stringify(payload),
attributes: attrs // preserve attributes
};
// Extract the following attributes and add them to newFields
// We avoid casting fields as JavaScript numbers to prevent precision loss
if (attrs.healpix9) newFields.healpix9 = attrs.healpix9.toString();
if (attrs.healpix19) newFields.healpix19 = attrs.healpix19.toString();
if (attrs.healpix29) newFields.healpix29 = attrs.healpix29.toString();
if (attrs["kafka.timestamp"]) {
newFields.kafkaPublishTimestamp = attrs["kafka.timestamp"] * 1000;
}

// Define the data as a set of key-value pairs to be added to the JSON payload
const newPairs = Object.entries(newFields)
.map(([k, v]) => `"${k}":${v}`);

if (newPairs.length === 0) {
// No new fields; return the original message
return message;
}

// Inject the new fields into the JSON payload
const newData = dataStr.endsWith("}")
? dataStr.slice(0, -1) + "," + newPairs.join(",") + "}"
: dataStr;

return {
data: newData,
attributes: attrs
};
}
functionName: addTopLevelFields
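
For local testing, a hedged Python mirror of this UDF may help; the function below is hypothetical tooling, not part of the deployment. It reproduces the string-splice approach so large integers like healpix29 are never round-tripped through a float:

```python
# Hypothetical Python mirror of the addTopLevelFields UDF, for local tests.
def add_top_level_fields(data: str, attrs: dict) -> str:
    new_fields = {}
    for key in ("healpix9", "healpix19", "healpix29"):
        if attrs.get(key):
            new_fields[key] = attrs[key]  # splice digits verbatim, no float round-trip
    if attrs.get("kafka.timestamp"):
        new_fields["kafkaPublishTimestamp"] = int(attrs["kafka.timestamp"]) * 1000

    pairs = [f'"{k}":{v}' for k, v in new_fields.items()]
    if not pairs or not data.endswith("}"):
        return data  # nothing to inject, or payload shape is unexpected
    return data[:-1] + "," + ",".join(pairs) + "}"

# Attribute values arrive as strings on a Pub/Sub message:
print(add_top_level_fields('{"a":1}', {"healpix29": "3458764513820540928"}))
# -> {"a":1,"healpix29":3458764513820540928}
```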
72 changes: 26 additions & 46 deletions broker/setup_broker/lsst/templates/ps_lsst_flatten_schema_smt.yaml
@@ -1,49 +1,29 @@
# https://cloud.google.com/pubsub/docs/smts/create-topic-smt#create
- javascriptUdf:
code: >
function reformatValueAddedAlert(message, metadata) {
const data = JSON.parse(message.data);
const alertLite = data["alert_lite"] || {};
const attrs = message.attributes || {};

// Find the value_added key
const valueAddedKey = Object.keys(data).find(k => k !== "alert_lite");
const valueAddedField = valueAddedKey ? (data[valueAddedKey] || {}) : {};

// Whitelist fields
const diaObjectFields = ["diaObjectId"];
const ssSourceFields = ["ssObjectId"];
const diaSourceId = ["diaSourceId"];

// Extract whitelisted fields
function extractFields(obj, fields) {
if (!obj) return obj;
const extracted = {};
for (const f of fields) {
if (obj.hasOwnProperty(f)) {
extracted[f] = obj[f];
}
}
return extracted;
}

const flattened = {};

// Extract diaSourceId, diaObjectId and ssObjectId
Object.assign(flattened, extractFields(alertLite["diaObject"], diaObjectFields));
Object.assign(flattened, extractFields(alertLite["ssSource"], ssSourceFields));
Object.assign(flattened, extractFields(alertLite, diaSourceId));

// Spread all fields from value_added into top-level
Object.assign(flattened, valueAddedField);

// Add top-level field
flattened.kafkaPublishTimestamp = attrs["kafka.timestamp"] ? Number(attrs["kafka.timestamp"]) * 1000 : null;

// Return transformed message and preserve attributes
return {
data: JSON.stringify(flattened),
attributes: message.attributes
};
}
functionName: reformatValueAddedAlert
function flattenValueAddedAlert(message, metadata) {
const attrs = message.attributes || {};
const data = JSON.parse(message.data);

// Find and extract the value_added dictionary
const valueAddedKey = Object.keys(data).find(k => k !== "alert_lite");
const valueAddedData = valueAddedKey ? (data[valueAddedKey] || {}) : {};

// Build the final payload, starting with value_added fields
const payload = {
...valueAddedData,

// Use the attributes to add the remaining key-value pairs
diaSourceId: attrs.diaSource_diaSourceId ? attrs.diaSource_diaSourceId.toString() : null,
diaObjectId: attrs.diaObject_diaObjectId ? attrs.diaObject_diaObjectId.toString() : null,
ssObjectId: attrs.ssSource_ssObjectId ? attrs.ssSource_ssObjectId.toString() : null,
kafkaPublishTimestamp: attrs["kafka.timestamp"] ? attrs["kafka.timestamp"] * 1000 : null
};

// Return the flattened message
return {
data: JSON.stringify(payload),
attributes: message.attributes
};
}
functionName: flattenValueAddedAlert
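
As a worked example of what this transform should emit; the message contents and the value_added key name (here "variability") are illustrative, not taken from a real alert:

```python
# Illustrative input/output for flattenValueAddedAlert; values are made up.
import json

attrs = {
    "diaSource_diaSourceId": "111",
    "diaObject_diaObjectId": "222",
    "kafka.timestamp": "1700000000000",  # ssSource_ssObjectId absent here
}
data = {"alert_lite": {"diaSourceId": "111"}, "variability": {"score": 0.9}}

expected = {
    **data["variability"],               # value_added fields spread to top level
    "diaSourceId": attrs["diaSource_diaSourceId"],
    "diaObjectId": attrs["diaObject_diaObjectId"],
    "ssObjectId": None,                  # attribute missing -> null
    "kafkaPublishTimestamp": int(attrs["kafka.timestamp"]) * 1000,
}
print(json.dumps(expected))
```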