Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test CI #153

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open

Test CI #153

wants to merge 3 commits into from

Conversation

jrbourbeau
Copy link
Member

No description provided.

@jrbourbeau
Copy link
Member Author

Hmm I can't seem to get CI running with the latest dask=2024.12.0 release. @rsignell and I ran into rechunker errors using the latest dask release over in coiled/feedback#308. Rolling back one release to dask=2024.11.2, things are working fine for me. I see the same thing when running the rechunker test suite locally.

From what I can tell the errors stem from manually constructing task graphs (in particular using blockwise -- cc @fjetter for visibility). This diff seems to fix things

diff --git a/rechunker/executors/dask.py b/rechunker/executors/dask.py
index ee75081..347ec66 100644
--- a/rechunker/executors/dask.py
+++ b/rechunker/executors/dask.py
@@ -6,6 +6,7 @@ import dask
 from dask.blockwise import BlockwiseDepDict, blockwise
 from dask.delayed import Delayed
 from dask.highlevelgraph import HighLevelGraph
+from dask._task_spec import TaskRef, Task

 from rechunker.types import ParallelPipelines, Pipeline, PipelineExecutor

@@ -67,7 +68,7 @@ def _make_pipeline(pipeline: Pipeline) -> Delayed:
         if stage.mappable is None:
             stage_key = append_token(stage.name, token)
             func = wrap_standalone_task(stage.function)
-            layers[stage_key] = {stage_key: (func, config_key, prev_key)}
+            layers[stage_key] = {stage_key: Task(stage_key, func, TaskRef(config_key), TaskRef(prev_key))}
             dependencies[stage_key] = {config_key, prev_key}
         else:
             func = wrap_map_task(stage.function)
@@ -79,9 +80,9 @@ def _make_pipeline(pipeline: Pipeline) -> Delayed:
                 BlockwiseDepDict({(i,): x for i, x in enumerate(stage.mappable)}),
                 # ^ this is extra annoying. `BlockwiseDepList` at least would be nice.
                 "x",
-                config_key,
+                TaskRef(config_key),
                 None,
-                prev_key,
+                TaskRef(prev_key),
                 None,
                 numblocks={},
                 # ^ also annoying; the default of None breaks Blockwise
@@ -89,7 +90,8 @@ def _make_pipeline(pipeline: Pipeline) -> Delayed:
             dependencies[map_key] = {config_key, prev_key}

             stage_key = f"{stage.name}-checkpoint-{token}"
-            layers[stage_key] = {stage_key: (checkpoint, *map_layer.get_output_keys())}
+            keys_ = [TaskRef(k) for k in map_layer.get_output_keys()]
+            layers[stage_key] = {stage_key: Task(stage_key, checkpoint, *keys_)}
             dependencies[stage_key] = {map_key}
         prev_key = stage_key

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant