feat(dataframe): add lookup_join for custom lookup-based enrichment #895
ovv commented May 26, 2025
- Introduce StreamingDataFrame.lookup_join() to support flexible, in-place enrichment of records using user-defined lookup strategies.
- Add abstract BaseLookup and BaseField classes for implementing custom lookup join logic.
- Provide QuixConfigurationService lookup implementation for configuration-based enrichment.
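The bullets above describe a pattern in which a lookup object enriches each record in place. A minimal sketch of that pattern follows, under stated assumptions: the names `SketchField`, `SketchLookup`, and `DictLookup`, and the `join(fields, on, value)` signature, are hypothetical and are not taken from this PR. The real `BaseLookup` and `BaseField` interfaces may differ.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Mapping


@dataclass(frozen=True)
class SketchField:
    """Hypothetical field spec: which source attribute to copy, plus a fallback."""

    source: str
    default: Any = None


class SketchLookup(ABC):
    """Hypothetical lookup strategy that enriches a record in place."""

    @abstractmethod
    def join(
        self, fields: Mapping[str, SketchField], on: str, value: Dict[str, Any]
    ) -> None:
        """Mutate `value`, adding one key per entry in `fields`."""


class DictLookup(SketchLookup):
    """Lookup backed by an in-memory table keyed by the join key."""

    def __init__(self, table: Mapping[str, Mapping[str, Any]]) -> None:
        self._table = table

    def join(self, fields, on, value):
        # A missing join key yields an empty row, so every field falls
        # back to its default instead of raising.
        row = self._table.get(on, {})
        for target, field in fields.items():
            value[target] = row.get(field.source, field.default)


lookup = DictLookup({"sensor-1": {"unit": "C", "site": "lab"}})
fields = {
    "unit": SketchField("unit"),
    "location": SketchField("site", default="unknown"),
}

record = {"temperature": 21.5}
lookup.join(fields, on="sensor-1", value=record)

missing = {"temperature": 19.0}
lookup.join(fields, on="sensor-9", value=missing)
```

Mutating the record rather than returning a new one mirrors the "in-place" wording of the description; a lookup that returns a fresh dict would be an equally valid design.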
Pull Request Overview
Introduce a new lookup-based enrichment feature for StreamingDataFrame, enabling flexible, in-place joins via custom lookup strategies and providing a QuixConfigurationService implementation.
- Add abstract BaseLookup/BaseField classes and StreamingDataFrame.lookup_join API.
- Implement QuixConfigurationService lookup with configuration models, version handling, HTTP fetching, and LRU caching.
- Include end-to-end tests for lookup_join behavior and update dependencies for jsonpath_ng.
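The overview mentions version handling without spelling out the rule. One plausible reading, shown here purely as an assumption, is that each configuration version carries a start timestamp and a record is enriched with the latest version that was already valid at the record's timestamp. `SketchVersion` and `find_valid_version` are hypothetical names for illustration and are not confirmed by this PR.

```python
from bisect import bisect_right
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass(frozen=True)
class SketchVersion:
    """Hypothetical configuration version with the time it takes effect."""

    valid_from: int  # timestamp in milliseconds (assumed unit)
    data: Any


def find_valid_version(
    versions: List[SketchVersion], timestamp: int
) -> Optional[SketchVersion]:
    """Return the latest version with valid_from <= timestamp.

    `versions` must be sorted by `valid_from` in ascending order.
    Returns None when the timestamp precedes every version.
    """
    starts = [version.valid_from for version in versions]
    # bisect_right gives the count of versions starting at or before timestamp.
    index = bisect_right(starts, timestamp)
    return versions[index - 1] if index else None


versions = [SketchVersion(100, "v1"), SketchVersion(200, "v2")]
```

With this rule, a record timestamped between two versions is joined against the older one, which keeps replays of historical data deterministic.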
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
tests/test_quixstreams/test_dataframe/test_lookup.py | Add unit tests for lookup_join scenarios |
quixstreams/dataframe/dataframe.py | Introduce lookup_join method and necessary imports |
quixstreams/dataframe/joins/lookups/base.py | Define BaseLookup and BaseField abstractions |
quixstreams/dataframe/joins/lookups/quix_configuration_service/models.py | Add dataclasses for configuration events and versions |
quixstreams/dataframe/joins/lookups/quix_configuration_service/lookup.py | Implement QuixConfigurationService lookup logic |
quixstreams/dataframe/joins/lookups/quix_configuration_service/cache.py | Implement LRU cache for configuration version data |
quixstreams/dataframe/joins/lookups/quix_configuration_service/environment.py | Define retry backoff and replica environment settings |
quixstreams/dataframe/joins/lookups/quix_configuration_service/__init__.py | Expose QuixConfigurationService and Field aliases |
quixstreams/dataframe/joins/lookups/__init__.py | Expose BaseLookup, BaseField, and service exports |
pyproject.toml | Update dependency modules to include jsonpath_ng |
conda/meta.yaml | Add jsonpath_ng as a runtime requirement |
Comments suppressed due to low confidence (2)
quixstreams/dataframe/joins/lookups/quix_configuration_service/lookup.py:365
- The variable name `type` shadows the built-in `type`; consider renaming it to `config_type` for clarity.
for type, fields in fields_by_type.items():
quixstreams/dataframe/joins/lookups/quix_configuration_service/lookup.py:243
- The parameter name `type` shadows the built-in `type`; consider renaming it to `config_type` or similar.
def _config_id(self, type: str, key: str) -> str:
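To illustrate why the two suppressed comments above matter, here is a small self-contained sketch. The data and function names (`fields_by_type` contents, `count_fields`, `shadowed_loop`) are made up for the example and are not the PR's code; only the loop shape matches the flagged line.

```python
fields_by_type = {"sensor": ["unit", "site"], "device": ["model"]}


def count_fields(mapping):
    """Loop with a non-shadowing name: the built-in type() stays usable."""
    counts = {}
    for config_type, fields in mapping.items():
        # `type` still refers to the built-in here.
        assert type(fields) is list
        counts[config_type] = len(fields)
    return counts


def shadowed_loop(mapping):
    """Same loop with `type` as the loop variable."""
    for type, fields in mapping.items():
        # `type` is now a str key, so calling it raises TypeError.
        return type(fields)


field_counts = count_fields(fields_by_type)

try:
    shadowed_loop(fields_by_type)
    shadowing_error = None
except TypeError as exc:
    shadowing_error = exc
```

The shadowing is harmless as long as nobody calls `type()` inside that scope, which is presumably why the comments were rated low confidence; the rename simply removes the trap.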
Co-authored-by: Copilot <[email protected]>
Pull Request Overview
This PR adds a new `lookup_join` method to `StreamingDataFrame` for custom, in-place enrichment of records via user-defined lookup strategies, defines abstract lookup/join interfaces, and delivers a concrete `QuixConfigurationService` implementation (with models, caching, and env configuration).
- Introduce `StreamingDataFrame.lookup_join()` for custom lookup-based enrichment
- Add `BaseLookup` and `BaseField` abstractions for lookup joins
- Implement `QuixConfigurationService` lookup, its `Field`/`Configuration` models, LRU cache, and environment settings
- Cover lookup join behavior with new unit tests and bump `jsonpath_ng` dependency
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
File | Description |
---|---|
quixstreams/dataframe/dataframe.py | Add lookup_join method and imports for lookup interfaces |
quixstreams/dataframe/joins/lookups/base.py | Define BaseLookup and BaseField abstractions |
quixstreams/dataframe/joins/lookups/quix_configuration_service/lookup.py | Implement Lookup join logic, consumer thread, caching |
quixstreams/dataframe/joins/lookups/quix_configuration_service/models.py | Add `Field`, `ConfigurationVersion`, and `Configuration` |
quixstreams/dataframe/joins/lookups/quix_configuration_service/cache.py | Implement VersionDataLRU LRU cache |
quixstreams/dataframe/joins/lookups/quix_configuration_service/environment.py | Define retry delays and replica name from env |
tests/test_quixstreams/test_dataframe/test_lookup.py | Add tests for lookup_join with various `on` parameters |
pyproject.toml & conda/meta.yaml | Add jsonpath_ng to project dependencies |
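The table above lists an LRU cache for configuration version data. For readers unfamiliar with the technique, a generic least-recently-used cache can be sketched with `collections.OrderedDict`. This is an illustration only: `SketchLRU` and its `get`/`put` methods are hypothetical and say nothing about how the PR's `VersionDataLRU` is actually written.

```python
from collections import OrderedDict
from typing import Any, Hashable


class SketchLRU:
    """Generic LRU cache: evicts the least recently used entry when full."""

    def __init__(self, maxsize: int) -> None:
        if maxsize < 1:
            raise ValueError("maxsize must be at least 1")
        self._maxsize = maxsize
        self._items: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable, default: Any = None) -> Any:
        if key not in self._items:
            return default
        # A hit marks the entry as most recently used.
        self._items.move_to_end(key)
        return self._items[key]

    def put(self, key: Hashable, value: Any) -> None:
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self._maxsize:
            # The first item in the ordering is the least recently used.
            self._items.popitem(last=False)

    def __len__(self) -> int:
        return len(self._items)


cache = SketchLRU(maxsize=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")      # touch "a" so that "b" becomes the eviction candidate
cache.put("c", 3)   # exceeds maxsize and evicts "b"
```

Bounding the cache this way trades occasional refetches for a fixed memory ceiling, which matters when version payloads are fetched over HTTP and may be large.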
Co-authored-by: Daniil Gusev <[email protected]>
Pull Request Overview
This PR introduces a new lookup join functionality for enriching streaming dataframes via custom lookup strategies. The changes include:
- Implementation of StreamingDataFrame.lookup_join in dataframe.py.
- Creation of BaseLookup and BaseField abstractions along with a QuixConfigurationService lookup implementation.
- New tests for lookup join functionality and supporting code for configuration-based enrichment.
Reviewed Changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated no comments.
File | Description |
---|---|
tests/test_quixstreams/test_dataframe/test_lookup.py | Adds tests for the lookup join behavior using dummy lookup and field classes. |
quixstreams/dataframe/joins/lookups/quix_configuration_service/models.py | Provides configuration models and parsing logic for configuration events. |
quixstreams/dataframe/joins/lookups/quix_configuration_service/lookup.py | Implements the lookup join logic, including configuration event consumption and cache management. |
quixstreams/dataframe/joins/lookups/quix_configuration_service/environment.py | Adds environment configuration variables for lookup enrichment. |
quixstreams/dataframe/joins/lookups/quix_configuration_service/cache.py | Implements an LRU cache for configuration version data. |
quixstreams/dataframe/dataframe.py | Introduces the new lookup_join method on StreamingDataFrame. |
Other files | Updates supporting files, dependency declarations, and license information as needed. |
Comments suppressed due to low confidence (1)
quixstreams/dataframe/joins/lookups/quix_configuration_service/lookup.py:365
- Avoid using 'type' as a variable name since it shadows the built-in type() function. Consider renaming it to 'field_type' to improve clarity.
for type, fields in fields_by_type.items():
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Daniil Gusev <[email protected]>
Related to #484
…uixio#895) Co-authored-by: Copilot <[email protected]> Co-authored-by: Daniil Gusev <[email protected]>