Conversation

@hernandezc1 hernandezc1 commented Jul 21, 2025

Summary

Added

  • lvk-alerts-to-storage module
    • This module uploads LVK alert data to a GCS bucket. It also publishes a de-duplicated alert stream to the lvk-alerts Pub/Sub topic
  • setup_broker/lvk/templates
    • bq_lvk_policy.json

Changed

  • setup_broker/lvk/setup_broker.sh
  • consumer/lvk
    • updates the default Pub/Sub topic to: lvk-alerts_raw

@hernandezc1 hernandezc1 self-assigned this Jul 21, 2025
@hernandezc1 hernandezc1 added Enhancement New feature or request Cloud Infrastructure Cloud infrastructure management labels Jul 21, 2025

@troyraen troyraen left a comment

I marked "Request changes" because I went to do mwvgroup/pittgoogle-client#103 and realized there's a problem with the filename we decided on, so we need to discuss. I'm suggesting other changes here as well but I don't need to review those again.

_alert_type = alert.dict["alert_type"]
_id = alert.dict["superevent_id"]

return f"{VERSIONTAG}/{_date}/{_alert_type}/{_id}.json"

Thanks for pointing this out. What we decided on zoom was:

f"{VERSIONTAG}/{_id}/{_alert_type}-{_date}.json"

BUT, I went to patch our client Alert.name_in_bucket and realized that would be impossible. These alert packets do not contain the version, so there is no way to construct the name_in_bucket using only the alert data. I don't know the best solution here. Let's think and discuss. Options off the top of my head are:

  1. Our pipeline knows the schema version, so this module could simply add it to the alert packet. We'd need to propagate that all the way through pubsub messages, bucket objects, and bigquery schemas. LVK alerts from us would be different from those from LVK itself or any other broker. That may occasionally be a pain but not a huge problem, I think. I haven't fully thought things through but don't see any other problems with this at the moment. So this seems like the best option.
  2. Maybe there's a way to get the consumer VM to add the schema version to the metadata (it does not open the alert packets, so it cannot add to the actual data). A downside is that it's relatively easy for data and metadata to get disconnected. Also, I am very reluctant to mess with the consumer. It performs so well. I don't want to interfere.
  3. If our pipeline never drops fields from LVK alerts (so, no lite module, etc.) maybe it's not crucial for the client to be able to construct the name_in_bucket. I can still think of edge cases where we'd regret that and users would hate us for it.
  4. We could drop VERSIONTAG from the name in bucket. I really don't like the idea of making superevent_id the top level folder so maybe we use the LVK run for that, like "O4". Problem is that bigquery tables MUST be separated by schema version. There's no guarantee of a clean mapping between run and schema version. So this would make it really hard to regenerate bigquery tables from the bucket.


Also, this reminds me that it would be much better to remove this function altogether and just use Alert.name_in_bucket so that we don't have to keep the two in sync.


We decided yesterday on zoom to implement option 1. I'll wait for that before reviewing again.

@hernandezc1 hernandezc1 Aug 8, 2025

@troyraen I've implemented option 1! Ready for your review.

def run():
    ...
    # the schema version is not defined in the schema
    # add it manually using the environment variable defined in this script
    alert.attributes["schema_version"] = VERSIONTAG
    # publish the same alert as JSON
    TOPIC_ALERTS.publish(alert)

    return "", HTTP_204

def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
    ...
    metadata["schema_version"] = VERSIONTAG

    return metadata

def _name_in_bucket(alert: pittgoogle.Alert) -> str:
    ...
    _id = alert.sourceid

    return f"{VERSIONTAG}/{_id}/{_alert_type}-{_date}.json"

The schema version is already propagated for BigQuery schemas
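For concreteness, here's a minimal standalone sketch of the object names this layout produces. The values (VERSIONTAG, superevent id, alert type, date) are all made up for illustration and the helper function is a stand-in, not the real module code:

```python
# Illustrative sketch only: dummy values stand in for a real pittgoogle.Alert.
VERSIONTAG = "v1.0"  # hypothetical schema version tag


def name_in_bucket(superevent_id: str, alert_type: str, date: str) -> str:
    """Mirror the agreed layout: VERSIONTAG/<superevent_id>/<alert_type>-<date>.json"""
    return f"{VERSIONTAG}/{superevent_id}/{alert_type}-{date}.json"


print(name_in_bucket("S230518h", "PRELIMINARY", "2023-05-18"))
# → v1.0/S230518h/PRELIMINARY-2023-05-18.json
```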

Comment on lines 42 to 50
# ensure that we do not teardown production resources
if [ "${testid}" != "False" ]; then
    echo
    echo "Deleting resources for ${MODULE_NAME} module..."
    gsutil rm -r "gs://${gcs_json_bucket}"
    gcloud pubsub subscriptions delete "${ps_input_subscrip}"
    gcloud pubsub topics delete "${ps_topic_alert_in_bucket}"
    gcloud run services delete "${cr_module_name}" --region "${region}"
fi

Add an else statement that prints a message explaining why nothing happens and explaining what to do instead. You can reuse one you already added to a different script.
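A minimal sketch of such an else branch (the message text here is made up; reuse the exact wording already written in the other setup script):

```shell
# Hypothetical sketch of the requested else branch; dummy values for demonstration.
MODULE_NAME="lvk-alerts-to-storage"
testid="False"

if [ "${testid}" != "False" ]; then
    echo "Deleting resources for ${MODULE_NAME} module..."
    # ... resource deletion commands ...
else
    echo "testid is False; refusing to delete production resources."
    echo "To tear down a test deployment, rerun this script with a testid."
fi
```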

Comment on lines 91 to 93
metadata["_".join("time_created")] = alert.dict["time_created"]
metadata["_".join("alert_type")] = alert.dict["alert_type"]
metadata["_".join("id")] = alert.dict["superevent_id"]

Suggested change
metadata["_".join("time_created")] = alert.dict["time_created"]
metadata["_".join("alert_type")] = alert.dict["alert_type"]
metadata["_".join("id")] = alert.dict["superevent_id"]
metadata["time_created"] = alert.dict["time_created"]
metadata["alert_type"] = alert.dict["alert_type"]
metadata["superevent_id"] = alert.dict["superevent_id"]
  1. "_".join("time_created") == "t_i_m_e___c_r_e_a_t_e_d", obviously not what's intended.
  2. In general, use the survey's field name. In particular, "id" is way too vague for a name.

In terms of other metadata we could add, the only other option I see is their "urls" field and I am not inclined to add that here. All of their other fields depend on the type of alert, so we can't rely on them being available for this. I suppose we could add a try/except to deal with that but I think it's not worth the effort right now.
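Point 1 is easy to verify directly, since str.join treats a string argument as an iterable of its characters (the metadata value below is a dummy):

```python
# "_".join(s) inserts "_" between every character of s,
# because a string is iterated character by character.
key = "_".join("time_created")
print(key)  # → t_i_m_e___c_r_e_a_t_e_d

# The intended behavior is just the literal key:
metadata = {}
metadata["time_created"] = "2025-07-21T00:00:00Z"  # dummy value
```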

Comment on lines +60 to +61
ps_deadletter_subscription=$(define_GCP_resources "${survey}-deadletter")
ps_deadletter_topic="${ps_deadletter_subscription}"

Minor; the topic is the primary resource, so I'd name that and then reuse it for the subscription rather than the other way around.

echo "${bq_dataset} already exists."
fi
(cd templates && bq mk --table "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}" "bq_${survey}_${bq_table_alerts}_schema.json") || exit 5
bq update --description "Alert data from LIGO/Virgo/KAGRA. This table is an archive of the lvk-alerts Pub/Sub stream. It has the same schema (excluding skymaps) as the original alert bytes, including nested and repeated fields." "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}"

Suggested change
bq update --description "Alert data from LIGO/Virgo/KAGRA. This table is an archive of the lvk-alerts Pub/Sub stream. It has the same schema (excluding skymaps) as the original alert bytes, including nested and repeated fields." "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}"
bq update --description "LIGO/Virgo/KAGRA (LVK) alerts with schema version v${schema_version}. The data and schema are as produced by LVK except that skymaps are excluded. Skymaps can be retrieved from the Cloud Storage bucket ${gcs_alerts_bucket}." "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}"

Here's my suggestion. It could probably be worded better, but this is the info that I think users who find their way to this table will need/want to know. The variable gcs_alerts_bucket isn't currently defined. I feel strongly that this script should be the one to create that bucket. There is absolutely nothing special about that bucket to justify creating it in the ps_to_storage deploy script when every other resource is created here. But if you really want to leave it at least for now, you could just copy the name to define gcs_alerts_bucket here.
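As a sketch of the fallback ("just copy the name to define gcs_alerts_bucket here"), something like the following would work; the naming scheme and values shown are hypothetical, and the real bucket name would be copied from the ps_to_storage deploy script:

```shell
# Hypothetical sketch: define gcs_alerts_bucket in this script so the
# bq table description can reference it. Dummy values for demonstration;
# copy the actual bucket name from the ps_to_storage deploy script.
PROJECT_ID="my-project"
survey="lvk"
gcs_alerts_bucket="${PROJECT_ID}-${survey}-alerts"
echo "${gcs_alerts_bucket}"
```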

@hernandezc1 hernandezc1 requested a review from troyraen August 7, 2025 21:40
…rrently `None`) and add it to the metadata for bucket objects

@troyraen troyraen left a comment


The file broker/setup_broker/lvk/templates/bq_lvk_alerts_v1_0_schema.json needs to be updated to include the schema_version field.

Instead of adding VERSIONTAG in multiple different places in main.py, add a schema_version field directly to the alert data right after line 69. We want the schema version in the alert data itself, not just the metadata, so that it shows up everywhere -- the cloud storage file, bigquery, and in the payload of pubsub messages published by this module and every downstream module.

You could add the field to alert.dict and then it should show up automatically in most other places, but it still wouldn't show up in the file because we send alert.msg.data to cloud storage -- the alert.msg object is frozen, so that property will not get updated when alert.dict is.

There are a few ways we could handle this. Off the top of my head, I recommend trying the following. It's a little weird, but the advantage is that it's only two lines (plus an import) and after that everything else should work without any special handling.

import attrs

# We need to know the schema version for a few different things across the broker,
# but it's not included in LVK alerts. Add it now so we never have to worry about it again.
msg_data = f'{"schema_version": "{VERSIONTAG}", '.encode() + alert.msg.data[1:]
alert.msg = attrs.evolve(alert.msg, data=msg_data)

With msg_data, the idea is to prepend the field directly to the json string in alert.msg.data. I think the first character needs to be "{", so that's added in the f-string and the original is stripped off with [1:]. The .encode() is required because it's a bytestring.

I don't have an alert loaded right now, so I'm not really sure whether that will work. If there's extra metadata (or whatever) at the beginning of the json string before the first field, let's just switch to appending the new field at the end rather than trying to split the string to stick this in front of the other fields but behind the metadata.

Either way, there may be various issues with my syntax in that line. Try it and then call alert.dict to make sure the result is as expected. (If you call alert.dict before updating alert.msg, set alert._dict = None. Then the next time you call alert.dict it will reconstruct the dict from alert.msg.data.)

I don't think you'll have trouble with the attrs.evolve() line, but if you do you can just reconstruct the whole thing like this:

alert.msg = pittgoogle.types_.PubsubMessageLike(
    data=msg_data,
    attributes=alert.msg.attributes,
    message_id=alert.msg.message_id,
    publish_time=alert.msg.publish_time,
    ordering_key=alert.msg.ordering_key
)

Comment on lines 80 to 82
# the schema version is not defined in the schema
# add it manually using the environment variable defined in this script
alert.attributes["schema_version"] = VERSIONTAG

Suggested change
# the schema version is not defined in the schema
# add it manually using the environment variable defined in this script
alert.attributes["schema_version"] = VERSIONTAG

Add this field directly to the alert right after line 69. Then this attribute will be added automatically by TOPIC_ALERTS.publish().


@troyraen thanks again for your help with this PR! I've gone ahead and implemented/tested the requested changes.

Some slight changes have been made (see below):

import attrs
    ...

    # We need to know the schema version for a few different things across the broker,
    # but it's not included in LVK alerts. Add it now so we never have to worry about it again.
    msg_data = f'{{"schema_version": "{VERSIONTAG}", '.encode() + alert.msg.data[1:] # note: an extra { was needed
    alert.msg = attrs.evolve(alert.msg, data=msg_data)

# I've included the following lines of code to update alert.dict as downstream code in this script uses 
# alert.dict["schema_version"]

    # Force rebuild of alert.dict to include the schema_version field
    alert._dict = None
    _ = alert.dict

    blob = bucket.blob(alert.name_in_bucket)

    ...

    metadata["schema_version"] = alert.dict["schema_version"]
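The prepend trick can be exercised outside the broker with a dummy payload standing in for alert.msg.data (the VERSIONTAG value and the payload fields are made up):

```python
import json

VERSIONTAG = "v1.0"  # dummy schema version

# A stand-in for alert.msg.data: a JSON bytestring without the version field.
original = b'{"superevent_id": "S230518h", "alert_type": "PRELIMINARY"}'

# Prepend the field: the doubled {{ yields a literal "{" in the f-string,
# and [1:] strips the original opening brace.
msg_data = f'{{"schema_version": "{VERSIONTAG}", '.encode() + original[1:]

parsed = json.loads(msg_data)
print(parsed["schema_version"])  # → v1.0
```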

@hernandezc1 hernandezc1 requested a review from troyraen August 19, 2025 15:00
@troyraen troyraen removed their request for review October 2, 2025 21:20
@hernandezc1 hernandezc1 requested a review from troyraen November 8, 2025 04:06
@troyraen troyraen left a comment


I got here and found that I started a review over two months ago that I never finished. I also see that I’ve reviewed a couple times before and you've iterated. So if it's working now, let's 🚀!

Comment on lines +77 to +79
# Force rebuild of alert.dict to include the schema_version field
alert._dict = None
_ = alert.dict

These lines won't hurt anything but also shouldn't be needed. alert.dict gets lazy-loaded from alert.msg.data the first time it's asked for. Since it hasn't been asked for yet, it's still None at line 77.
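The lazy-load pattern being described looks roughly like this; a simplified stand-in for pittgoogle's Alert, not its actual implementation:

```python
import json


class LazyAlert:
    """Simplified sketch of a lazily-parsed alert; NOT the real pittgoogle.Alert."""

    def __init__(self, msg_data: bytes):
        self.msg_data = msg_data
        self._dict = None  # stays None until .dict is first accessed

    @property
    def dict(self) -> dict:
        # Parse msg_data on first access only; later changes to msg_data are
        # invisible unless _dict is reset to None before the next access.
        if self._dict is None:
            self._dict = json.loads(self.msg_data)
        return self._dict


alert = LazyAlert(b'{"alert_type": "PRELIMINARY"}')
# Updating msg_data *before* the first .dict access needs no cache reset:
alert.msg_data = b'{"schema_version": "v1.0", "alert_type": "PRELIMINARY"}'
print(alert.dict["schema_version"])  # → v1.0
```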
