Fix partition_by for queries without sys__id #1234

Merged

merged 1 commit into main from fix-partition-by-for-queries-without-sys-id on Jul 14, 2025

Conversation


@dreadatour dreadatour commented Jul 13, 2025

Part of #1213

In case the query has no sys__id field (for example, a group_by query), aggregation does not work (see traceback below).

In this PR I am fixing this specific case by creating a temp table and copying the query result into it.

A test was also added.

Traceback (most recent call last):
  File "<string>", line 35, in <module>
  File "/tmp/local/datachain_venv/python3.11/733fe23f9c9da996e532eb256ede753e0e46c859e1fbff91b8ec2ac80ba6364a/lib/python3.11/site-packages/datachain/lib/dc/datachain.py", line 635, in save
    query=self._query.save(
          ^^^^^^^^^^^^^^^^^
  File "/tmp/local/datachain_venv/python3.11/733fe23f9c9da996e532eb256ede753e0e46c859e1fbff91b8ec2ac80ba6364a/lib/python3.11/site-packages/datachain/query/dataset.py", line 1749, in save
    query = self.apply_steps()
            ^^^^^^^^^^^^^^^^^^
  File "/tmp/local/datachain_venv/python3.11/733fe23f9c9da996e532eb256ede753e0e46c859e1fbff91b8ec2ac80ba6364a/lib/python3.11/site-packages/datachain/query/dataset.py", line 1259, in apply_steps
    result = step.apply(
             ^^^^^^^^^^^
  File "/tmp/local/datachain_venv/python3.11/733fe23f9c9da996e532eb256ede753e0e46c859e1fbff91b8ec2ac80ba6364a/lib/python3.11/site-packages/datachain/query/dataset.py", line 609, in apply
    partition_tbl = self.create_partitions_table(query)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/local/datachain_venv/python3.11/733fe23f9c9da996e532eb256ede753e0e46c859e1fbff91b8ec2ac80ba6364a/lib/python3.11/site-packages/datachain/query/dataset.py", line 578, in create_partitions_table
    query.selected_columns.sys__id,
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/local/datachain_venv/python3.11/733fe23f9c9da996e532eb256ede753e0e46c859e1fbff91b8ec2ac80ba6364a/lib/python3.11/site-packages/sqlalchemy/sql/base.py", line 1631, in __getattr__
    raise AttributeError(key) from err
AttributeError: sys__id

Summary by Sourcery

Enable partitioning for queries that lack a sys__id column by copying the query results into a temporary table before applying partition logic, and add a test to verify aggregation after group_by.

Bug Fixes:

  • Support partition_by for queries without a sys__id column by creating a temporary table and copying the query results before partitioning

Tests:

  • Add test for aggregate operations following a group_by on queries that do not include sys__id
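The approach described above — materializing a query that lacks `sys__id` into a temp table so its rows get stable ids — can be sketched with stdlib `sqlite3`. All table and column names here are illustrative, not the actual datachain API:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE ds (sys__id INTEGER, path TEXT, amount INTEGER)")
con.executemany(
    "INSERT INTO ds VALUES (?, ?, ?)",
    [(1, "a.txt", 10), (2, "a.txt", 20), (3, "b.txt", 30)],
)

# A GROUP BY query: its result rows carry no sys__id column.
group_by = "SELECT path, SUM(amount) AS total FROM ds GROUP BY path"

# The fix: copy the result into a temp table whose INTEGER PRIMARY KEY
# (a rowid alias) provides a fresh, stable sys__id for later steps to join on.
con.execute(
    "CREATE TEMP TABLE tmp (sys__id INTEGER PRIMARY KEY, path TEXT, total INTEGER)"
)
con.execute(f"INSERT INTO tmp (path, total) {group_by}")

rows = con.execute("SELECT sys__id, path, total FROM tmp ORDER BY sys__id").fetchall()
print(rows)  # each aggregated row now has a stable sys__id
```

The same aggregated data is now addressable by id across repeated reads, which is what the partitioning step needs.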

@dreadatour dreadatour requested review from shcheklein, dmpetrov, a team and Copilot July 13, 2025 13:48
@dreadatour dreadatour self-assigned this Jul 13, 2025

sourcery-ai bot commented Jul 13, 2025

Reviewer's Guide

Partitioning now supports queries lacking a sys__id column by automatically creating and using a temporary table, and a new test ensures aggregation after group_by works in this scenario.

Sequence diagram for partition_by with missing sys__id

sequenceDiagram
    participant Query as Query
    participant Dataset as Dataset
    participant Warehouse as Warehouse
    participant TempTable as TempTable

    Query->>Dataset: apply(partition_by)
    Dataset->>Query: Check for sys__id in selected_columns
    alt sys__id missing
        Dataset->>Warehouse: create_dataset_rows_table(temp_table_name, columns)
        Warehouse-->>TempTable: Return temp table
        Dataset->>Warehouse: copy_table(temp_table, query)
        Dataset->>TempTable: select()
        Dataset->>Dataset: Use temp table as new query
    end
    Dataset->>Dataset: create_partitions_table(query)
    Dataset->>Query: outerjoin with partition table
    Dataset-->>Query: Return partitioned query

Class diagram for Dataset partitioning logic update

classDiagram
    class Dataset {
        +partition_by
        +apply(query)
        +create_partitions_table(query)
    }
    class Warehouse {
        +create_dataset_rows_table(name, columns)
        +copy_table(temp_table, query)
        +temp_table_name()
    }
    Dataset --> Warehouse : uses
    Dataset : +apply(query)
    Dataset : +create_partitions_table(query)
    Warehouse : +create_dataset_rows_table(name, columns)
    Warehouse : +copy_table(temp_table, query)
    Warehouse : +temp_table_name()

File-Level Changes

1. Introduce temporary table creation when sys__id is missing to enable partition_by (src/datachain/query/dataset.py)
   • Assert presence of sys__id in create_partitions_table
   • Detect missing sys__id in the apply step and build a temp table
   • Copy original query results into the temp table and reassign the query to its select
2. Add an integration test for aggregation following group_by without sys__id (tests/unit/lib/test_partition_by.py)
   • Define a chain grouping by file path and summing fields
   • Invoke agg on the grouped results to count files and total amount
   • Assert expected output for test_aggregate_after_group_by


@sourcery-ai sourcery-ai bot left a comment

Hey @dreadatour - I've reviewed your changes - here's some feedback:

  • Extract the duplicated sys__id existence checks in apply() and create_partitions_table() into a shared helper to reduce repetition.
  • Consider using a SQL window function (e.g. ROW_NUMBER) to synthesize a row identifier instead of physically copying to a temp table for better performance.
  • Move the temp‐table creation and copy logic into a private method (e.g. _ensure_sys_id) to keep apply() concise and improve readability.
## Individual Comments

### Comment 1
<location> `src/datachain/query/dataset.py:566` </location>
<code_context>
         assert self.partition_by is not None
+        # Check if sys__id is in the query, we need it to be able to join
+        # the partition table with the udf table later.
+        assert any(c.name == "sys__id" for c in query.selected_columns), (
+            "Query must have sys__id column to use partitioning."
+        )

</code_context>

<issue_to_address>
Using assert for runtime validation may not be reliable in production.

Assertions can be disabled with the -O flag. Use a specific exception like ValueError to ensure this check is always enforced.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
        # Check if sys__id is in the query, we need it to be able to join
        # the partition table with the udf table later.
        assert any(c.name == "sys__id" for c in query.selected_columns), (
            "Query must have sys__id column to use partitioning."
        )
=======
        # Check if sys__id is in the query, we need it to be able to join
        # the partition table with the udf table later.
        if not any(c.name == "sys__id" for c in query.selected_columns):
            raise ValueError("Query must have sys__id column to use partitioning.")
>>>>>>> REPLACE

</suggested_fix>
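The concern is easy to demonstrate: CPython strips assert statements entirely when run with the -O flag, so an assert-based check silently disappears in optimized mode. A quick stdlib-only check:

```python
import subprocess
import sys

# The same failing assertion, run with and without -O.
code = 'assert False, "this check vanishes under -O"'

# Without -O: the assert runs and raises AssertionError (non-zero exit).
normal = subprocess.run([sys.executable, "-c", code], stderr=subprocess.DEVNULL)

# With -O: the assert is compiled away entirely (exit code 0).
optimized = subprocess.run([sys.executable, "-O", "-c", code])

print(normal.returncode, optimized.returncode)
```

This is why a validation that must always hold in production belongs in an explicit `raise ValueError(...)` rather than an `assert`.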

### Comment 2
<location> `src/datachain/query/dataset.py:615` </location>
<code_context>

         # Apply partitioning if needed.
         if self.partition_by is not None:
+            if not any(c.name == "sys__id" for c in query.selected_columns):
+                # If sys__id is not in the query, we need to create a temp table
+                # to hold the query results, so we can join it with the
</code_context>

<issue_to_address>
Duplicated sys__id presence check could be refactored for clarity.

This logic is also present in create_partitions_table; centralizing it would help prevent inconsistencies and simplify maintenance.

Suggested implementation:

```python
        # the partition table with the udf table later.
        assert self._has_sys_id_column(query), (
            "Query must have sys__id column to use partitioning."
        )

```

```python
        # Apply partitioning if needed.
        if self.partition_by is not None:
            if not self._has_sys_id_column(query):
                # If sys__id is not in the query, we need to create a temp table
                # to hold the query results, so we can join it with the
                # partition table later.
                columns = [
                    c if isinstance(c, Column) else Column(c.name, c.type)
                    for c in query.subquery().columns
                ]
                temp_table = self.catalog.warehouse.create_dataset_rows_table(
                    self.catalog.warehouse.temp_table_name(),
                    columns=columns,
                )
                temp_tables.append(temp_table.name)

```

```python
    def _has_sys_id_column(self, query):
        """
        Returns True if the query's selected columns include a column named 'sys__id'.
        """
        return any(c.name == "sys__id" for c in query.selected_columns)

```

You should also update any other locations in this file (such as `create_partitions_table` or similar methods) where the sys__id presence check is performed, replacing the direct check with a call to `self._has_sys_id_column(query)`.
</issue_to_address>


@@ -606,6 +612,22 @@ def apply(

        # Apply partitioning if needed.
        if self.partition_by is not None:
            if not any(c.name == "sys__id" for c in query.selected_columns):

suggestion (code-quality): Invert any/all to simplify comparisons (invert-any-all)

Suggested change
            if not any(c.name == "sys__id" for c in query.selected_columns):
            if all(c.name != "sys__id" for c in query.selected_columns):


codecov bot commented Jul 13, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 88.67%. Comparing base (8b3c25a) to head (7622d75).
Report is 3 commits behind head on main.

@@           Coverage Diff           @@
##             main    #1234   +/-   ##
=======================================
  Coverage   88.66%   88.67%           
=======================================
  Files         153      153           
  Lines       13793    13800    +7     
  Branches     1927     1928    +1     
=======================================
+ Hits        12230    12237    +7     
  Misses       1109     1109           
  Partials      454      454           
Flag Coverage Δ
datachain 88.60% <100.00%> (+<0.01%) ⬆️


Files with missing lines Coverage Δ
src/datachain/query/dataset.py 93.38% <100.00%> (+0.05%) ⬆️

@Copilot Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@shcheklein shcheklein left a comment

Seems a bit of an overkill tbh.

  • we do an optimization to use ids
  • but now we have to copy tables to have those ids

Q: can we avoid copying? (put an assert in to fail fast if it's not present) - can all operations include sys__id at the end?
Q: are we relying on sys__id in some other operations - will we have to copy tables there as well eventually?

@dreadatour dreadatour closed this Jul 13, 2025
@dreadatour dreadatour reopened this Jul 13, 2025
@dreadatour

Seems a bit of an overkill tbh.

Agree, but I cannot find a proper way to do this without materializing the query to a temp table for the group_by.

Also, the only case I know of where a query has no sys__id is a group_by result, which in general produces fewer columns, so it should be "ok"-ish to copy this table.

Another possible case where a query lacks sys__id is when the user selects only a limited set of columns, but in that case it is safe to manually add the sys__id field.

  • we do an optimization to use ids
  • but now we have to copy tables to have those ids

No, these are different scenarios.

  1. We do have an optimization to use IDs, but it applies only to UDFs, and for UDFs we always copy the table anyway, so this optimization exists only to pass rows to the UDF workers. Nothing was changed there.

  2. We now have to copy the table to have those IDs because a group_by query does not have IDs (in general), and it is very tricky to include those IDs in the query, and moreover to keep them consistent across query runs.

Q: can we avoid copying? (put assert to fail fast though if it's not present) - can all operations include sys_id at the end?

The reason we cannot avoid copying is that some SQL queries do not return IDs in their results, and I have not found a reliable way to add those IDs to the queries.

I could add something like SELECT random() AS sys__id, ..., but these IDs would not be idempotent: they would change with each query execution.

I could add something like SELECT row_number() AS sys__id, ..., but I cannot make GROUP BY work with such a complex query.

More details on internals below (in the next comment).

Q: are we relying on sys_id in some other operations - will we have to copy tables there as well eventually?

No, aggregation with partition_by is the only place I know of where it is crucial to have the sys__id field in the query. See the next comment.

@dreadatour

So why do we need sys__id to be set for aggregation with partitioning? Let me start from the very beginning, with the obvious things.

DataChain in general follows the classic ETL pattern:

  1. Extract data from sources (datasets, s3 buckets, CSV, etc);
  2. Transform data with operations (on SQL level and via UDFs);
  3. Load (save) results (to datasets, s3 buckets, etc).

Transformation can be done on two levels:

  1. Raw SQL queries: filtering, merging, limiting, group by, etc;
  2. Python User-Defined Functions (UDFs): mappers, generators, aggregators, etc.

All transformations can be chained, and the results of the previous transformation become the input to the next one.
For SQL queries we are basically using subqueries (it is more complicated, but for now let's assume basic subqueries), and the result of the previous step becomes a subquery of the next step. For UDFs we "materialize" queries to temp tables and pass a set of IDs to each UDF worker, so it can fetch the base rows from the temp table by ID, process them, and save the results to another temp table. The next step will do SELECT * FROM temp_results and pass it to the next transformer as a raw SQL query, and so on.

One special case is aggregation with partitioning. Since the query from all previous transformers (steps) can be complex, we do partitioning the following way:

  1. We have the query from all previous steps. This query should have sys__id (see below why).
  2. Create a new temp table with two fields: sys__id and partition_id.
  3. Fill this table using sys__id from the query and partition_id = DENSE_RANK() OVER (PARTITION BY ...) (see here for SQLite, same for ClickHouse).
  4. Next, when we do aggregation, we select rows from the query, JOINed with the partitions temp table on sys__id, with ORDER BY partition_id, .... We can then collect all the rows for each partition and pass them to the aggregation UDF.
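The steps above can be sketched with stdlib sqlite3 (illustrative table and column names, not the real datachain internals; this sketch uses OVER (ORDER BY ...) to assign one dense rank per distinct key, standing in for the partition expression):

```python
import sqlite3

con = sqlite3.connect(":memory:")  # window functions need SQLite >= 3.25
con.execute("CREATE TABLE ds (sys__id INTEGER PRIMARY KEY, path TEXT, amount INTEGER)")
con.executemany(
    "INSERT INTO ds VALUES (?, ?, ?)",
    [(1, "a.txt", 10), (2, "b.txt", 20), (3, "a.txt", 30)],
)

# Steps 2-3: a partitions table mapping sys__id -> partition_id via DENSE_RANK.
con.execute("CREATE TEMP TABLE partitions (sys__id INTEGER, partition_id INTEGER)")
con.execute("""
    INSERT INTO partitions
    SELECT sys__id, DENSE_RANK() OVER (ORDER BY path) AS partition_id
    FROM ds
""")

# Step 4: join back on sys__id and order by partition_id, so each
# partition's rows arrive together for the aggregation UDF.
rows = con.execute("""
    SELECT p.partition_id, ds.sys__id, ds.path, ds.amount
    FROM ds JOIN partitions p ON p.sys__id = ds.sys__id
    ORDER BY p.partition_id, ds.sys__id
""").fetchall()
print(rows)
```

The join only works if the sys__id seen while filling the partitions table matches the sys__id seen while fetching rows, which is exactly why a generated id like random() breaks this scheme.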

The reason we need sys__id in the query, and need it to be stable, is that we run the query twice:

  1. To fill the partitions temp table with partition_id.
  2. To fetch rows for the next step (transformer).

Thus sys__id cannot be generated on the fly (like SELECT random() AS sys__id).


Moreover, I am now thinking we should always copy the query to a temp table, because we run the query twice and we need to be 100% sure the results will be idempotent. We could then get rid of the separate temp table with the two fields sys__id and partition_id: a single temp table can serve both for "materializing" the query and for storing partition_id.

@dreadatour

I also did some unsuccessful experiments.

  1. Using row_number as sys__id: `Column udf_3dsrZD.partition_id is not under aggregate function and not in GROUP BY keys`

    SELECT rowNumberInAllBlocks() + 1 AS sys__id,
           anon_2.file__path AS file__path,
           count(*) AS cnt,
           partition_id
    FROM
      (SELECT ds.sys__id AS sys__id,
              ds.sys__rand AS sys__rand,
              ds.file__source AS file__source,
              ds.file__path AS file__path,
              ds.file__size AS file__size,
              ds.file__version AS file__version,
              ds.file__etag AS file__etag,
              ds.file__is_latest AS file__is_latest,
              ds.file__last_modified AS file__last_modified,
              ds.file__location AS file__location
       FROM ds
       ORDER BY rand() ASC LIMIT 5000) AS anon_2
    LEFT JOIN udf_3dsrZD ON udf_3dsrZD.sys__id = (rowNumberInAllBlocks() + 1)
    GROUP BY file__path
    
  2. Selecting sys__id from the original table when doing GROUP BY: `Column anon_1.sys__id is not under aggregate function and not in GROUP BY keys`

    SELECT anon_1.sys__id,
           dense_rank() OVER () AS partition_id
    FROM
      (SELECT ds.sys__id AS sys__id,
              ds.sys__rand AS sys__rand,
              ds.file__source AS file__source,
              ds.file__path AS file__path,
              ds.file__size AS file__size,
              ds.file__version AS file__version,
              ds.file__etag AS file__etag,
              ds.file__is_latest AS file__is_latest,
              ds.file__last_modified AS file__last_modified,
              ds.file__location AS file__location
       FROM ds
       ORDER BY rand() ASC LIMIT 5000) AS anon_1
    GROUP BY file__path
    

I keep investigating, but for now I think creating a temp table in the case of aggregation with partitioning is the best way to handle this issue and to avoid possible future issues with queries running twice (e.g. if someone wants to use random() or something like that).

@dreadatour dreadatour merged commit 8925faf into main Jul 14, 2025
66 checks passed
@dreadatour dreadatour deleted the fix-partition-by-for-queries-without-sys-id branch July 14, 2025 16:20