Use dask.compute instead of get_sync #588


Closed · wants to merge 1 commit

Conversation

@fjetter commented May 5, 2025

Closes dask/dask#11927

There is no reason to use the low-level API get_sync. We're refactoring the interfaces around HLGs, and I can't guarantee that these intermediate interfaces will stay stable for HLGs.

This is forwards- and backwards-compatible.
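
For context, a minimal sketch of the swap; hlg and minimal_keys here are toy stand-ins for the variables at the real call site (the diff appears further down this thread):

import dask
from dask.local import get_sync

# Toy stand-ins for the HLG and keys at the real call site.
hlg = {"x": 1, "y": (lambda v: v + 1, "x")}
minimal_keys = {"y"}

# Before this PR: the low-level synchronous scheduler, raw graph plus keys.
results = get_sync(hlg, list(minimal_keys))  # [2]

# After this PR: the stable high-level entry point.
results = dask.compute(hlg, list(minimal_keys), scheduler="sync")
# NB: as it turns out later in this thread, compute() treats a plain
# mapping as data rather than as a task graph, so this call returns its
# inputs unchanged instead of executing them.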

@martindurant (Collaborator)

Well, it runs at least.

@pfackeldey @ikrommyd , any chance this is because of the awkward release rather than dask?

@pfackeldey (Collaborator)

Thanks @fjetter for the fix!

The failing CI looks like it's something with the latest awkward release. I'll look into it 👍

@pfackeldey (Collaborator)

I'm not sure what's going on here in the tests. I'm seeing these failures even with an older awkward version, and those tests were passing before. Is there any chance this is related to using dask.compute instead of get_sync? Do you know why we used get_sync before, @martindurant?

@martindurant (Collaborator) commented May 5, 2025

It was the recommended way to "get a set of keys out of a graph" (https://docs.dask.org/en/stable/scheduler-overview.html?highlight=get#the-get-function - still there), whereas compute() was designed for "dask collections", which I would not have thought includes HLGs. I haven't seen this style of invocation before, and it doesn't appear to match the signature:

dask.base.compute(*args, traverse=True, optimize_graph=True, scheduler=None, get=None, **kwargs)

In addition, compute() would by default run optimize, which is exactly what we are in the middle of at this line - but what that does to an HLG (as opposed to the dak collection(s)), I don't know.
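
To make the distinction concrete, here is a toy contrast of the two entry points (plain dask, nothing dask-awkward specific; my own example, not from the docs):

from dask import compute, delayed
from dask.local import get_sync

def inc(x):
    return x + 1

# The low-level get: raw graph plus keys; no collection protocol, no
# optimization pass.
dsk = {"a": 1, "b": (inc, "a")}
assert get_sync(dsk, "b") == 2

# compute(): designed for dask collections; it unpacks them, runs their
# optimizers (unless optimize_graph=False), then hands off to a scheduler.
assert compute(delayed(inc)(1), scheduler="sync") == (2,)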

@fjetter (Author) commented May 5, 2025

It was the recommended way to "get a set of keys out of a graph"

Well, HLGs are weird beasts.

@ikrommyd (Contributor) commented May 5, 2025

@martindurant from looking at it locally as well, the optimization doesn't work correctly with this dask.compute, and all the errors are because we're hitting placeholders and/or unknown_length during compute. I'm getting the errors with older versions of awkward as well. It seems like this dask.compute is not doing what get_sync used to do, one-to-one.

@pfackeldey (Collaborator)

@martindurant suggested wrapping the HLG in a "dummy" collection, which I tried:

diff --git a/pyproject.toml b/pyproject.toml
index c7ea29d..fca871f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,7 +37,7 @@ classifiers = [
 ]
 dependencies = [
   "awkward >=2.5.1",
-  "dask >=2023.04.0,<2025.4.0",
+  "dask >=2023.04.0",
   "cachetools",
   "typing_extensions >=4.8.0",
 ]
diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py
index b01869d..62ee439 100644
--- a/src/dask_awkward/lib/optimize.py
+++ b/src/dask_awkward/lib/optimize.py
@@ -40,6 +40,20 @@ https://dask-awkward.readthedocs.io/en/stable/more/faq.html

 """

+class Appeaser:
+    def __init__(self, hlg, keys):
+        self.hlg = hlg
+        self.keys = keys
+
+    def __dask_graph__(self):
+        return self.hlg
+
+    def __dask_keys__(self):
+        return self.keys
+
+    def __dask_postcompute__(self):
+        return lambda x: x, ()
+

 def all_optimizations(dsk: Mapping, keys: Sequence[Key], **_: Any) -> Mapping:
     """Run all optimizations that benefit dask-awkward computations.
@@ -146,7 +160,7 @@ def _prepare_buffer_projection(
     try:
         for layer in hlg.layers.values():
             layer.__dict__.pop("_cached_dict", None)
-        results = dask.compute(hlg, list(minimal_keys), scheduler="sync")
+        results = dask.compute(Appeaser(hlg=hlg, keys=list(minimal_keys)), scheduler="sync", optimize_graph=False)

Unfortunately, this still "hangs indefinitely". I have the feeling this is connected to the switch to the new Exprs, i.e. https://github.com/dask/dask/blob/main/dask/base.py#L658, since, as far as I understand, get_sync will eventually be called by dask.compute as well: https://github.com/dask/dask/blob/main/dask/base.py#L681.

So unfortunately we still don't have a working version or an idea of how to fix this. Is there any chance you could have another look at this, @fjetter?
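
One way to check that the synchronous get is really what ends up running is to trace the scheduler call (a debugging sketch; traced_get is a hypothetical wrapper, not dask API, and relies on scheduler= accepting a callable, which dask's get_scheduler supports):

import dask
from dask import delayed
from dask.local import get_sync

def traced_get(dsk, keys, **kwargs):
    # Wrap get_sync so we can see when, and with which keys, it is invoked.
    print("scheduler invoked with keys:", keys)
    return get_sync(dsk, keys, **kwargs)

dask.compute(delayed(sum)([1, 2, 3]), scheduler=traced_get)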

@pfackeldey (Collaborator)

I should add that something seems odd with the Expr that is created - some bound methods are not working (as expected?):

from dask.base import unpack_collections, collections_to_expr

collection = Appeaser(hlg=hlg, keys=list(minimal_keys))
collections, repack = unpack_collections(collection, traverse=False)
expr = collections_to_expr(collections, False)
print(expr)
# ExprSequence(HLGExpr(dsk=HighLevelGraph with 8 layers.
# <dask.highlevelgraph.HighLevelGraph object at 0x10bca8b80>
#  0. from-json-files-3aa8b66c618a08f85ed59dea80a9a365
#  1. points-3764f6b6a11fe3526281e3f09eb81fba
#  2. with-name-cf86d5baf1617093b367fe26169a4372
#  3. from-json-files-7da845b8d371f02fab1594c5931c0d74
#  4. points-ea4c37ddaad9cb5e38ae97f614feaed8
#  5. with-name-fa8fcb5449e4f43f9c5ce34b2a72fe34
#  6. distance-296077b9f4e8e28269f53fd0b8637520
#  7. finalize-hlgfinalizecompute-149928c7ba374697a7edff585578d91d
# , low_level_optimizer=None, output_keys=['finalize-hlgfinalizecompute-149928c7ba374697a7edff585578d91d'],
# postcompute=(<function Appeaser.__dask_postcompute__.<locals>.<lambda> at 0x10f17c9d0>, ()),
# _cached_optimized=None))

# the following methods are broken:
expr.simplify()
# ... dask/_expr.py:208, in Expr.operand(self, key)
#     205 def operand(self, key):
#     206     # Access an operand unambiguously
#     207     # (e.g. if the key is reserved by a method/property)
# --> 208     return self.operands[type(self)._parameters.index(key)]
# ValueError: 'name' is not in list

expr.optimize()
# ... dask/_expr.py:208, in Expr.operand(self, key)
#     205 def operand(self, key):
#     206     # Access an operand unambiguously
#     207     # (e.g. if the key is reserved by a method/property)
# --> 208     return self.operands[type(self)._parameters.index(key)]
# ValueError: 'name' is not in list

expr.pprint()
# also broken: prints one character of the repr per line
# E
# x
# p
# r
# ...

expr.tree_repr()
# "E\nx\np\nr\nS\ne\nq\nu\ne\nn\nc\ne\n...

@fjetter (Author) commented May 8, 2025

Yeah, so the dask.compute only worked in this specific example because it was just not doing anything. I didn't verify that, sorry.

The problem is that the graph that is being computed here is corrupt.

[screenshot: visualization of the task graph, showing a finalize task whose dependencies are missing]

There is a finalize task that assumes dependencies that don't exist. The sync scheduler is rather stupid and apparently doesn't verify something as essential as this; instead of raising an error, it just hangs because there are no "ready" tasks.
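
For what it's worth, HighLevelGraph has a validate() method that checks layer/dependency consistency; running it before handing the graph to a scheduler can turn this kind of silent hang into an exception. A sketch (whether it catches this particular corruption depends on how the layers were rewritten):

from dask.highlevelgraph import HighLevelGraph

# A well-formed two-layer graph; plain mappings are wrapped in
# MaterializedLayer by the constructor.
good = HighLevelGraph(
    layers={"a": {"x": 1}, "b": {"y": (lambda v: v + 1, "x")}},
    dependencies={"a": set(), "b": {"a"}},
)
good.validate()  # passes

# A layer that claims a dependency no layer provides: validate() raises
# instead of letting a scheduler hang.
bad = HighLevelGraph(
    layers={"b": {"y": (lambda v: v + 1, "x")}},
    dependencies={"b": {"a-that-does-not-exist"}},
)
bad.validate()  # raises ValueError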

The underlying issue is that the optimizations you are using here rewrite layers in a way dask doesn't expect.

The change in dask that is breaking this is that dask now generates the postcompute task before the optimization runs. This adds a MaterializedLayer to your graph that your optimizer isn't handling well (it also doesn't account for the change in keys). I don't understand what your optimizer is doing here, and I don't believe I'll be able to submit a fix.
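
If it helps with debugging, here is a small sketch for spotting that extra layer before the optimizer runs (report_layers is a hypothetical helper, not part of either library):

from dask.highlevelgraph import HighLevelGraph, MaterializedLayer

def report_layers(hlg: HighLevelGraph) -> None:
    # Print each layer's type and size; the materialized layer holding the
    # postcompute/finalize task will stand out among the expected layers.
    for name, layer in hlg.layers.items():
        marker = "  <-- materialized" if isinstance(layer, MaterializedLayer) else ""
        print(f"{type(layer).__name__:>20}  {len(layer):>4} tasks  {name}{marker}")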

Successfully merging this pull request may close these issues.

CI tests using dask v2025.4.0 (or newer) hang indefinitely