feat(window): window functions SQL #4227

Open · wants to merge 145 commits into main from f4t4nt/window-sql

Conversation


@f4t4nt (Contributor) commented Apr 21, 2025

Adds SQL support for window functions.
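
For context, here's a minimal sketch of the kind of query this change enables, assuming the `daft.sql` entry point and the `ROW_NUMBER() OVER (...)` syntax exercised by the tests in this PR (column names are illustrative):

```python
import daft

df = daft.from_pydict(
    {
        "category": ["a", "a", "b", "b"],
        "value": [3, 1, 4, 2],
    }
)

# Number rows within each category, ordered by value.
sql_result = daft.sql(
    """
    SELECT
        category,
        value,
        ROW_NUMBER() OVER (PARTITION BY category ORDER BY value) AS row_num
    FROM df
    """
).collect()

print(sql_result)
```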

f4t4nt added 30 commits March 26, 2025 18:25
f4t4nt and others added 18 commits April 22, 2025 11:18
## Changes Made

- Fix formatting of the UDF docstring (resource requests and `>>>` examples)
- Remove `TableSource` from the Catalogs doc
- Update headings in the Iceberg doc

## Related Issues

n/a

## Checklist

- [ ] Documented in API Docs (if applicable)
- [ ] Documented in User Guide (if applicable)
- [ ] If adding a new documentation page, doc is added to
`docs/mkdocs.yml` navigation
- [ ] Documentation builds and is formatted properly (tag @/ccmao1130
for docs review)
…ctor pool UDF (#4222)

## Changes Made

There is currently an issue where returning an extension type from an actor
pool UDF raises a cannot-pickle error, see
#4213. This is because the
Daft extension type class is a local object (for lazy-loading purposes).
Instead of pickling the class, we serialize the data itself to Arrow IPC.

Separately, sending data via a pipe is too slow: pipes are designed for message
passing, not bulk data transfer. One solution is to use shared memory.
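
Here is a minimal sketch of that idea using the public `pyarrow` IPC API and Python's `multiprocessing.shared_memory` (illustrative only, not the exact Daft implementation):

```python
import numpy as np
import pyarrow as pa
from multiprocessing import shared_memory

# Producer side: serialize the batch to the Arrow IPC stream format, then copy
# the bytes into a shared-memory block instead of pushing them through a pipe.
table = pa.table({"bar": [np.random.randint(0, 100, size=10, dtype=np.int8)]})
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
payload = sink.getvalue()

shm = shared_memory.SharedMemory(create=True, size=payload.size)
shm.buf[: payload.size] = payload.to_pybytes()

# Consumer side: only the shared-memory name and payload size need to travel
# over the pipe; the actual data is read back directly from shared memory.
view = shared_memory.SharedMemory(name=shm.name)
restored = pa.ipc.open_stream(pa.BufferReader(bytes(view.buf[: payload.size]))).read_all()
assert restored.equals(table)

view.close()
shm.close()
shm.unlink()
```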

Example script (with the current implementation using `mp.Pipe`, this takes
**2 minutes 25 s**; with shared memory, it takes **16 s**):
```python
import daft
import numpy as np


@daft.udf(return_dtype=daft.DataType.list(daft.DataType.int8()), batch_size=10)
def foo(x):
    data_size = 10 * 1024 * 1024  # 10 MB per row
    res = []
    for _ in range(len(x)):
        res.append(np.random.randint(0, 100, size=(data_size,), dtype=np.int8))
    return res


if __name__ == "__main__":
    # total around 10 GB of data
    num_rows = 1000
    df = daft.from_pydict({"index": [i for i in range(num_rows)]})
    foo = foo.with_concurrency(12)
    df = df.with_column("bar", foo(df["index"])).collect()
```

This PR also improves error handling.


## Related Issues

See #4213.

## Checklist

- [ ] Documented in API Docs (if applicable)
- [ ] Documented in User Guide (if applicable)
- [ ] If adding a new documentation page, doc is added to
`docs/mkdocs.yml` navigation
- [ ] Documentation builds and is formatted properly (tag @/ccmao1130
for docs review)
## Changes Made

Adds an integration test to ensure that Glue/Iceberg -> PyTorch dataloading works.
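
A minimal sketch of the shape such a test could take, assuming a `daft.read_table`-style entry point and a placeholder Glue table identifier (the actual test may use different APIs and fixtures):

```python
import daft
from torch.utils.data import DataLoader, Dataset


class DictDataset(Dataset):
    """Wraps a column-oriented dict of lists as a PyTorch map-style dataset."""

    def __init__(self, columns: dict):
        self.columns = columns
        self.num_rows = len(next(iter(columns.values())))

    def __len__(self):
        return self.num_rows

    def __getitem__(self, idx):
        return {name: values[idx] for name, values in self.columns.items()}


def test_glue_iceberg_to_pytorch_dataloading():
    # "glue_db.my_iceberg_table" is a placeholder; the real test points at the
    # integration-test table in Glue.
    df = daft.read_table("glue_db.my_iceberg_table")
    columns = df.to_pydict()
    loader = DataLoader(DictDataset(columns), batch_size=32)
    assert sum(1 for _ in loader) > 0
```
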
## Changes Made

This introduces a simple Term IR ([much like
pyiceberg](https://py.iceberg.apache.org/reference/pyiceberg/expressions/#pyiceberg.expressions.Term)),
which is a model for pushdown expressions. It is scoped to the Python
pushdown package, and the Term IR comes with a visitor that can be used to
translate to other domains, which enables easy consumption of PyExpr / Rust
expressions. I have an example visitor that produces s-expressions, and can
add one that translates to the PyIceberg Expression IR.

An alternative design is visitor-only with no IR. That is nice because there
is no additional IR, but it comes at the tradeoff of possibly tighter coupling
to the Rust expressions, and we may want an IR anyway if we ever want to serde
pushdown expressions. Please let me know what you all think is best here. #4210
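
As a rough illustration of the IR + visitor approach, here's a minimal sketch with made-up class names (the actual Term classes in the pushdown package may differ):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Reference:
    """A column reference, e.g. `x`."""
    name: str


@dataclass(frozen=True)
class Literal:
    """A constant value, e.g. 1."""
    value: object


@dataclass(frozen=True)
class Predicate:
    """An operator applied to child terms, e.g. `and`, `=`, `<`."""
    op: str
    children: tuple


class TermVisitor:
    """Translates a term tree into another domain; subclasses supply the visit_* hooks."""

    def visit(self, term):
        if isinstance(term, Reference):
            return self.visit_reference(term)
        if isinstance(term, Literal):
            return self.visit_literal(term)
        return self.visit_predicate(term)


class SExpressionVisitor(TermVisitor):
    """Example visitor that renders a term tree as an s-expression string."""

    def visit_reference(self, term):
        return term.name

    def visit_literal(self, term):
        return repr(term.value)

    def visit_predicate(self, term):
        return "(" + " ".join([term.op, *(self.visit(c) for c in term.children)]) + ")"


# x = 1 AND y < 10  ->  "(and (= x 1) (< y 10))"
expr = Predicate("and", (
    Predicate("=", (Reference("x"), Literal(1))),
    Predicate("<", (Reference("y"), Literal(10))),
))
print(SExpressionVisitor().visit(expr))
```

The same term tree could be walked by a second visitor that builds PyIceberg expressions, which is the translation mentioned above.
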
…4236)

## Changes Made

Simplify the example by using `GlueCatalog.from_session()` instead of
`load_glue()`.
This commit moves `SQLPlanner` to use session references instead of owning
the `Session` through `Rc`, which simplifies lifetime handling and state
management.

## Changes Made

- `SQLPlanner` now takes `&Session` instead of `Rc<Session>`
- Removed `fork()` from Session API
- Added bound_tables to `PlannerContext` for local table bindings
- Updated all callers to pass session references




## Related Issues

closes #4207

## Checklist

- [ ] Documented in API Docs (if applicable)
- [ ] Documented in User Guide (if applicable)
- [ ] If adding a new documentation page, doc is added to
`docs/mkdocs.yml` navigation
- [ ] Documentation builds and is formatted properly (tag @/ccmao1130
for docs review)
## Changes Made

Previously, `GlueCatalog.from_session()` only accepted a boto3 session. This
adds a runtime check so that it can accept a botocore session too.
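
A minimal sketch of the kind of runtime check described here, with an illustrative helper name (not the actual Daft internals):

```python
import boto3
import botocore.session


def _to_boto3_session(session):
    """Accept either a boto3 or a botocore session and normalize to boto3."""
    if isinstance(session, boto3.Session):
        return session
    if isinstance(session, botocore.session.Session):
        # boto3 sessions can wrap an existing botocore session directly.
        return boto3.Session(botocore_session=session)
    raise TypeError(f"expected a boto3 or botocore session, got {type(session).__name__}")
```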

## Checklist

- [x] Documented in API Docs (if applicable)
## Changes Made

Adds a property map to the `create_table` APIs; context and use cases are
linked in #4195.

## Related Issues

closes #4195

## Checklist

- [x] Documented in API Docs (if applicable)
- [x] Documented in User Guide (if applicable)
- [n/a] If adding a new documentation page, doc is added to
`docs/mkdocs.yml` navigation
- [n/a] Documentation builds and is formatted properly (tag @/ccmao1130
for docs review)
## Changes Made

Adds an integration test for pandas -> Daft -> Glue Iceberg.

Currently this creates a new Iceberg commit per write, but I'm trying to
modify the Glue table so that these get cleaned up after a day.
@f4t4nt force-pushed the f4t4nt/window-sql branch from 1d0c256 to 2c665a0 on April 24, 2025 19:58
@f4t4nt force-pushed the f4t4nt/window-sql branch 3 times, most recently from 2a4fc12 to 3e82d49 on April 24, 2025 20:23

codecov bot commented Apr 24, 2025

Codecov Report

Attention: Patch coverage is 69.76744% with 65 lines in your changes missing coverage. Please review.

Project coverage is 78.47%. Comparing base (e99d8cc) to head (acc6cef).
Report is 4 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/daft-sql/src/functions.rs | 36.36% | 56 Missing ⚠️ |
| src/daft-sql/src/modules/window.rs | 93.93% | 6 Missing ⚠️ |
| src/daft-logical-plan/src/builder/resolve_expr.rs | 89.47% | 2 Missing ⚠️ |
| src/daft-local-plan/src/plan.rs | 66.66% | 1 Missing ⚠️ |
Additional details and impacted files


```diff
@@            Coverage Diff             @@
##             main    #4227      +/-   ##
==========================================
- Coverage   78.48%   78.47%   -0.01%     
==========================================
  Files         798      799       +1     
  Lines      105351   105581     +230     
==========================================
+ Hits        82686    82856     +170     
- Misses      22665    22725      +60     
```
| Files with missing lines | Coverage Δ |
|---|---|
| daft/expressions/expressions.py | 94.91% <100.00%> (ø) |
| ...ecution/src/sinks/window_partition_and_order_by.rs | 80.00% <ø> (ø) |
| src/daft-logical-plan/src/builder/mod.rs | 88.95% <100.00%> (+0.04%) ⬆️ |
| src/daft-local-plan/src/plan.rs | 93.39% <66.66%> (+0.23%) ⬆️ |
| src/daft-logical-plan/src/builder/resolve_expr.rs | 81.96% <89.47%> (+0.63%) ⬆️ |
| src/daft-sql/src/modules/window.rs | 93.93% <93.93%> (ø) |
| src/daft-sql/src/functions.rs | 70.77% <36.36%> (-10.89%) ⬇️ |

... and 11 files with indirect coverage changes


```python
daft_result = df.with_column("row_num", row_number().over(window_spec)).sort(["category", "value"]).collect()

assert_df_equals(sql_result.to_pandas(), daft_result.to_pandas(), sort_key=["category", "value"])
```
Contributor


any particular reason to use pandas for the comparison?

Contributor Author


Yep, `assert_df_equals` uses pandas to check equality of dataframes:

```python
def assert_df_equals(
    daft_df: pd.DataFrame,
    pd_df: pd.DataFrame,
    sort_key: str | list[str] = "Unique Key",
    assert_ordering: bool = False,
    check_dtype: bool = True,
):
```
