Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate Arrow support in I/O #16132

Merged
merged 16 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion python/cudf/cudf/_lib/pylibcudf/io/datasource.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ from pyarrow.lib cimport NativeFile
from cudf._lib.pylibcudf.libcudf.io.arrow_io_source cimport arrow_io_source
from cudf._lib.pylibcudf.libcudf.io.datasource cimport datasource

import warnings


cdef class Datasource:
cdef datasource* get_datasource(self) except * nogil:
Expand All @@ -16,10 +18,16 @@ cdef class Datasource:

cdef class NativeFileDatasource(Datasource):

def __cinit__(self, NativeFile native_file,):
def __cinit__(self, NativeFile native_file):

cdef shared_ptr[CRandomAccessFile] ra_src

warnings.warn(
"Support for reading pyarrow's NativeFile is deprecated "
"and will be removed in a future release of cudf.",
FutureWarning,
)

ra_src = native_file.get_random_access_file()
self.c_datasource.reset(new arrow_io_source(ra_src))

Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/io/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def read_csv(
comment=None,
delim_whitespace=False,
byte_range=None,
use_python_file_object=True,
use_python_file_object=None,
storage_options=None,
bytes_per_thread=None,
):
Expand Down
34 changes: 23 additions & 11 deletions python/cudf/cudf/io/orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def read_orc(
num_rows=None,
use_index=True,
timestamp_type=None,
use_python_file_object=True,
use_python_file_object=None,
lithomas1 marked this conversation as resolved.
Show resolved Hide resolved
storage_options=None,
bytes_per_thread=None,
):
Expand Down Expand Up @@ -320,7 +320,10 @@ def read_orc(
)

filepaths_or_buffers = []
saw_nativefile = False
for source in filepath_or_buffer:
if isinstance(source, pa.NativeFile):
saw_nativefile = True
if ioutils.is_directory(
path_or_data=source, storage_options=storage_options
):
Expand Down Expand Up @@ -360,17 +363,26 @@ def read_orc(
stripes = selected_stripes

if engine == "cudf":
return DataFrame._from_data(
*liborc.read_orc(
filepaths_or_buffers,
columns,
stripes,
skiprows,
num_rows,
use_index,
timestamp_type,
with warnings.catch_warnings():
# Suppress the NativeFile deprecation warning unless the caller passed a
# NativeFile directly: when use_python_file_object causes us to get one,
# a separate deprecation warning already covers that case.
if not saw_nativefile:
lithomas1 marked this conversation as resolved.
Show resolved Hide resolved
warnings.filterwarnings(
action="ignore",
message="Support for reading pyarrow's NativeFile is deprecated",
category=FutureWarning,
)
return DataFrame._from_data(
*liborc.read_orc(
filepaths_or_buffers,
columns,
stripes,
skiprows,
num_rows,
use_index,
timestamp_type,
)
)
)
else:
from pyarrow import orc

Expand Down
43 changes: 28 additions & 15 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow import dataset as ds

import cudf
Expand Down Expand Up @@ -342,7 +343,7 @@ def read_parquet_metadata(filepath_or_buffer):
path_or_data=source,
compression=None,
fs=fs,
use_python_file_object=True,
use_python_file_object=None,
open_file_options=None,
storage_options=None,
bytes_per_thread=None,
Expand Down Expand Up @@ -524,7 +525,7 @@ def read_parquet(
filters=None,
row_groups=None,
use_pandas_metadata=True,
use_python_file_object=True,
use_python_file_object=None,
categorical_partitions=True,
open_file_options=None,
bytes_per_thread=None,
Expand All @@ -539,6 +540,7 @@ def read_parquet(
)
# Do not allow the user to set file-opening options
# when `use_python_file_object=False` is specified
# TODO: decide how to handle this check; should it be deprecated as well?
lithomas1 marked this conversation as resolved.
Show resolved Hide resolved
if use_python_file_object is False:
if open_file_options:
raise ValueError(
Expand Down Expand Up @@ -607,7 +609,10 @@ def read_parquet(
row_groups=row_groups,
fs=fs,
)
saw_nativefile = False
for source in filepath_or_buffer:
if isinstance(source, pa.NativeFile):
saw_nativefile = True
lithomas1 marked this conversation as resolved.
Show resolved Hide resolved
tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
Expand Down Expand Up @@ -654,19 +659,27 @@ def read_parquet(
)

# Convert parquet data to a cudf.DataFrame
df = _parquet_to_frame(
filepaths_or_buffers,
engine,
*args,
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
partition_keys=partition_keys,
partition_categories=partition_categories,
dataset_kwargs=dataset_kwargs,
**kwargs,
)

with warnings.catch_warnings():
# Suppress the NativeFile deprecation warning unless the caller passed a
# NativeFile directly: when use_python_file_object causes us to get one,
# a separate deprecation warning already covers that case.
if not saw_nativefile:
warnings.filterwarnings(
action="ignore",
message="Support for reading pyarrow's NativeFile is deprecated",
category=FutureWarning,
)
df = _parquet_to_frame(
filepaths_or_buffers,
engine,
*args,
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
partition_keys=partition_keys,
partition_categories=partition_categories,
dataset_kwargs=dataset_kwargs,
**kwargs,
)
# Apply filters row-wise (if any are defined), and return
df = _apply_post_filters(df, filters)
if projected_columns:
Expand Down
35 changes: 3 additions & 32 deletions python/cudf/cudf/pylibcudf_tests/test_source_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,20 @@

import io

import pyarrow as pa
import pytest

import cudf._lib.pylibcudf as plc
from cudf._lib.pylibcudf.io.datasource import NativeFileDatasource


@pytest.mark.parametrize(
"source",
[
"a.txt",
b"hello world",
io.BytesIO(b"hello world"),
NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
],
)
def test_source_info_ctor(source, tmp_path):
if isinstance(source, str):
file = tmp_path / source
file.write_bytes("hello world".encode("utf-8"))
source = str(file)

plc.io.SourceInfo([source])

# TODO: test that the contents of the source_info buffer are correct
# once buffers are exposed on the Python side


@pytest.mark.parametrize(
"sources",
[
["a.txt"],
[b"hello world"],
[io.BytesIO(b"hello world")],
["a.txt", "a.txt"],
[b"hello world", b"hello there"],
[io.BytesIO(b"hello world"), io.BytesIO(b"hello there")],
[
NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
],
],
)
def test_source_info_ctor_multiple(sources, tmp_path):
Expand All @@ -66,11 +42,6 @@ def test_source_info_ctor_multiple(sources, tmp_path):
io.BytesIO(b"hello there"),
b"hello world",
],
[
NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
"awef.txt",
b"hello world",
],
],
)
def test_source_info_ctor_mixing_invalid(sources, tmp_path):
Expand Down
5 changes: 3 additions & 2 deletions python/cudf/cudf/tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,8 +1085,9 @@ def test_csv_reader_arrow_nativefile(path_or_buf):
# Arrow FileSystem interface
expect = cudf.read_csv(path_or_buf("filepath"))
fs, path = pa_fs.FileSystem.from_uri(path_or_buf("filepath"))
with fs.open_input_file(path) as fil:
got = cudf.read_csv(fil)
with pytest.warns(FutureWarning):
with fs.open_input_file(path) as fil:
got = cudf.read_csv(fil)

assert_eq(expect, got)

Expand Down
3 changes: 2 additions & 1 deletion python/cudf/cudf/tests/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def mock_size(*args):
# use_python_file_object=True, because the pyarrow
# `open_input_file` command will fail (since it doesn't
# use the monkey-patched `open` definition)
got = cudf.read_csv(f"gcs://{fpath}", use_python_file_object=False)
with pytest.warns(FutureWarning):
got = cudf.read_csv(f"gcs://{fpath}", use_python_file_object=False)
assert_eq(pdf, got)

# AbstractBufferedFile -> PythonFile conversion
Expand Down
19 changes: 11 additions & 8 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ def test_parquet_reader_arrow_nativefile(parquet_path_or_buf):
expect = cudf.read_parquet(parquet_path_or_buf("filepath"))
fs, path = pa_fs.FileSystem.from_uri(parquet_path_or_buf("filepath"))
with fs.open_input_file(path) as fil:
got = cudf.read_parquet(fil)
with pytest.warns(FutureWarning):
got = cudf.read_parquet(fil)

assert_eq(expect, got)

Expand All @@ -726,16 +727,18 @@ def test_parquet_reader_use_python_file_object(
fs, _, paths = get_fs_token_paths(parquet_path_or_buf("filepath"))

# Pass open fsspec file
with fs.open(paths[0], mode="rb") as fil:
got1 = cudf.read_parquet(
fil, use_python_file_object=use_python_file_object
)
with pytest.warns(FutureWarning):
with fs.open(paths[0], mode="rb") as fil:
got1 = cudf.read_parquet(
fil, use_python_file_object=use_python_file_object
)
assert_eq(expect, got1)

# Pass path only
got2 = cudf.read_parquet(
paths[0], use_python_file_object=use_python_file_object
)
with pytest.warns(FutureWarning):
got2 = cudf.read_parquet(
paths[0], use_python_file_object=use_python_file_object
)
assert_eq(expect, got2)


Expand Down
Loading
Loading