Skip to content

Commit

Permalink
Support Arrow NativeFile and PythonFile for remote ORC storage (#9377)
Browse files Browse the repository at this point in the history
This is a follow-up to #9304, and is more-or-less the ORC version of #9376

These changes will enable partial IO to behave "correctly" for `cudf.read_orc` from remote storage. Simpe multi-stripe file example:

```python
# After this PR
%time gdf = cudf.read_orc(orc_path, stripes=[0], storage_options=storage_options)
CPU times: user 579 ms, sys: 166 ms, total: 744 ms
Wall time: 2.38 s

# Before this PR
%time gdf = cudf.read_orc(orc_path, stripes=[0], storage_options=storage_options)
CPU times: user 3.9 s, sys: 1.47 s, total: 5.37 s
Wall time: 8.5 s
```

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)

URL: #9377
  • Loading branch information
rjzamora authored Oct 7, 2021
1 parent c6bc111 commit f4ff454
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 3 deletions.
10 changes: 10 additions & 0 deletions python/cudf/cudf/_lib/orc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ from cudf._lib.cpp.io.types cimport (
)
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport data_type, size_type, type_id
from cudf._lib.io.datasource cimport NativeFileDatasource
from cudf._lib.io.utils cimport (
make_sink_info,
make_source_info,
Expand All @@ -53,6 +54,8 @@ from cudf._lib.utils cimport (
table_view_from_table,
)

from pyarrow.lib import NativeFile

from cudf._lib.utils import _index_level_name, generate_pandas_metadata
from cudf.api.types import is_list_dtype, is_struct_dtype

Expand All @@ -66,6 +69,10 @@ cpdef read_raw_orc_statistics(filepath_or_buffer):
cudf.io.orc.read_orc_statistics
"""

# Handle NativeFile input
if isinstance(filepath_or_buffer, NativeFile):
filepath_or_buffer = NativeFileDatasource(filepath_or_buffer)

cdef raw_orc_statistics raw = (
libcudf_read_raw_orc_statistics(make_source_info([filepath_or_buffer]))
)
Expand Down Expand Up @@ -209,6 +216,9 @@ cdef orc_reader_options make_orc_reader_options(
object decimal_cols_as_float
) except*:

for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
filepaths_or_buffers[i] = NativeFileDatasource(datasource)
cdef vector[string] c_column_names
cdef vector[vector[size_type]] strps = stripes
c_column_names.reserve(len(column_names))
Expand Down
6 changes: 5 additions & 1 deletion python/cudf/cudf/io/orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def read_orc(
use_index=True,
decimal_cols_as_float=None,
timestamp_type=None,
use_python_file_object=True,
**kwargs,
):
"""{docstring}"""
Expand Down Expand Up @@ -321,7 +322,10 @@ def read_orc(
source = fs.sep.join([source, "*.orc"])

tmp_source, compression = ioutils.get_filepath_or_buffer(
path_or_data=source, compression=None, **kwargs,
path_or_data=source,
compression=None,
use_python_file_object=use_python_file_object,
**kwargs,
)
if compression is not None:
raise ValueError(
Expand Down
33 changes: 31 additions & 2 deletions python/cudf/cudf/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ def test_read_json(s3_base, s3so):
assert_eq(expect, got)


def test_read_orc(s3_base, s3so, datadir):
@pytest.mark.parametrize("use_python_file_object", [False, True])
@pytest.mark.parametrize("columns", [None, ["string1"]])
def test_read_orc(s3_base, s3so, datadir, use_python_file_object, columns):
source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc")
fname = "test_orc_reader.orc"
bname = "orc"
Expand All @@ -337,9 +339,36 @@ def test_read_orc(s3_base, s3so, datadir):

with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got = cudf.read_orc(
"s3://{}/{}".format(bname, fname), storage_options=s3so
"s3://{}/{}".format(bname, fname),
columns=columns,
storage_options=s3so,
use_python_file_object=use_python_file_object,
)

if columns:
expect = expect[columns]
assert_eq(expect, got)


@pytest.mark.parametrize("columns", [None, ["string1"]])
def test_read_orc_arrow_nativefile(s3_base, s3so, datadir, columns):
source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc")
fname = "test_orc_reader.orc"
bname = "orc"
expect = pa.orc.ORCFile(source_file).read().to_pandas()

with open(source_file, "rb") as f:
buffer = f.read()

with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
fs = pa_fs.S3FileSystem(
endpoint_override=s3so["client_kwargs"]["endpoint_url"],
)
with fs.open_input_file("{}/{}".format(bname, fname)) as fil:
got = cudf.read_orc(fil, columns=columns)

if columns:
expect = expect[columns]
assert_eq(expect, got)


Expand Down
4 changes: 4 additions & 0 deletions python/cudf/cudf/utils/ioutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@
decimal_cols_as_float: list, default None
If specified, names of the columns that should be converted from
Decimal to Float64 in the resulting dataframe.
use_python_file_object : boolean, default True
If True, Arrow-backed PythonFile objects will be used in place of fsspec
AbstractBufferedFile objects at IO time. This option is likely to improve
performance when making small reads from larger ORC files.
kwargs are passed to the engine
Returns
Expand Down

0 comments on commit f4ff454

Please sign in to comment.