From dd6bb21008e24a12f9a2418051b7ba47e9a3847e Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 31 Oct 2024 15:37:18 +0100 Subject: [PATCH 1/6] Remove recursion in task spec --- dask_expr/_shuffle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/_shuffle.py b/dask_expr/_shuffle.py index c4fc319d..a522ff0c 100644 --- a/dask_expr/_shuffle.py +++ b/dask_expr/_shuffle.py @@ -592,7 +592,7 @@ def _layer(self): _barrier_key, p2p_barrier, token, - transfer_keys, + *transfer_keys, spec=DataFrameShuffleSpec( id=shuffle_id, npartitions=self.npartitions_out, From f66f88e96c702430231ba5efbbca5d036d264850 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 4 Nov 2024 11:15:37 +0100 Subject: [PATCH 2/6] run against fork --- .github/workflows/dask_test.yaml | 4 ++-- .github/workflows/test.yaml | 4 ++-- ci/environment.yml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/dask_test.yaml b/.github/workflows/dask_test.yaml index 6c03f465..3414c99d 100644 --- a/.github/workflows/dask_test.yaml +++ b/.github/workflows/dask_test.yaml @@ -47,10 +47,10 @@ jobs: cache-environment-key: environment-${{ steps.date.outputs.date }}-0 - name: Install current main versions of dask - run: python -m pip install git+https://github.com/dask/dask + run: python -m pip install git+https://github.com/fjetter/dask@task_spec_remove_recursion - name: Install current main versions of distributed - run: python -m pip install git+https://github.com/dask/distributed + run: python -m pip install git+https://github.com/fjetter/distributed@task_spec_remove_recursion - name: Install dask-expr run: python -m pip install -e . --no-deps diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index b8cca6b6..325c5878 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -47,11 +47,11 @@ jobs: cache-environment-key: environment-${{ steps.date.outputs.date }}-1 - name: Install current main versions of dask - run: python -m pip install git+https://github.com/dask/dask + run: python -m pip install git+https://github.com/fjetter/dask@task_spec_remove_recursion if: ${{ matrix.environment-file == 'ci/environment.yml' }} - name: Install current main versions of distributed - run: python -m pip install git+https://github.com/dask/distributed + run: python -m pip install git+https://github.com/fjetter/distributed@task_spec_remove_recursion if: ${{ matrix.environment-file == 'ci/environment.yml' }} - name: Install dask-expr diff --git a/ci/environment.yml b/ci/environment.yml index e2877f43..b6569d66 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -12,5 +12,5 @@ dependencies: - sqlalchemy - xarray - pip: - - git+https://github.com/dask/distributed - - git+https://github.com/dask/dask + - git+https://github.com/fjetter/distributed@task_spec_remove_recursion + - git+https://github.com/fjetter/dask@task_spec_remove_recursion From 6499f3716165ab7f7ad292226ebe65491013b2ae Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 8 Nov 2024 17:56:12 +0100 Subject: [PATCH 3/6] use list types --- dask_expr/_merge.py | 4 ++-- dask_expr/io/io.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py index d387ac57..0db97957 100644 --- a/dask_expr/_merge.py +++ b/dask_expr/_merge.py @@ -680,7 +680,7 @@ def _layer(self) -> dict: _barrier_key_left, p2p_barrier, token_left, - transfer_keys_left, + *transfer_keys_left, spec=DataFrameShuffleSpec( id=token_left, npartitions=self.npartitions, @@ -698,7 +698,7 @@ def _layer(self) -> dict: _barrier_key_right, p2p_barrier, token_right, - transfer_keys_right, + *transfer_keys_right, spec=DataFrameShuffleSpec( id=token_right, npartitions=self.npartitions, diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py index 1ef37683..6a2e9d6c 100644 --- a/dask_expr/io/io.py +++ b/dask_expr/io/io.py @@ -6,7 +6,7 @@ import numpy as np import pyarrow as pa -from dask._task_spec import Task +from dask._task_spec import List, Task from dask.dataframe import methods from dask.dataframe._pyarrow import to_pyarrow_string from dask.dataframe.core import apply_and_enforce, is_dataframe_like, make_meta @@ -135,7 +135,7 @@ def _task(self, name: Key, index: int) -> Task: bucket = self._fusion_buckets[index] # FIXME: This will likely require a wrapper return Task( - name, methods.concat, [expr._filtered_task(name, i) for i in bucket] + name, methods.concat, List(*(expr._filtered_task(name, i) for i in bucket)) ) @functools.cached_property From 5310f1806d7436ad5a1a02058e552771b75d783a Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 8 Nov 2024 18:05:35 +0100 Subject: [PATCH 4/6] more generous size --- dask_expr/io/tests/test_distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/io/tests/test_distributed.py b/dask_expr/io/tests/test_distributed.py index b8e919d2..f2c0ba0e 100644 --- a/dask_expr/io/tests/test_distributed.py +++ b/dask_expr/io/tests/test_distributed.py @@ -63,4 +63,4 @@ def test_pickle_size(tmpdir, filesystem): df = read_parquet(tmpdir, filesystem=filesystem) from distributed.protocol import dumps - assert len(b"".join(dumps(df.optimize().dask))) <= 9000 + assert len(b"".join(dumps(df.optimize().dask))) <= 9100 From 0c74e3f4c5202cf79f3270bbacb44492b9053d21 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 13 Nov 2024 14:37:20 +0100 Subject: [PATCH 5/6] use Dict --- dask_expr/_expr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index 3a42d3d9..f1105d58 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -10,7 +10,7 @@ import numpy as np import pandas as pd -from dask._task_spec import Alias, DataNode, Task, TaskRef, execute_graph +from dask._task_spec import Alias, DataNode, Dict, Task, TaskRef, execute_graph from dask.array import Array from dask.core import flatten from dask.dataframe import methods From d265b37331afaa8e9a2d752ab291cc1c8ea5904c Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 13 Nov 2024 17:55:06 +0100 Subject: [PATCH 6/6] fix lint --- dask_expr/_expr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index f1105d58..3a42d3d9 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -10,7 +10,7 @@ import numpy as np import pandas as pd -from dask._task_spec import Alias, DataNode, Dict, Task, TaskRef, execute_graph +from dask._task_spec import Alias, DataNode, Task, TaskRef, execute_graph from dask.array import Array from dask.core import flatten from dask.dataframe import methods