Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
protobuf 5.29.3 -> 6.33.0, pyarrow 19.0.1 -> 21.0.0 (python < 3.11) or 22.0 (python >=3.11),
pytest-cov 6.0.0 -> 7.0.0, showballstemmer 2.2.0 -> 3.0.1.

### Added

- `pittgoogle/pubsub.py`
- The `Subscription` class now supports the arguments: `attribute_filter` and `udf`
- The `_create()` function for the `Subscription` class now supports the creation of subscriptions that use
Pub/Sub's built-in filters (i.e., filter based on message attributes) and/or single message transforms through
user-defined functions

## \[v0.3.19\] - 2025-10-07

### Changed
Expand Down
55 changes: 50 additions & 5 deletions pittgoogle/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import attrs.validators
import google.api_core.exceptions
import google.cloud.pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do import google.pubsub_v1.types instead of from ....

Also, is this a new dependency that needs to be added to the toml file?


from . import exceptions
from .alert import Alert
Expand Down Expand Up @@ -322,20 +323,42 @@ class Subscription:
Schema name of the alerts in the subscription. Passed to :class:`pittgoogle.alert.Alert` for unpacking.
If not provided, some properties of the Alert may not be available. For a list of schema names, see
:meth:`pittgoogle.registry.Schemas.names`.
attribute_filter (str, optional):
Specify a filter to only receive the messages whose attributes match the filter. The filter is an immutable
property of a subscription. After you create a subscription, you cannot update the subscription to modify
the filter.
udf (str, optional):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call this variable smt_udf to include the info about what the string is used for?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I see that google calls this variable javascript_udf, so that would be a good name for it here as well. It's helpful to know that this is javascript. Could also add in the SMT info and call it smt_javascript_udf.

Specify a JavaScript User-Defined Function (UDF). UDFs attached to a subscription can enable a wide range
of use cases, including: message filtering (based on the message payload and/or attributes), simple data
transformations, data masking and redaction, and data format conversions.
Comment on lines +326 to +333
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to go into the details here, but do give the user a heads up that both of these strings must conform to very specific requirements and then provide links to the google documentation where they can learn what those requirements are.

In the udf docstring, incorporate "Pub/Sub Single Message Transforms (SMTs)" so it's clear what this is used for.

Check our other docstrings and examples to see what words we typically use to describe message attributes and payloads. New users probably won't immediately understand what they mean and I may have intentionally used different words elsewhere that I thought would be more recognizable. Would be good to be consistent throughout our docs. (Maybe we do use "attributes" and "payload", I just literally don't remember right now.)


Example:

Create a subscription to Pitt-Google's 'ztf-loop' topic and pull messages:
Create a subscription to Pitt-Google's 'lsst-loop' topic and pull messages:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it clear that this is simulated data. (Ideally, the topic itself would be called something like lsstsims-loop, but also ok just to make a note right here instead of changing the topic name.)


.. code-block:: python

# Topic that the subscription should be connected to
topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds().pittgoogle)
topic = pittgoogle.Topic(name="lsst-loop", projectid=pittgoogle.ProjectIds().pittgoogle)

# Specify filters (Optional)
keepDiaObjects = "attributes:diaObject_diaObjectId"
filterByNPrevDetections = '''
Comment on lines +345 to +346
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I would name these two variables attribute_filter and udf to match the kwarg names so it's clear from the start where each is destined to be applied. But whatever you prefer.

Add a comment for each explaining what kind of messages will pass the filter. (Does the first one actually do anything with just a field name?)

function filterByNPrevDetections(message, metadata) {
const attrs = message.attributes || {};
const nPrevDetections = attrs.n_prev_detections ? parseInt(attrs.n_prev_detections) : null;
return (nPrevDetections > 20) ? message : null;
}
'''

# Create the subscription
subscription = pittgoogle.Subscription(
name="my-ztf-loop-subscription", topic=topic, schema_name="ztf"
)
name="my-lsst-loop-subscription",
topic=topic,
schema_name="lsst",
attribute_filter=keepDiaObjects,
udf=filterByNPrevDetections
)
subscription.touch()

# Pull a small batch of alerts
Expand All @@ -356,6 +379,8 @@ class Subscription:
),
)
schema_name: str | None = attrs.field(default=None)
attribute_filter: Optional[str] = attrs.field(default=None)
udf: Optional[str] = attrs.field(default=None)
Comment on lines +382 to +383
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these should just be kwargs in the touch() function. These are only used once when creating the subscription. And when a user creates a Subscription object for a gcp subscription that already exists, our code doesn't make the filter or udf available in the Subscription object even if they exist on the actual subscription. So I don't think they should be class attributes.

And if the user passes one in but the subscription already exists, should probably raise a warning saying the subscription exists so the kwarg wasn't used.


@property
def projectid(self) -> str:
Expand Down Expand Up @@ -409,8 +434,28 @@ def _create(self) -> google.cloud.pubsub_v1.types.Subscription:
if self.topic is None:
raise TypeError("The subscription needs to be created but no topic was provided.")

if self.udf:
import re
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this library is quite small, so it's fine to just import it at the top of this file.


# the function name must match what is defined in the UDF code
# we parse through the code using regex to find it
match = re.search(r"function\s+([a-zA-Z0-9_]+)\s*\(", self.udf.replace("\n", " "))
_function_name = match.group(1) if match else "filter"
if not match:
LOGGER.warning("Could not parse function name from UDF; using default 'filter'.")
Comment on lines +443 to +445
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't necessarily a filter though, right? It could be a transformation that doesn't filter.


udf = JavaScriptUDF(code=self.udf, function_name=_function_name)
transforms = [MessageTransform(javascript_udf=udf)]

try:
return self.client.create_subscription(name=self.path, topic=self.topic.path)
return self.client.create_subscription(
request={
"name": self.path,
"topic": self.topic.path,
"filter": self.attribute_filter or None,
"message_transforms": transforms or None,
}
)

# this error message is not very clear. let's help.
except google.api_core.exceptions.NotFound as excep:
Expand Down