Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Experimental] Optimize cudf/dask-cudf read_parquet for s3/remote filesystems #9225

Closed
wants to merge 22 commits into from

Conversation

rjzamora
Copy link
Member

Background

The current versions of cudf.read_parquet and dask_cudf.read_parquet are poorly optimized for remote storage (e.g. s3 and gcs). As discussed in #7475, this is because cudf and dask-cudf use fsspec as a universal file-system adapter, and libcudf's parquet logic cannot call into a python-based fsspec file-system object to seek/read specific byte ranges from the remote file. For this reason, cudf/dask-cudf currently call read to copy all contents of each parquet file into a local memory buffer before passing that buffer to libcudf. This is okay if the process calling read_parquet intends to read the entire file, and the file is reasonably small. However, there are other common cases:

  • CASE 1 - The process is selecting only a subset of parquet column-chunks: If the user has specified a specific set of columns and/or row-groups in the read_parquet call, then it is clearly inefficient to copy the entire file to host memory.
  • CASE 2 - The parquet file is large: If the file is large, a concurrent gather operation will most-likely outperform a vanilla read operation.

Changes in This PR

This PR builds upon the cpp/cython changes in #8961 to enable the creation and/or processing of Arrow NativeFile objects in cudf.read_parquet. Since Arrow-backed FileSystem definitions do not exist for all remote file-systems (e.g. GCS), this PR also optimizes the transfer of data from remote storage to host memory with Fsspec (taking advantage of the same optimization targeted in fsspec#744. More specifically, we use a local "dummy buffer", and avoid transferring any data that is not actually required by the underlying libcudf parquet read. We also use a concurrent read operation to transfer the bytes of the file in parallel.

Although the fsspec optimization was originally intended as a "temporary" solution for GCS, it is actually more performant (and stable) than the Arrow-NativeFile approach. The only disadvantage of using fsspec is that we still need enough host memory to store the entire parquet file (even if we do not actually populate most of the buffer with remote data).

Experimental API

We introduce the arrow_filesystem= option to both the cudf and dask-cudf read_parquet APIs. This argument is a boolean, with a default value of False. It determines whether a url-based path input should be used to infer an Arrow-based filesystem object. If url-based file-sytem inference fails, both cudf and dask-cudf will fall back to fsspec for file-system handling. We also introduce a legacy_transfer= option (default False) to allow the user to avoid the optimized data-transfer logic in the case that fsspec is used.

Default APIs

df = cudf.read_parquet(<path-or-handle>, arrow_filesystem=False, legacy_transfer=False)
ddf = dask_cudf.read_parquet(<path-or-handle>, arrow_filesystem=False, legacy_transfer=False)

API Notes

  • arrow_filesystem: Setting this value to True tells cudf/dask-cudf to try to infer an arrow-based filesystem object that will enable random access in libcudf. When the underlying parquet file can be opened as an arrow NativeFile, cudf no longer needs to copy the data into a local host buffer before calling down to libcudf (because libcudf can seek/read from the arrow file object directly).
    • Using arrow_filesystem=True currently works in most cases, but does not perform as well as the optimized fsspec data transfer, and still fails in some cases (especially in Dask).
  • legacy_transfer: Setting this value to True will avoid the new fsspec data-transfer optimization. The option is only included for debugging and comparison, and may be removed before this PR is ready to merge.

@rjzamora rjzamora added 2 - In Progress Currently a work in progress Python Affects Python cuDF API. dask Dask issue Cython labels Sep 13, 2021
@github-actions github-actions bot added the libcudf Affects libcudf (C++/CUDA) code. label Sep 13, 2021
@rjzamora rjzamora added the non-breaking Non-breaking change label Sep 13, 2021
@rjzamora
Copy link
Member Author

rjzamora commented Sep 13, 2021

This work is related to dask#8132 and NVT#1088

Comment on lines 1619 to 1625
# We have an fsspec filesystem and a path
with fs.open(path_or_fob, mode="rb", cache_type="none") as fob:
fob.seek(offset)
local_buffer[offset : offset + nbytes] = np.frombuffer(
fob.read(nbytes), dtype="b",
)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leiterenato - Perhaps you can comment on the most optimal API to read a specific set of bytes in gcs?

Hopefully, we can add those optimizations directly to gcsfs so that a simple fs.read_block(...) call would be optimal here. Note that I am using seek/read for now, since read_block will actually open the file with read-ahead caching, and then call seek/read.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rjzamora
Currently the tool gcloud alpha storage cp has the most optimized implementation.
Download link.
The source code is in this directory: lib/googlecloudsdk/command_lib/storage/tasks/cp/.
Blog post with more information.

Copy link

@martindurant martindurant Sep 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gcsfs supports reading the single block using cat, possibly for multiple blocks in multiple files concurrently. We just need to push fsspec/filesystem_spec#744 over the line for it to be available in all async implementations.
(the method is available for all backends, but, of course, not concurrent if the implementation is not async)

@jorisvandenbossche
Copy link

Although the fsspec optimization was originally intended as a "temporary" solution for GCS, it is actually more performant (and stable) than the Arrow-NativeFile approach. T

Question here: does the libcudf parquet reader do equivalent byte range optimization logic (based on the parquet metadata, which columns/row groups to read, etc) as you implemented here in Python in the "Fsspec Data-transfer Optimization Code" ?
(since you are comparing performance / seeing a difference. I know that the Parquet reader in Arrow C++ has similar optimizations, but I think that's not being used here, except for reading the metadata)

@rjzamora
Copy link
Member Author

Question here: does the libcudf parquet reader do equivalent byte range optimization logic (based on the parquet metadata, which columns/row groups to read, etc) as you implemented here in Python in the "Fsspec Data-transfer Optimization Code" ?

I'm not completely sure what optimizations libcudf uses for data access, but it will certainly use partial IO and should try to minimize how much data is read from disk (cc @devavret and @vuule in case they have input here). It seems like you are implying that the Arrow-NativeFile approach should have similar (if not better) performance than fsspec if the backend is using the NativeFile efficiently. If so, I agree with you :)

@devavret
Copy link
Contributor

does the libcudf parquet reader do equivalent byte range optimization logic (based on the parquet metadata, which columns/row groups to read, etc)

libcudf has the options to specify columns and rowgroups and only reads the ones selected.

@jorisvandenbossche
Copy link

It seems like you are implying that the Arrow-NativeFile approach should have similar (if not better) performance than fsspec if the backend is using the NativeFile efficiently. If so, I agree with you :)

Yes, and so my question was to try to understand what would be the cause (and thus where potential improvements could be made): are there things we can improve in the FileSystem/RandomAccessFile interface (in Arrow), or is it because the Parquet reader in libcudf can do more optimizations in what it asks from the file? (eg it might not do all optimizations as you now implemented in python for fsspec, and then it's not necessarily the filesystem interface (fsspec or arrow) that's the cause for a difference in performance).

libcudf has the options to specify columns and rowgroups and only reads the ones selected.

Just to point out that "only reads the ones selected" can be a bit ambiguous (not knowing the library): only deserializing the requested columns/row groups from parquet into libcudf data structures vs actually only downloading he subet of bytes of the file that are needed to deserialize the columns/row groups.

@devavret
Copy link
Contributor

Just to point out that "only reads the ones selected" can be a bit ambiguous (not knowing the library): only deserializing the requested columns/row groups from parquet into libcudf data structures vs actually only downloading he subet of bytes of the file that are needed to deserialize the columns/row groups.

Apologies, I see why that would be ambiguous. What I meant was that libcudf does not depend on the entire file's contents being available to it. We have a datasource class that can be used to make your own classes that implement a read(offset, size, *dst) method. As long as your custom datasource can understand this and copy the result in *dst, it works.

@rjzamora rjzamora added the improvement Improvement / enhancement to an existing function label Sep 15, 2021
rapids-bot bot pushed a commit that referenced this pull request Sep 22, 2021
This PR strips the pyarrow-NativeFile component out of #9225 (since those changes are not yet stable).  I feel that it is reasonable to start by merging these fsspec-specific optimizations for 21.10, because they are stable and already result in a significant performance boost over the existing approach to remote storage. I still think it is very important that we eventually plumb NativeFile support into python (cudf and dask_cudf), but we will likely need to target 21.12 for that improvement.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Ashwin Srinath (https://github.com/shwina)
  - Benjamin Zaitlen (https://github.com/quasiben)

URL: #9265
@codecov
Copy link

codecov bot commented Sep 22, 2021

Codecov Report

Merging #9225 (587ee5b) into branch-21.12 (ab4bfaa) will increase coverage by 0.04%.
The diff coverage is 0.00%.

Impacted file tree graph

@@               Coverage Diff                @@
##           branch-21.12    #9225      +/-   ##
================================================
+ Coverage         10.79%   10.83%   +0.04%     
================================================
  Files               116      116              
  Lines             18869    19260     +391     
================================================
+ Hits               2036     2087      +51     
- Misses            16833    17173     +340     
Impacted Files Coverage Δ
python/cudf/cudf/core/series.py 0.00% <0.00%> (ø)
python/cudf/cudf/io/csv.py 0.00% <0.00%> (ø)
python/cudf/cudf/io/orc.py 0.00% <0.00%> (ø)
python/cudf/cudf/utils/ioutils.py 0.00% <0.00%> (ø)
python/cudf/cudf/io/hdf.py 0.00% <0.00%> (ø)
python/cudf/cudf/_version.py 0.00% <0.00%> (ø)
python/cudf/cudf/core/abc.py 0.00% <0.00%> (ø)
python/cudf/cudf/api/types.py 0.00% <0.00%> (ø)
python/cudf/cudf/io/dlpack.py 0.00% <0.00%> (ø)
... and 50 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2718443...587ee5b. Read the comment docs.

@jorisvandenbossche
Copy link

libcudf does not depend on the entire file's contents being available to it. We have a datasource class that can be used to make your own classes that implement a read(offset, size, *dst) method. As long as your custom datasource can understand this and copy the result in *dst, it works.

@devavret thanks for the clarification. One other aspect to point out is that the optimizations that @rjzamora added here (moved to #9265 now, I think) go further than just reading the required bytes. For example, it will also merge small / adjacent ranges to read to decrease the number of requests.

@devavret
Copy link
Contributor

it will also merge small / adjacent ranges to read to decrease the number of requests

We already have that

// Transfer chunk data, coalescing adjacent chunks

Although we have plans to move this logic out of format specific readers and into a general reader class that will look for these optimizations.

@jorisvandenbossche
Copy link

OK, good to know. In that case, it would actually be interesting to understand where the performance difference between libcudf and fsspec comes from (since both should be doing similar optimizations then).

@martindurant
Copy link

we have plans to move this logic out of format specific readers

It's actually something that fsspec would appreciate! Could be upstreamed?

@devavret
Copy link
Contributor

it would actually be interesting to understand where the performance difference between libcudf and fsspec comes from

One reason is that the fsspec optimization added in #9265 only deals with the reading from file to host memory whereas the optimization in libcudf is primarily to coalesce transfers from host memory to GPU memory.

The fsspec optimization only kicks in when the file is not local, in which case it reads the data into host memory and passes to libcudf. In this case, the benefits from #9265 and libcudf are additive.

In case of a local filesystem, libcudf effectively does both the reads (disk -> host, host -> device) and the coalescing is the same for both transfers.

@rjzamora rjzamora changed the base branch from branch-21.10 to branch-21.12 September 24, 2021 15:17
rapids-bot bot pushed a commit that referenced this pull request Oct 4, 2021
…csv in cudf (#9304)

This PR implements a simple but critical subset of the the features implemented and discussed in #8961 and #9225. Note that I suggest those PRs be closed in favor of a few simpler PRs (like this one).

**What this PR DOES do**:

- Enables users to pass Arrow-based file objects directly to the cudf `read_parquet` and `read_csv` functions. For example:

```python
import cudf
import pyarrow.fs as pa_fs

fs, path = pa_fs.FileSystem.from_uri("s3://my-bucket/some-file.parquet")
with fs.open_input_file(path) as fil:
    gdf = cudf.read_parquet(fil)
```

- Adds automatic conversion of fsspec `AbstractBufferedFile` objects into Arrow-backed `PythonFile` objects. For `read_parquet`, an Arrow-backed `PythonFile` object can be used (in place of an optimized fsspec transfer) by passing `use_python_file_object=True`:

```python
import cudf

gdf = cudf.read_parquet(path, use_python_file_object=True)
```

or 

```python
import cudf
from fsspec.core import get_fs_token_paths

fs = get_fs_token_paths(path)[0]
with fs.open(path, mode="rb") as fil:
    gdf = cudf.read_parquet(fil, use_python_file_object=True)
```


**What this PR does NOT do**:

- cudf will **not** automatically produce "direct" (e.g. HadoopFileSystem/S3FileSystem-based) Arrow NativeFile objects for explicit file-path input. It is still up to the user to create/supply a direct NativeFile object to read_csv/parquet if they do not want any python overhead.
- cudf will **not** accept NativeFile input for IO functions other than read_csv and read_parquet
- dask-cudf does not yet have a mechanism to open/process s3 files as "direct" NativeFile objects - Those changes only apply to direct cudf usage


Props to @shridharathi for doing most of the work for this in #8961 (this PR only extends that work to include parquet and add tests).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #9304
@rjzamora
Copy link
Member Author

This PR was effectrively replaced with #9265, #9377, #9304 and #9376

@rjzamora rjzamora closed this Oct 18, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
2 - In Progress Currently a work in progress dask Dask issue improvement Improvement / enhancement to an existing function libcudf Affects libcudf (C++/CUDA) code. non-breaking Non-breaking change Python Affects Python cuDF API.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants