
wip: use distributed.Queue as storage for metadata poc #305

Draft
wants to merge 1 commit into base: master

Conversation

lr4d
Collaborator

@lr4d lr4d commented Jun 26, 2020

With the intention of improving resilience: instead of storing the intermediate results (partition metadata) on dask workers, we could submit them to a more central instance (an event bus or, even simpler, the dask scheduler). This way, the jobs would be safe from worker failures.
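A minimal sketch of the idea (the store_partition and write_partition names are illustrative placeholders, not this PR's API): each store task pushes its partition metadata into a scheduler-backed distributed.Queue instead of returning it as a task result that lives on a worker.

from distributed import Client, Queue

client = Client()
metadata_queue = Queue("dataset-metadata")  # queue state is coordinated by the scheduler

def store_partition(partition):
    # hypothetical call that writes the partition and returns its metadata
    metadata = write_partition(partition)
    # the metadata now survives the loss of this worker
    metadata_queue.put(metadata)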

cc @fjetter

@lr4d
Collaborator Author

lr4d commented Jun 26, 2020

cc @marco-neumann-jdas @pacman82 in case you want to take a look at this

@marco-neumann-by
Contributor

What kind of resilience do you get here? Against which exact failure scenario does this change help?

# obtain the worker-side dask client and the queue-backed metadata store
c = get_client()  # noqa
metadata_store = metadata_storage_factory()  # a distributed.Queue in this PoC
for _mp in mp.metapartitions:
    # push each metapartition's metadata onto the scheduler-backed queue
    metadata_store.put(_mp)
Contributor


This task might get retried. Do you do any de-duplication of the results in the end?

Collaborator Author

@lr4d lr4d Jul 7, 2020


good point

  • prevent duplicates (see the sketch below)
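For reference, a possible de-duplication step before the final commit, assuming each queued metapartition exposes a unique label attribute (the helper name is illustrative):

def drain_and_deduplicate(metadata_queue):
    # retried tasks may have put the same metapartition more than once;
    # keep only one entry per label
    seen = {}
    while metadata_queue.qsize() > 0:
        _mp = metadata_queue.get()
        seen[_mp.label] = _mp
    return list(seen.values())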

@lr4d
Collaborator Author

lr4d commented Jul 7, 2020

What kind of resilience do you get here? Against which exact failure scenario does this change help?

This would help when a worker that has stored the data dies while still holding the metadata; losing that worker would imply re-computing all dependencies for that data. Since the metadata is quite small, we can just move all metadata objects to a central place (i.e. the scheduler) to avoid having to re-compute the graph after the data has been stored.
An example of when this can occur is the last part of a "dataset update" graph, when most workers have already stored their data and the cluster may be getting downscaled because most workers are no longer necessary. However, removing a worker which is idle but still holds metadata would imply re-computing all dependencies for that data, whereas with this implementation the cluster could be downscaled to a single worker (which would make the final metadata commit) without issues.

@marco-neumann-by
Contributor

This would help when a worker which has stored the data dies while holding the metadata, and would imply re-computing all dependencies for that data.

How does that work then? Dask doesn't know that you're bypassing its dependency system, so it will re-trigger the computation. And you don't know whether you can safely abort that and run some kind of manual collect+store operation without looking very carefully at the scheduler dashboard, because you cannot know that all dependencies were executed at least once. In case the cluster starts "flapping", you don't know which parts of the dependency chain are being recomputed because they got lost and which parts did not get computed in the first place. And every manual guess makes very strong assumptions about the scheduling logic (which IIRC is an implementation detail of dask/distributed) and risks data loss.

@lr4d
Collaborator Author

lr4d commented Jul 7, 2020

How does that work then? Dask doesn't know that you're bypassing its dependency system, so it will re-trigger the computation.

I remember this coming up when I started experimenting with this, but I guess I didn't address it properly here. I'll have a look when I have some spare time. Appreciate your feedback.

@fjetter
Collaborator

fjetter commented Jul 10, 2020

How does that work then? Dask doesn't know that you're bypassing its dependency system, so it will re-trigger the computation.

This could only be addressed by letting every task check whether a key/result was already logged on the scheduler. This way, the task would be rescheduled but it would be a "no-op"
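A hedged sketch of such a check, using the scheduler-side metadata store (client.set_metadata / client.get_metadata); the key scheme and the write_partition helper are assumptions, not part of this PR:

from distributed import get_client

def store_partition_idempotent(partition, metadata_queue):
    client = get_client()
    key = f"stored/{partition.label}"
    if client.get_metadata(key, default=False):
        # already logged on the scheduler, so the rescheduled task is a no-op
        return
    metadata = write_partition(partition)  # hypothetical store call
    metadata_queue.put(metadata)
    client.set_metadata(key, True)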

We may also raise this as an issue in distributed. If this were nicely integrated in the scheduler, that would be much better, of course. I can see various arguments pro/con, so this will probably trigger a small debate. There are tasks in dask/distributed itself, though, which would benefit; a prime example is dask's own parquet storage system...

OTOH, this topic occasionally pops up, and the "dask way" of dealing with it would probably be duplication of results across the cluster, but that doesn't properly work atm.

@lr4d
Collaborator Author

lr4d commented Jul 10, 2020

I got this working in a hacky way by calling compute on the tasks before the final metadata commit, thereby bypassing the dask dependency management

i.e. we first execute one graph which includes everything up until the point where the metadata is in the distributed.Queue:
[screenshot of the dask task graph]
And, finally, we perform the metadata commit with the information in the queue
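Roughly, the two-phase flow looks like this (store_tasks and commit_dataset are illustrative placeholders):

from distributed import Client, Queue, wait

client = Client()
metadata_queue = Queue("dataset-metadata")

# phase 1: run the graph that stores the partitions and pushes their
# metadata into the scheduler-backed queue
store_futures = client.compute(store_tasks)
wait(store_futures)

# phase 2: drain the queue and perform the final metadata commit; at this
# point the metadata no longer depends on any particular worker staying alive
all_metadata = [metadata_queue.get() for _ in range(metadata_queue.qsize())]
commit_dataset(all_metadata)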

@marco-neumann-by
Contributor

Instead of adding more complexity to the payload code with store interactions, retries, no-op checks, etc., could we rather implement this as some kind of graph rewrite?

@lr4d
Collaborator Author

lr4d commented Jul 14, 2020

could we rather implement this as some kind of graph rewrite

I guess that may be the "cleanest" option given the current state of distributed, and it probably shouldn't be too difficult.

Ideally, though, I would like to see this kind of functionality properly supported in distributed, as @fjetter mentions:

We may also raise this as an issue in distributed. If this is nicely integrated in the scheduler that would be much better, of course
