Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate Arrow support in I/O #16132

Merged
merged 16 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion python/cudf/cudf/_lib/pylibcudf/io/datasource.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ from pyarrow.lib cimport NativeFile
from cudf._lib.pylibcudf.libcudf.io.arrow_io_source cimport arrow_io_source
from cudf._lib.pylibcudf.libcudf.io.datasource cimport datasource

import warnings


cdef class Datasource:
cdef datasource* get_datasource(self) except * nogil:
Expand All @@ -16,10 +18,16 @@ cdef class Datasource:

cdef class NativeFileDatasource(Datasource):

def __cinit__(self, NativeFile native_file,):
def __cinit__(self, NativeFile native_file):

cdef shared_ptr[CRandomAccessFile] ra_src

warnings.warn(
"Support for reading pyarrow's NativeFile is deprecated "
"and will be removed in a future release of cudf.",
FutureWarning,
)

ra_src = native_file.get_random_access_file()
self.c_datasource.reset(new arrow_io_source(ra_src))

Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/io/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def read_csv(
comment=None,
delim_whitespace=False,
byte_range=None,
use_python_file_object=True,
use_python_file_object=None,
storage_options=None,
bytes_per_thread=None,
):
Expand Down
33 changes: 22 additions & 11 deletions python/cudf/cudf/io/orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from cudf._lib import orc as liborc
from cudf.api.types import is_list_like
from cudf.utils import ioutils
from cudf.utils.utils import maybe_filter_deprecation


def _make_empty_df(filepath_or_buffer, columns):
Expand Down Expand Up @@ -280,7 +281,7 @@ def read_orc(
num_rows=None,
use_index=True,
timestamp_type=None,
use_python_file_object=True,
use_python_file_object=None,
lithomas1 marked this conversation as resolved.
Show resolved Hide resolved
storage_options=None,
bytes_per_thread=None,
):
Expand Down Expand Up @@ -320,6 +321,9 @@ def read_orc(
)

filepaths_or_buffers = []
have_nativefile = any(
isinstance(source, pa.NativeFile) for source in filepath_or_buffer
)
for source in filepath_or_buffer:
if ioutils.is_directory(
path_or_data=source, storage_options=storage_options
Expand Down Expand Up @@ -360,17 +364,24 @@ def read_orc(
stripes = selected_stripes

if engine == "cudf":
return DataFrame._from_data(
*liborc.read_orc(
filepaths_or_buffers,
columns,
stripes,
skiprows,
num_rows,
use_index,
timestamp_type,
# Don't want to warn if use_python_file_object causes us to get
# a NativeFile (there is a separate deprecation warning for that)
with maybe_filter_deprecation(
not have_nativefile,
message="Support for reading pyarrow's NativeFile is deprecated",
category=FutureWarning,
):
return DataFrame._from_data(
*liborc.read_orc(
filepaths_or_buffers,
columns,
stripes,
skiprows,
num_rows,
use_index,
timestamp_type,
)
)
)
else:
from pyarrow import orc

Expand Down
41 changes: 27 additions & 14 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow import dataset as ds

import cudf
Expand All @@ -23,6 +24,7 @@
from cudf.core.column import as_column, build_categorical_column, column_empty
from cudf.utils import ioutils
from cudf.utils.performance_tracking import _performance_tracking
from cudf.utils.utils import maybe_filter_deprecation

BYTE_SIZES = {
"kb": 1000,
Expand Down Expand Up @@ -350,7 +352,7 @@ def read_parquet_metadata(filepath_or_buffer):
path_or_data=source,
compression=None,
fs=fs,
use_python_file_object=True,
use_python_file_object=None,
open_file_options=None,
storage_options=None,
bytes_per_thread=None,
Expand Down Expand Up @@ -532,7 +534,7 @@ def read_parquet(
filters=None,
row_groups=None,
use_pandas_metadata=True,
use_python_file_object=True,
use_python_file_object=None,
categorical_partitions=True,
open_file_options=None,
bytes_per_thread=None,
Expand Down Expand Up @@ -615,6 +617,9 @@ def read_parquet(
row_groups=row_groups,
fs=fs,
)
have_nativefile = any(
isinstance(source, pa.NativeFile) for source in filepath_or_buffer
)
for source in filepath_or_buffer:
tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
Expand Down Expand Up @@ -662,19 +667,27 @@ def read_parquet(
)

# Convert parquet data to a cudf.DataFrame
df = _parquet_to_frame(
filepaths_or_buffers,
engine,
*args,
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
partition_keys=partition_keys,
partition_categories=partition_categories,
dataset_kwargs=dataset_kwargs,
**kwargs,
)

# Don't want to warn if use_python_file_object causes us to get
# a NativeFile (there is a separate deprecation warning for that)
# with filtered_deprecation(
lithomas1 marked this conversation as resolved.
Show resolved Hide resolved
with maybe_filter_deprecation(
not have_nativefile,
message="Support for reading pyarrow's NativeFile is deprecated",
category=FutureWarning,
):
df = _parquet_to_frame(
filepaths_or_buffers,
engine,
*args,
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
partition_keys=partition_keys,
partition_categories=partition_categories,
dataset_kwargs=dataset_kwargs,
**kwargs,
)
# Apply filters row-wise (if any are defined), and return
df = _apply_post_filters(df, filters)
if projected_columns:
Expand Down
21 changes: 5 additions & 16 deletions python/cudf/cudf/pylibcudf_tests/io/test_source_sink_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

import io

import pyarrow as pa
import pytest

import cudf._lib.pylibcudf as plc
from cudf._lib.pylibcudf.io.datasource import NativeFileDatasource


@pytest.fixture(params=[plc.io.SourceInfo, plc.io.SinkInfo])
Expand All @@ -18,10 +16,8 @@ def _skip_invalid_sinks(io_class, sink):
"""
Skip invalid sinks for SinkInfo
"""
if io_class is plc.io.SinkInfo and isinstance(
sink, (bytes, NativeFileDatasource)
):
pytest.skip(f"{sink} is not a valid input for SinkInfo")
if io_class is plc.io.SinkInfo and isinstance(sink, bytes):
pytest.skip("bytes is not a valid input for SinkInfo")


@pytest.mark.parametrize(
Expand All @@ -30,7 +26,6 @@ def _skip_invalid_sinks(io_class, sink):
"a.txt",
b"hello world",
io.BytesIO(b"hello world"),
NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
],
)
def test_source_info_ctor(io_class, source, tmp_path):
Expand All @@ -47,13 +42,12 @@ def test_source_info_ctor(io_class, source, tmp_path):
@pytest.mark.parametrize(
"sources",
[
["a.txt"],
[b"hello world"],
[io.BytesIO(b"hello world")],
["a.txt", "a.txt"],
[b"hello world", b"hello there"],
[io.BytesIO(b"hello world"), io.BytesIO(b"hello there")],
[
NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
],
],
)
def test_source_info_ctor_multiple(io_class, sources, tmp_path):
Expand All @@ -79,11 +73,6 @@ def test_source_info_ctor_multiple(io_class, sources, tmp_path):
io.BytesIO(b"hello there"),
b"hello world",
],
[
NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
"awef.txt",
b"hello world",
],
],
)
def test_source_info_ctor_mixing_invalid(io_class, sources, tmp_path):
Expand Down
5 changes: 3 additions & 2 deletions python/cudf/cudf/tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,8 +1085,9 @@ def test_csv_reader_arrow_nativefile(path_or_buf):
# Arrow FileSystem interface
expect = cudf.read_csv(path_or_buf("filepath"))
fs, path = pa_fs.FileSystem.from_uri(path_or_buf("filepath"))
with fs.open_input_file(path) as fil:
got = cudf.read_csv(fil)
with pytest.warns(FutureWarning):
with fs.open_input_file(path) as fil:
got = cudf.read_csv(fil)

assert_eq(expect, got)

Expand Down
3 changes: 2 additions & 1 deletion python/cudf/cudf/tests/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def mock_size(*args):
# use_python_file_object=True, because the pyarrow
# `open_input_file` command will fail (since it doesn't
# use the monkey-patched `open` definition)
got = cudf.read_csv(f"gcs://{fpath}", use_python_file_object=False)
with pytest.warns(FutureWarning):
got = cudf.read_csv(f"gcs://{fpath}", use_python_file_object=False)
assert_eq(pdf, got)

# AbstractBufferedFile -> PythonFile conversion
Expand Down
19 changes: 11 additions & 8 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ def test_parquet_reader_arrow_nativefile(parquet_path_or_buf):
expect = cudf.read_parquet(parquet_path_or_buf("filepath"))
fs, path = pa_fs.FileSystem.from_uri(parquet_path_or_buf("filepath"))
with fs.open_input_file(path) as fil:
got = cudf.read_parquet(fil)
with pytest.warns(FutureWarning):
got = cudf.read_parquet(fil)

assert_eq(expect, got)

Expand All @@ -726,16 +727,18 @@ def test_parquet_reader_use_python_file_object(
fs, _, paths = get_fs_token_paths(parquet_path_or_buf("filepath"))

# Pass open fsspec file
with fs.open(paths[0], mode="rb") as fil:
got1 = cudf.read_parquet(
fil, use_python_file_object=use_python_file_object
)
with pytest.warns(FutureWarning):
with fs.open(paths[0], mode="rb") as fil:
got1 = cudf.read_parquet(
fil, use_python_file_object=use_python_file_object
)
assert_eq(expect, got1)

# Pass path only
got2 = cudf.read_parquet(
paths[0], use_python_file_object=use_python_file_object
)
with pytest.warns(FutureWarning):
got2 = cudf.read_parquet(
paths[0], use_python_file_object=use_python_file_object
)
assert_eq(expect, got2)


Expand Down
Loading
Loading