partition_by with complex signals - includes agg and group_by #1215


Merged

dreadatour merged 35 commits into main from cursor/propose-pr-for-complex-signals-support-9ebe on Jul 12, 2025

Conversation

@dmpetrov (Member) commented Jul 8, 2025

Allow complex signals (e.g., File type) in agg's partition_by parameter for improved usability (resolves #1211).
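
A minimal usage sketch of what this enables, assuming the public read_storage entry point and the keyword-lambda agg API; the bucket URI and output type are illustrative, not from this PR:

    # Hedged sketch: partition agg by the File complex signal directly,
    # instead of listing its identifier columns by hand.
    import datachain as dc
    from datachain import File

    chain = dc.read_storage("s3://my-bucket/data/")  # hypothetical source

    per_file = chain.agg(
        total_size=lambda file: [sum(f.size for f in file)],
        output=int,
        partition_by=File,  # previously required explicit columns like "file.path"
    )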

ToDo:

  • in unit tests, replace the to_records() logic that relies on double-underscore column names with the more native to_values()
  • simplify the code a bit
  • reduce the number of unit tests
  • run the linter and tests

PS: it turns out the problem was much more complicated, since group_by changes the schema and the changes become tricky when a nested column is used in partition_by - see to_partial() and related code.


Slack Thread

Summary by Sourcery

Add support for using complex signal types (DataModel subclasses) directly in the partition_by parameter of the agg method.

New Features:

  • Allow DataModel subclasses (e.g., File, Image) to be passed directly to partition_by in agg.
  • Introduce _process_complex_signal_partition to resolve complex signal types into their unique identifier columns.

Enhancements:

  • Extend PartitionByType to accept type objects for complex signals in the dataset query definitions.
  • Update agg logic and docstrings to integrate complex signal handling alongside existing column-based partitioning.

Documentation:

  • Enhance agg method documentation with examples demonstrating file-based partitioning.

Tests:

  • Add functional tests for partitioning by complex signals, mixed type partitioning, and error scenarios for invalid or missing DataModel types.

Summary by Sourcery

Enable using complex DataModel signals directly in partition_by for both agg and group_by by expanding nested model fields into SQL columns and extending schema partials; enrich documentation with examples; and add comprehensive tests for various partitioning scenarios.

New Features:

  • Support passing DataModel subclasses (e.g., File, ImageFile) directly to agg’s partition_by parameter
  • Extend complex signal partitioning support to group_by, expanding nested signals into their identifier columns

Enhancements:

  • Add _process_complex_signal_column to recursively resolve and expand complex signal paths into SQL columns
  • Enhance SignalSchema.to_partial and group_by to handle full and field-level inclusion of custom types and deduplicate nested signals
  • Adjust dataset query builder to select unique columns and correctly alias partition_by and group_by results

Documentation:

  • Update agg and group_by docstrings with examples demonstrating file-based and nested complex signal partitioning

Tests:

  • Add extensive unit tests for SignalSchema.to_partial covering entire, nested, mixed, and error scenarios
  • Add functional tests for agg and group_by partitioning by complex signals, nested fields, mixed types, and error handling
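
To illustrate the group_by side described above, a hedged sketch; count() from datachain.func and the storage source are assumptions:

    # Hedged sketch: grouping by a complex signal. Passing File lets the
    # library expand it into its identifier columns under the hood.
    import datachain as dc
    from datachain import File
    from datachain.func import count

    grouped = (
        dc.read_storage("s3://my-bucket/data/")  # hypothetical source
        .group_by(
            cnt=count(),
            partition_by=File,
        )
    )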

sourcery-ai bot (Contributor) commented Jul 8, 2025

Reviewer's Guide

This PR enables using complex DataModel-based signal types (e.g., File, ImageFile, nested Pydantic models) directly in the partition_by parameters of both agg and group_by. It introduces recursive expansion of nested custom signals into individual SQL columns, adapts SignalSchema.to_partial to handle full and partial custom types, adjusts the query builder for deduplicated columns, and enhances type conversion utilities. Comprehensive unit tests verify partitioning and grouping behavior, nested-field access, error handling, and mixed-type scenarios.
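
To make the expansion concrete, here is a self-contained sketch using plain pydantic (not DataChain internals) of how a nested model's fields flatten into the double-underscore column names mentioned in the File-Level Changes below:

    # Standalone illustration: recursively flatten a nested pydantic model
    # into "__"-delimited column names, mirroring the expansion this PR does.
    from pydantic import BaseModel

    class Source(BaseModel):
        name: str

    class File(BaseModel):
        path: str
        size: int
        source: Source

    def expand(prefix: str, model: type[BaseModel]) -> list[str]:
        cols: list[str] = []
        for field, info in model.model_fields.items():
            ann = info.annotation
            if isinstance(ann, type) and issubclass(ann, BaseModel):
                cols.extend(expand(f"{prefix}__{field}", ann))  # recurse into nested model
            else:
                cols.append(f"{prefix}__{field}")
        return cols

    print(expand("file", File))
    # -> ['file__path', 'file__size', 'file__source__name']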

Class diagram for updated SignalSchema and DataChain complex signal partitioning

classDiagram
    class DataChain {
        +agg(..., partition_by)
        +group_by(..., partition_by)
        -_process_complex_signal_column(column_name: str) list[Column]
    }
    class SignalSchema {
        +to_partial(*columns: str) SignalSchema
        +group_by(partition_by: Sequence[str], new_column: Sequence[Column]) SignalSchema
        +db_signals(name: Optional[str])
    }
    class ModelStore {
        +to_pydantic(val) -> Optional[type[BaseModel]]
        +is_partial(parent_type) -> bool
    }
    DataChain --> SignalSchema : uses
    SignalSchema --> ModelStore : uses
    DataChain --> ModelStore : uses
    class Column {
        +name: str
        +type: Any
    }
    class File {
        <<Pydantic BaseModel>>
        +name: str
        +size: int
        +path: str
    }
    SignalSchema --> File : handles complex signals
    DataChain --> Column : returns partition columns

Class diagram for recursive expansion of complex signals in partition_by

classDiagram
    class DataChain {
        -_process_complex_signal_column(column_name: str) list[Column]
    }
    class Column {
        +name: str
        +type: Any
    }
    class File {
        <<Pydantic BaseModel>>
        +name: str
        +size: int
        +path: str
    }
    DataChain --> File : expands fields
    DataChain --> Column : returns list[Column]
    File o-- Column : fields become columns

File-Level Changes

Recursive expansion of complex signals for partition_by (src/datachain/lib/dc/datachain.py)
  • Introduced _process_complex_signal_column to resolve and expand nested custom signal paths into SQL Column objects
  • Integrated this method into agg and group_by, replacing manual string-to-column logic
  • Handled exact path resolution fallback and recursive subtree traversal for nested Pydantic models

Enhanced SignalSchema for nested and partial custom types (src/datachain/lib/signal_schema.py); see the to_partial sketch after this section
  • Adjusted db_signals to normalize nested names by replacing dots with double underscores
  • Updated to_partial to promote entire complex signals or generate partial custom types for specific fields
  • Added SignalSchema.group_by method to merge partition_by and aggregation outputs into a new schema

Query builder deduplicates columns in group_by results (src/datachain/query/dataset.py); see the deduplication sketch after this section
  • Replaced the flat column list in apply_sql_clause with a dict-based approach to ensure unique column selection
  • Preserved labels and functions alongside raw columns

Support partial and list types in type conversion (src/datachain/lib/convert/sql_to_python.py, src/datachain/lib/model_store.py)
  • Extended sql_to_python to infer list item types when available
  • Added ModelStore.is_partial helper to detect generated partial Pydantic models

Comprehensive tests for complex signal partitioning and grouping (tests/unit/lib/test_signal_schema.py, tests/unit/lib/test_partition_by.py)
  • Expanded test_signal_schema.py with nested and multiple-path to_partial scenarios
  • Added test_partition_by.py covering agg and group_by with File, nested Pydantic types, mixed partitions, functions, and error cases
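
As a standalone illustration of what a "partial custom type" means in the SignalSchema change above: a hypothetical helper on plain pydantic models (DataChain's to_partial operates on SignalSchema, not raw models):

    # Hypothetical sketch: generate a model keeping only requested fields.
    from pydantic import BaseModel, create_model

    class File(BaseModel):
        path: str
        size: int
        etag: str

    def make_partial(model: type[BaseModel], *fields: str) -> type[BaseModel]:
        kept = {
            name: (info.annotation, ...)  # (type, required) field definition
            for name, info in model.model_fields.items()
            if name in fields
        }
        return create_model(f"{model.__name__}Partial", **kept)

    FilePartial = make_partial(File, "path")
    print(list(FilePartial.model_fields))  # -> ['path']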
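
And a minimal sketch of the dict-based deduplication idea from the query builder change; the column names are illustrative:

    # dict.fromkeys keeps the first occurrence of each key and preserves
    # insertion order, yielding a unique column selection without reordering.
    columns = ["file__path", "file__size", "file__path", "sig__name"]
    unique_columns = list(dict.fromkeys(columns))
    print(unique_columns)  # -> ['file__path', 'file__size', 'sig__name']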

Assessment against linked issues

  • #1211: Allow aggregation by complex signals like file.
  • #1211: Allow partitioning by complex signals like file.


sourcery-ai bot (Contributor) left a comment

Hey @dmpetrov - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `src/datachain/lib/dc/datachain.py:844` </location>
<code_context>
                 elif isinstance(col, Function):
                     column = col.get_column(self.signals_schema)
                     processed_partition_columns.append(column)
+                elif isinstance(col, type) and issubclass(col, DataModel):
+                    # Handle complex signal types (like File, Image, etc.)
+                    complex_columns = self._process_complex_signal_partition(col)
+                    processed_partition_columns.extend(complex_columns)
                 else:
                     # Assume it's already a ColumnElement
</code_context>

<issue_to_address>
The handling of complex signal types assumes a single schema match.

Currently, only the first matching signal is used when multiple signals share the same DataModel type. Please clarify if this is intentional, and consider adding a warning or error if not.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
                elif isinstance(col, type) and issubclass(col, DataModel):
                    # Handle complex signal types (like File, Image, etc.)
                    complex_columns = self._process_complex_signal_partition(col)
                    processed_partition_columns.extend(complex_columns)
=======
                elif isinstance(col, type) and issubclass(col, DataModel):
                    # Handle complex signal types (like File, Image, etc.)
                    # Check for multiple matching signals in the schema
                    matching_signals = [
                        signal for signal in self.signals_schema
                        if isinstance(signal, col)
                    ]
                    if len(matching_signals) > 1:
                        import warnings
                        warnings.warn(
                            f"Multiple signals found for DataModel type {col.__name__} in signals_schema. "
                            "Only the first match will be used. Please clarify if this is intentional."
                        )
                    if matching_signals:
                        complex_columns = self._process_complex_signal_partition(type(matching_signals[0]))
                        processed_partition_columns.extend(complex_columns)
                    else:
                        raise ValueError(f"No signals found for DataModel type {col.__name__} in signals_schema.")
>>>>>>> REPLACE

</suggested_fix>

### Comment 2
<location> `src/datachain/lib/dc/datachain.py:966` </location>
<code_context>
+        
+        # Generate column objects for each unique key
+        partition_columns = []
+        for key in unique_keys:
+            col_name = f"{signal_name}.{key}"
+            col_db_name = ColumnMeta.to_db_name(col_name)
+            try:
+                col_type = self.signals_schema.get_column_type(col_db_name)
+                column = Column(col_db_name, python_to_sql(col_type))
+                partition_columns.append(column)
+            except Exception:
+                # Skip columns that don't exist in the schema
+                continue
+        
+        if not partition_columns:
</code_context>

<issue_to_address>
Silently skipping missing columns may hide schema issues.

Consider logging a warning or raising an error when expected unique keys are missing from the schema to aid debugging.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
        for key in unique_keys:
            col_name = f"{signal_name}.{key}"
            col_db_name = ColumnMeta.to_db_name(col_name)
            try:
                col_type = self.signals_schema.get_column_type(col_db_name)
                column = Column(col_db_name, python_to_sql(col_type))
                partition_columns.append(column)
            except Exception:
                # Skip columns that don't exist in the schema
                continue
=======
        import logging

        for key in unique_keys:
            col_name = f"{signal_name}.{key}"
            col_db_name = ColumnMeta.to_db_name(col_name)
            try:
                col_type = self.signals_schema.get_column_type(col_db_name)
                column = Column(col_db_name, python_to_sql(col_type))
                partition_columns.append(column)
            except Exception:
                logging.warning(
                    f"Unique key '{key}' (db column '{col_db_name}') not found in schema for signal '{signal_name}'."
                )
                continue
>>>>>>> REPLACE

</suggested_fix>

### Comment 3
<location> `src/datachain/lib/dc/datachain.py:977` </location>
<code_context>
+                # Skip columns that don't exist in the schema
+                continue
+        
+        if not partition_columns:
+            raise ValueError(
+                f"No valid partition columns found for signal type {signal_type}"
</code_context>

<issue_to_address>
Raising ValueError for no valid partition columns is appropriate but could be more informative.

Consider adding the signal name and attempted unique keys to the error message for easier debugging.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
        if not partition_columns:
            raise ValueError(
                f"No valid partition columns found for signal type {signal_type}"
            )
=======
        if not partition_columns:
            raise ValueError(
                f"No valid partition columns found for signal type '{signal_type}', signal name '{signal_name}'. "
                f"Attempted unique keys: {unique_keys}"
            )
>>>>>>> REPLACE

</suggested_fix>


Inline comment (sourcery-ai bot) on the signal-name lookup in src/datachain/lib/dc/datachain.py:

)

# Find the signal name in the schema that matches this type
signal_name = None
issue (code-quality): Use the built-in function next instead of a for-loop (use-next)
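
For reference, the refactor being suggested looks roughly like this; schema_items and target_type are stand-ins, not DataChain's real structures:

    # Minimal sketch of the "use-next" refactor.
    schema_items = [("id", int), ("file", str), ("score", float)]
    target_type = str

    # Before: find-first for-loop
    signal_name = None
    for name, sig_type in schema_items:
        if sig_type is target_type:
            signal_name = name
            break

    # After: built-in next() with a generator expression
    signal_name = next(
        (name for name, sig_type in schema_items if sig_type is target_type),
        None,  # default when nothing matches
    )
    print(signal_name)  # -> file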

cloudflare-workers-and-pages bot commented Jul 8, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: 09b8a6e
Status: ✅  Deploy successful!
Preview URL: https://06b72849.datachain-documentation.pages.dev
Branch Preview URL: https://cursor-propose-pr-for-comple.datachain-documentation.pages.dev


@dmpetrov marked this pull request as draft on July 8, 2025 23:24
codecov bot commented Jul 9, 2025

Codecov Report

Attention: Patch coverage is 90.42553% with 9 lines in your changes missing coverage. Please review.

Project coverage is 88.69%. Comparing base (fd3795e) to head (09b8a6e).
Report is 1 commit behind head on main.

Files with missing lines                     Patch %   Lines
src/datachain/lib/dc/datachain.py            92.15%    1 Missing and 3 partials ⚠️
src/datachain/query/dataset.py               76.92%    2 Missing and 1 partial ⚠️
src/datachain/lib/convert/sql_to_python.py   60.00%    1 Missing and 1 partial ⚠️

Additional details and impacted files
Additional details and impacted files


@@           Coverage Diff           @@
##             main    #1215   +/-   ##
=======================================
  Coverage   88.68%   88.69%           
=======================================
  Files         152      152           
  Lines       13606    13680   +74     
  Branches     1893     1909   +16     
=======================================
+ Hits        12067    12133   +66     
- Misses       1093     1099    +6     
- Partials      446      448    +2     
Flag        Coverage Δ
datachain   88.62% <90.42%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines                     Coverage Δ
src/datachain/lib/model_store.py             93.84% <100.00%> (+0.29%) ⬆️
src/datachain/lib/signal_schema.py           96.10% <100.00%> (+0.68%) ⬆️
src/datachain/lib/convert/sql_to_python.py   87.50% <60.00%> (-12.50%) ⬇️
src/datachain/query/dataset.py               93.33% <76.92%> (-0.27%) ⬇️
src/datachain/lib/dc/datachain.py            90.16% <92.15%> (+0.07%) ⬆️

... and 1 file with indirect coverage changes


@dmpetrov marked this pull request as ready for review on July 12, 2025 00:33
@dmpetrov changed the title from "Propose PR for complex signals support" to "partition_by with complex signals - includes agg and group_by" on Jul 12, 2025
sourcery-ai bot (Contributor) left a comment

Hey @dmpetrov - I've reviewed your changes - here's some feedback:

  • Consider refactoring the complex signal expansion logic in _process_complex_signal_column and group_by into smaller shared helpers to reduce duplication and improve maintainability.
  • The to_partial method handles many nested edge cases in one large function—splitting it into dedicated helpers (e.g., full signal vs partial field handling) would make it more readable and testable.
  • Deeply nested model expansions can generate very wide queries; adding a safeguard or configuration to limit column explosion could help prevent potential performance issues.
## Individual Comments

### Comment 1
<location> `src/datachain/lib/signal_schema.py:592` </location>
<code_context>
         ]

         if name:
+            if "." in name:
+                name = name.replace(".", "__")
+
             signals = [
</code_context>

<issue_to_address>
Replacing dots with double underscores in column names may cause collisions if original column names contain double underscores.

Recommend choosing a delimiter or escaping method that ensures unique column names, even if original names contain double underscores.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
        if name:
            if "." in name:
                name = name.replace(".", "__")
=======
        if name:
            # Escape underscores and dots to avoid collisions
            name = name.replace("_", "__UND__").replace(".", "__DOT__")
>>>>>>> REPLACE

</suggested_fix>
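
A small standalone illustration of the collision being described; the to_db_name stand-in is hypothetical:

    # Two distinct signal paths can map to the same flattened DB name when
    # "." is naively replaced with "__".
    def to_db_name(name: str) -> str:
        return name.replace(".", "__")

    print(to_db_name("meta.data"))   # -> meta__data
    print(to_db_name("meta__data"))  # -> meta__data  (already contained "__")
    # Both resolve to the same column, so lookups become ambiguous.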

### Comment 2
<location> `src/datachain/lib/signal_schema.py:945` </location>
<code_context>
         partial_versions: dict[str, int] = {}

         def _type_name_to_partial(signal_name: str, type_name: str) -> str:
-            if "@" not in type_name:
+            # Check if we need to create a partial for this type
+            # Only create partials for custom types that are in the custom_types dict
+            if type_name not in custom_types:
                 return type_name
-            model_name, _ = ModelStore.parse_name_version(type_name)
</code_context>

<issue_to_address>
The check for custom_types before handling '@' in type_name may lead to inconsistent partial type handling.

Please review the logic to ensure partial types are created consistently, even for types that might be added to custom_types in the future.
</issue_to_address>


Inline comment (sourcery-ai bot) on src/datachain/lib/dc/datachain.py:

    @@ -969,7 +1022,7 @@ def select_except(self, *args: str) -> "Self":
             )

         @delta_disabled  # type: ignore[arg-type]
    -    def group_by(
    +    def group_by(  # noqa: C901, PLR0912
issue (code-quality): The quality score for this function is below the quality threshold of 25%. This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

  • Reduce the function length by extracting pieces of functionality out into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.

@dreadatour (Contributor) left a comment

Looks good to me 👍

@dreadatour merged commit 05dfadf into main on Jul 12, 2025
35 checks passed
@dreadatour deleted the cursor/propose-pr-for-complex-signals-support-9ebe branch on July 12, 2025 14:51
Development

Successfully merging this pull request may close these issues.

Allow complex signals in partition_by
3 participants