
[BUG] dask-cudf, map_partitions(), ValueError: The columns in the computed data do not match the columns in the provided metadata #14109

Closed
rlratzel opened this issue Sep 13, 2023 · 15 comments
Labels
bug Something isn't working

Comments

@rlratzel
Contributor

rlratzel commented Sep 13, 2023

Describe the bug
cuGraph has a case where multiple graph objects, created one after the other, produce an error when the amount of data is small enough to result in empty partitions. The error seems to occur when a map_partitions() call is used to copy the dataframe in each partition, but it is still unknown whether that call leads to the root cause.
The error is shown in the attached mre_output.txt file below. In the MRE, after the first "graph" object is created, a subsequent call to create another fails with:

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['src', 'dst', 'value']
  Missing: []

The code, which is based on actual cugraph code to create a graph object, creates a copy of the input dask DataFrame, modifies it (which includes copying and dropping columns), then saves it for use later. It appears that the second call is somehow referencing DataFrames modified by the first call, resulting in the error.
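For illustration only, here is a minimal sketch of that pattern (the make_graph helper, the column names, and the in-memory input are hypothetical stand-ins, not the actual cuGraph code; the real MRE reads the edge list from disk, which matters for reproducing the failure):

import cudf
import dask_cudf

# hypothetical input; the real reproducer reads a file from disk
edgelist_ddf = dask_cudf.from_cudf(
    cudf.DataFrame({"src": [0, 1], "dst": [1, 2], "weight": [1.0, 2.0]}),
    npartitions=2,
)

def make_graph(ddf):
    # copy each partition so later modifications don't touch the caller's collection
    ddf = ddf.map_partitions(lambda df: df.copy())
    # modify the copy: copy a column under a new name and drop the original
    ddf["value"] = ddf["weight"]
    ddf = ddf.drop(columns=["weight"])
    return ddf  # saved on the graph object for later use

G1 = make_graph(edgelist_ddf)
G2 = make_graph(edgelist_ddf)  # the second creation is where the ValueError surfaces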

Steps/Code to reproduce bug
An attempt at a minimal reproducer is attached to this issue. It is based on code used in cuGraph, but I suspect it is not truly minimal. We will continue to work on minimizing it, but we'll provide it now to hopefully get the investigation started.

To run the minimal reproducer, simply run CUDA_VISIBLE_DEVICES=0 python dask_cudf_mre.py. We've been running it restricted to a single GPU to simulate the CI environment where the failure is occurring.

mre_output.txt

Expected behavior
The reproducer script should run to completion without an exception being raised.
The script runs to completion when any of the following changes is made:

  • Remove this call: ddf = ddf.map_partitions(lambda df: df.copy())
  • Replace the call to map_partitions() above with ddf = ddf.copy(). NOTE: for some reason, this does not help when done in cugraph.
  • Add a call to explicitly delete the G1 object (del G1) before creating G2 (see the short example after this list).
  • Create the input dataframe from a cudf DataFrame instead of reading a file from disk.
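For example, the del G1 workaround (using the hypothetical make_graph helper from the sketch above) looks like:

G1 = make_graph(edgelist_ddf)
del G1                         # explicitly release the first graph before creating the next
G2 = make_graph(edgelist_ddf)  # now runs to completion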

The expected output is also attached. It was generated by adding del G1 as described above.
mre_expected_output.txt

Environment overview (please complete the following information)
conda environment with dask 2023.6.0 or later

Environment details

(rapids) root@bfd6e155776f:/Projects/exp# conda list dask
# packages in environment at /opt/conda/envs/rapids:
#
# Name                    Version                   Build  Channel
dask                      2023.6.0                 pypi_0    pypi
dask-core                 2023.3.2           pyhd8ed1ab_0    conda-forge
dask-cudf                 23.8.0                   pypi_0    pypi
dask-glm                  0.2.0                      py_1    conda-forge
dask-labextension         6.1.0              pyhd8ed1ab_0    conda-forge
dask-ml                   2023.3.24          pyhd8ed1ab_1    conda-forge
raft-dask                 23.8.0                   pypi_0    pypi
(rapids) root@bfd6e155776f:/Projects/exp# conda list distributed
# packages in environment at /opt/conda/envs/rapids:
#
# Name                    Version                   Build  Channel
distributed               2023.6.0                 pypi_0    pypi

dask_cudf_mre.py

cc @VibhuJawa @jnke2016

@rlratzel added the bug (Something isn't working) and Needs Triage (Need team to review and classify) labels on Sep 13, 2023
@quasiben
Member

@rlratzel can you confirm this was working prior to 2023.06.0?

@quasiben
Member

The code, which is based on actual cugraph code to create a graph object, creates a copy of the input dask DataFrame, modifies it (which includes copying and dropping columns), then saves it for use later. It appears that the second call is somehow referencing DataFrames modified by the first call, resulting in the error.

@rlratzel can you also tell us why cuGraph does this? Copying a dask graph seems like an odd thing to do from a Dask perspective.

@rlratzel
Contributor Author

@rlratzel can you also tell us why cuGraph does this ? copying a dask graph seems like an odd thing to do from a Dask perspective

@VibhuJawa can add more detail. I think my choice of the word graph was confusing here. The goal is to copy the underlying dataframe, which we use as part of a cugraph graph (not a dask graph).

@quasiben
Member

On a call, @rjzamora suggested using token in the map_partitions() call to help with cleanup issues. We think this will fix the issue:

ddf_copy = ddf_copy.map_partitions(lambda df: df.copy(), token='custom-'+str(random.random()))
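(Note that the snippet above assumes import random.) A variant that avoids any chance of repeated tokens, which is our assumption rather than part of the original suggestion, is to build the token from a uuid:

import uuid

ddf_copy = ddf_copy.map_partitions(lambda df: df.copy(), token="custom-" + uuid.uuid4().hex)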

@quasiben
Member

I think this is now resolved; if not, let's open it up again.

@rlratzel
Contributor Author

Looks like even after specifying a unique token, we're unfortunately still seeing the same transient failure. I'll re-open this for now.

Here's the cugraph PR with the change, and here's the log showing the failure.

We're going to look more closely at the MRE and see if we can reproduce the problem locally again with the token specified (run it repeatedly in a loop?), then possibly look into some of the other suggestions mentioned offline.

@rlratzel reopened this Sep 22, 2023
@quasiben
Member

@rlratzel I don't see where in that PR a unique token is defined

@rlratzel
Contributor Author

@rlratzel I don't see where in that PR a unique token is defined

Sorry, it looks like @naimnv reverted those changes and removed some other optimizations we were debugging with you in order to get CI to pass. Here are the two commits made based on our call that use the unique token: here and here.

@naimnv - can you restore your PR to the state where it has the above commits so we can continue to debug it?
@jnke2016 - can we have a separate PR that does not explicitly delete the tmp copied dataframe (as Naim did) so we can get CI passing again? This isn't what we ultimately want, but at least our MG use cases won't crash.

@quasiben
Member

+1 to building a separate PR -- very helpful in focusing the discussion

@naimnv

naimnv commented Sep 25, 2023

@rlratzel
I had a discussion with Joseph. Since Joseph will continue debugging the problem, I find it easier for him to do so on his own PR rather than mine, and for my PR to be closed with the changes that avoid the crash and keep CI passing for the MG tests.
cc @jnke2016

@jnke2016

jnke2016 commented Sep 25, 2023 via email

@rlratzel
Contributor Author

To recap, there are two PRs in the discussion: one that applies a fix for this issue and lets us keep certain optimizations (we'll call this PR A), and another that removes the optimizations so we can pass CI tests (we'll call this PR B).

We want PR A merged because it (should?) fix the bug, allowing us to keep optimizations that result in lower memory usage. We applied those changes but still observed CI failures, despite seeing the MRE pass. Some updates were made to PR A, it eventually passed CI, and it was merged. However, due to the transient nature of the bug, we're going to monitor CI for other PRs to see if the issue shows up again, and apply PR B if it does. If we have to use PR B, we'll re-open this issue and continue to debug.

I'm not confident our MRE is reproducing the problem, but hopefully the coverage provided by CI will help us determine if the fix in PR A is adequate or not while we're still in development.

I'll close this again and re-open if we need to apply PR B. If that happens, we'll create a separate PR that re-applies PR A so we can debug separately.

@rlratzel
Contributor Author

rlratzel commented Sep 26, 2023

Despite closing the issue, I'm worried that there's still a bug somewhere in dask-cudf, or that it's letting us use it improperly. It seems like we shouldn't need to use token=, and I'm nervous that we may have only reduced the chances of hitting the problem rather than eliminated it. @quasiben what do you think?

@quasiben
Member

I don't think this is a bug in dask or dask-cudf, as copying data out and manipulating data in place is not really a fully supported pattern within dask. In the case of dask-cudf, it looks like this process leaves lingering key names that don't get cleaned up properly. @rjzamora any additional thoughts?

@rjzamora
Member

@rjzamora any additional thoughts ?

I think I would need to look carefully through the cugraph code to say if I think there is a bug somewhere in dask. Once you start interacting with the client and workers directly, you are not really using dask.dataframe (or dask-cudf) in a simple way, and so there are probably many places where things can go sideways. Either way, it does seem reasonable to work out a clear API for creating a "unique view" of a dask collection for cases like this. We can also do a better job of documenting best practices for workflows that need to operate on data in place.
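As a rough sketch of what such a "unique view" helper might look like today (purely hypothetical; this is not an existing dask or dask-cudf API), it would essentially wrap the token-based copy discussed above:

import uuid

def unique_view(ddf):
    # shallow-copy every partition under task keys no other collection shares, so
    # downstream in-place modifications can't collide with a previously built collection
    return ddf.map_partitions(lambda df: df.copy(), token="unique-view-" + uuid.uuid4().hex)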

@bdice removed the Needs Triage (Need team to review and classify) label on Mar 4, 2024