Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/cudf/cudf/pandas/scripts/conftest-patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10635,6 +10635,8 @@ def set_copy_on_write_option():
"tests/window/test_pairwise.py::TestPairwise::test_no_flex[pairwise_frames5-<lambda>1]",
"tests/window/test_pairwise.py::TestPairwise::test_no_flex[pairwise_frames8-<lambda>1]",
"tests/window/test_win_type.py::test_cmov_window_frame[None-var-xp2]",
"tests/series/indexing/test_indexing.py::test_basic_indexing",
"tests/indexing/test_iloc.py::TestILocErrors::test_iloc_float_raises[string-python-Series]",
}


Expand Down
14 changes: 9 additions & 5 deletions python/pylibcudf/pylibcudf/contiguous_split.pxd
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0

from libc.stdint cimport uint8_t
from libc.stdint cimport uint8_t, uintptr_t
from libc.stddef cimport size_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.vector cimport vector
from pylibcudf.libcudf.contiguous_split cimport packed_columns, chunked_pack
from pylibcudf.libcudf.types cimport bitmask_type
from pylibcudf.libcudf.utilities.span cimport device_span
from rmm.pylibrmm.device_buffer cimport DeviceBuffer
from rmm.pylibrmm.memory_resource cimport DeviceMemoryResource
from rmm.pylibrmm.stream cimport Stream
Expand All @@ -15,6 +17,8 @@ from .gpumemoryview cimport gpumemoryview
from .table cimport Table


cdef device_span[uint8_t] _get_device_span(object obj) except *
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want this to be in pylibcudf.utils and be capable of creating different types of device_spans (e.g. device_span<byte> which I'll need for cudf::table_to_array). I tried to use a fused type as the template type, but Cython cannot infer the type I want. Here is the compile error I get.

Return type is not specified as argument type

Since the input here is just a plain Python `object`, Cython cannot infer the return type. Any ideas here @vyasr?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cython doesn't support this natively. The options are either to write separate Cython functions that return each concrete type and then having a wrapper function that dispatches to the right one (e.g. based on an enum value) or to use inline C++ to define the function so that you can export a templated version directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only APIs that are really relevant for #21069 are in contiguous split and reshape (please correct me if I'm missing something). Since it's only two, it's probably okay to have two separate functions to create a device_span[uint8_t] and a device_span[byte].

I'm going to punt the reshape API (i.e. pylibcudf.reshape.table_to_array) to a follow-up PR because changing it would be a breaking change. Currently the API takes a ptr and a size; the new API would take a Span, so we'd probably have to deprecate the old signature first.

Aside: There are other places like in I/O types and null mask. But they really only need type checks using is_span (I've added them in this PR).


cdef class HostBuffer:
cdef unique_ptr[vector[uint8_t]] c_obj
cdef size_t nbytes
Expand Down Expand Up @@ -46,10 +50,10 @@ cdef class ChunkedPack:
cdef Stream stream

cpdef bool has_next(self)
cpdef size_t next(self, DeviceBuffer buf)
cpdef size_t next(self, object buf)
cpdef size_t get_total_contiguous_size(self)
cpdef memoryview build_metadata(self)
cpdef tuple pack_to_host(self, DeviceBuffer buf)
cpdef tuple pack_to_host(self, object buf)


cpdef PackedColumns pack(Table input)
Expand All @@ -58,6 +62,6 @@ cpdef Table unpack(PackedColumns input, Stream stream=*)

cpdef Table unpack_from_memoryviews(
memoryview metadata,
gpumemoryview gpu_data,
object gpu_data,
Stream stream=*,
)
10 changes: 5 additions & 5 deletions python/pylibcudf/pylibcudf/contiguous_split.pyi
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0

from rmm import DeviceBuffer
from rmm.mr import DeviceMemoryResource
from rmm.pylibrmm.stream import Stream

from pylibcudf.gpumemoryview import gpumemoryview
from pylibcudf.span import Span
from pylibcudf.table import Table

class PackedColumns:
Expand All @@ -18,7 +18,7 @@ def pack(input: Table, stream: Stream | None = None) -> PackedColumns: ...
def unpack(input: PackedColumns, stream: Stream | None = None) -> Table: ...
def unpack_from_memoryviews(
metadata: memoryview[bytes],
gpu_data: gpumemoryview,
gpu_data: Span,
stream: Stream | None = None,
) -> Table: ...

Expand All @@ -33,8 +33,8 @@ class ChunkedPack:
) -> ChunkedPack: ...
def has_next(self) -> bool: ...
def get_total_contiguous_size(self) -> int: ...
def next(self, buf: DeviceBuffer) -> int: ...
def next(self, buf: Span) -> int: ...
def build_metadata(self) -> memoryview[bytes]: ...
def pack_to_host(
self, buf: DeviceBuffer
self, buf: Span
) -> tuple[memoryview[bytes], memoryview[bytes]]: ...
46 changes: 27 additions & 19 deletions python/pylibcudf/pylibcudf/contiguous_split.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0

from cython.operator cimport dereference
Expand Down Expand Up @@ -33,6 +33,7 @@ from rmm.pylibrmm.stream cimport Stream

from .gpumemoryview cimport gpumemoryview
from .table cimport Table
from .span import is_span
from .utils cimport _get_stream, _get_memory_resource


Expand All @@ -44,6 +45,14 @@ __all__ = [
"unpack_from_memoryviews",
]

# Build a non-owning device_span[uint8_t] view over any object implementing
# the Span protocol (i.e. exposing integer ``ptr`` and ``size`` attributes,
# as checked by ``is_span``). Raises TypeError for non-span inputs.
# NOTE(review): the returned span does not own the memory — the caller must
# keep ``obj`` alive for as long as the span is in use.
cdef device_span[uint8_t] _get_device_span(object obj) except *:
    if not is_span(obj):
        raise TypeError(
            f"Object of type {type(obj)} does not implement the Span protocol"
        )

    # Reinterpret the Python integer pointer as a raw device pointer.
    return device_span[uint8_t](<uint8_t*><uintptr_t>obj.ptr, <size_t>obj.size)

cdef class HostBuffer:
"""Owning host buffer that implements the buffer protocol"""
@staticmethod
Expand Down Expand Up @@ -212,13 +221,13 @@ cdef class ChunkedPack:
with nogil:
return dereference(self.c_obj).get_total_contiguous_size()

cpdef size_t next(self, DeviceBuffer buf):
cpdef size_t next(self, object buf):
"""
Pack the next chunk into the provided device buffer.

Parameters
----------
buf
buf : Span-like object
The device buffer to use as a staging buffer, must be at
least as large as the `user_buffer_size` used to construct the
packer.
Expand All @@ -232,9 +241,8 @@ cdef class ChunkedPack:
This is stream-ordered with respect to the stream used when
creating the `ChunkedPack`.
"""
cdef device_span[uint8_t] d_span = device_span[uint8_t](
<uint8_t *>buf.c_data(), buf.c_size()
)
cdef device_span[uint8_t] d_span = _get_device_span(buf)

with nogil:
return dereference(self.c_obj).next(d_span)

Expand All @@ -251,16 +259,16 @@ cdef class ChunkedPack:
metadata = move(dereference(self.c_obj).build_metadata())
return memoryview(HostBuffer.from_unique_ptr(move(metadata)))

cpdef tuple pack_to_host(self, DeviceBuffer buf):
cpdef tuple pack_to_host(self, object buf):
"""
Pack the entire table into a host buffer.

Parameters
----------
buf
The device buffer to use as a staging buffer, must be at
least as large as the `user_buffer_size` used to construct the
packer.
buf : Span-like object
The device buffer to use as a staging buffer, must be at
least as large as the `user_buffer_size` used to construct the
packer.

Returns
-------
Expand All @@ -279,10 +287,8 @@ cdef class ChunkedPack:
"""
cdef size_t offset = 0
cdef size_t size
cdef device_span[uint8_t] d_span = device_span[uint8_t](
<uint8_t *>buf.c_data(), buf.c_size()
)
cdef cudaError_t err
cdef device_span[uint8_t] d_span = _get_device_span(buf)
cdef cudaError_t err = cudaError.cudaSuccess
cdef unique_ptr[vector[uint8_t]] h_buf = (
make_unique[vector[uint8_t]](
dereference(self.c_obj).get_total_contiguous_size()
Expand Down Expand Up @@ -379,7 +385,7 @@ cpdef Table unpack(PackedColumns input, Stream stream=None):

cpdef Table unpack_from_memoryviews(
memoryview metadata,
gpumemoryview gpu_data,
object gpu_data,
Stream stream=None,
):
"""Deserialize the result of `pack`.
Expand All @@ -392,7 +398,7 @@ cpdef Table unpack_from_memoryviews(
----------
metadata : memoryview
The packed metadata to unpack.
gpu_data : gpumemoryview
gpu_data : Span-like object
The packed gpu_data to unpack.
stream : Stream | None
CUDA stream on which to perform the operation.
Expand All @@ -403,8 +409,10 @@ cpdef Table unpack_from_memoryviews(
Copy of the packed columns.
"""
stream = _get_stream(stream)
cdef device_span[uint8_t] d_span = _get_device_span(gpu_data)

if metadata.nbytes == 0:
if gpu_data.__cuda_array_interface__["data"][0] != 0:
if d_span.data() != NULL:
raise ValueError("Expected an empty gpu_data from unpacking an empty table")
# For an empty table we just attach the default mr since it will not be
# used for any operations.
Expand All @@ -417,7 +425,7 @@ cpdef Table unpack_from_memoryviews(
# Extract the raw data pointers
cdef const uint8_t[::1] _metadata = metadata
cdef const uint8_t* metadata_ptr = &_metadata[0]
cdef const uint8_t* gpu_data_ptr = <uint8_t*>gpu_data.ptr
cdef const uint8_t* gpu_data_ptr = d_span.data()

cdef table_view v
with nogil:
Expand Down
18 changes: 9 additions & 9 deletions python/pylibcudf/pylibcudf/io/types.pyx
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0
from cpython.buffer cimport PyBUF_READ
from cpython.memoryview cimport PyMemoryView_FromMemory

from cython.operator cimport dereference

from libc.stdint cimport int32_t, uint8_t
from libc.stdint cimport int32_t, uint8_t, uintptr_t

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
Expand All @@ -28,11 +28,12 @@ from pylibcudf.libcudf.io.types cimport (
table_with_metadata,
)
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.libcudf.utilities.span cimport host_span, device_span
from pylibcudf.libcudf.utilities.span cimport device_span, host_span

from pylibcudf.span import is_span

from rmm.pylibrmm.device_buffer cimport DeviceBuffer
from rmm.pylibrmm.stream cimport Stream
from rmm.pylibrmm.memory_resource cimport DeviceMemoryResource
from rmm.pylibrmm.stream cimport Stream

import codecs
import errno
Expand Down Expand Up @@ -525,14 +526,13 @@ cdef class SourceInfo:
self._init_byte_like_sources(sources, bytes)
elif isinstance(sources[0], io.BytesIO):
self._init_byte_like_sources(sources, io.BytesIO)
elif isinstance(sources[0], DeviceBuffer):
if not all(isinstance(s, DeviceBuffer) for s in sources):
elif is_span(sources[0]):
if not all(is_span(s) for s in sources):
raise ValueError("All sources must be of the same type!")
self.device_sources = sources
for buf in sources:
d_buf = <DeviceBuffer>buf
d_span = device_span[const_byte](
<const_byte *>d_buf.c_data(), d_buf.c_size()
<const_byte *><uintptr_t>buf.ptr, <size_t>buf.size
)
d_spans.push_back(d_span)
self.c_obj = move(source_info(host_span[device_span[const_byte]](d_spans)))
Expand Down
12 changes: 12 additions & 0 deletions python/pylibcudf/pylibcudf/null_mask.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ from rmm.pylibrmm.memory_resource cimport DeviceMemoryResource

from pylibcudf.libcudf.types import mask_state as MaskState # no-cython-lint

from .span import is_span as py_is_span

from .column cimport Column
from .table cimport Table
from .utils cimport _get_stream, _get_memory_resource
Expand Down Expand Up @@ -99,6 +101,11 @@ cpdef DeviceBuffer copy_bitmask_from_bitmask(
A ``DeviceBuffer`` containing ``col``'s bitmask, or an empty
``DeviceBuffer`` if ``col`` is not nullable
"""
if not py_is_span(bitmask):
raise TypeError(
f"bitmask must satisfy Span protocol (have .ptr and .size), "
f"got {type(bitmask).__name__}"
)
cdef device_buffer db
stream = _get_stream(stream)
mr = _get_memory_resource(mr)
Expand Down Expand Up @@ -263,6 +270,11 @@ cpdef size_type null_count(
int
The number of null elements in the specified range.
"""
if not py_is_span(bitmask):
raise TypeError(
f"bitmask must satisfy Span protocol (have .ptr and .size), "
f"got {type(bitmask).__name__}"
)
cdef uintptr_t ptr = bitmask.ptr
stream = _get_stream(stream)
with nogil:
Expand Down
21 changes: 19 additions & 2 deletions python/pylibcudf/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0
import io

Expand Down Expand Up @@ -169,16 +169,32 @@ def test_read_parquet_filters(
)


class FooSpan:
    """Minimal wrapper satisfying the Span protocol (``ptr`` and ``size``).

    Forwards the pointer and size of a wrapped span-like object while
    holding a reference to it, so the underlying device memory remains
    valid for the lifetime of this view.
    """

    def __init__(self, owner):
        # Retain the owner so its device allocation is not freed
        # while this span is alive.
        self._owner = owner

    @property
    def ptr(self):
        """Device pointer of the wrapped buffer."""
        return self._owner.ptr

    @property
    def size(self):
        """Size in bytes of the wrapped buffer."""
        return self._owner.size


@pytest.mark.parametrize("num_buffers", [1, 2])
@pytest.mark.parametrize("stream", [None, Stream()])
@pytest.mark.parametrize("columns", [None, ["col_int64", "col_bool"]])
@pytest.mark.parametrize("use_foo_span", [False, True])
def test_read_parquet_from_device_buffers(
table_data,
binary_source_or_sink,
nrows_skiprows,
stream,
columns,
num_buffers,
use_foo_span,
):
_, pa_table = table_data
nrows, skiprows = nrows_skiprows
Expand All @@ -188,9 +204,10 @@ def test_read_parquet_from_device_buffers(
binary_source_or_sink, pa_table, **_COMMON_PARQUET_SOURCE_KWARGS
)

buf = DeviceBuffer.to_device(
rmm_buf = DeviceBuffer.to_device(
get_bytes_from_source(source), plc.utils._get_stream(stream)
)
buf = FooSpan(rmm_buf) if use_foo_span else rmm_buf

options = plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo([buf] * num_buffers)
Expand Down
15 changes: 14 additions & 1 deletion python/pylibcudf/tests/test_contiguous_split.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0

import string
Expand Down Expand Up @@ -82,3 +82,16 @@ def test_chunked_pack(bufsize, stream):
)

assert_table_eq(h_table, result)


def test_unpack_from_memoryviews_empty_metadata_non_empty_data():
    """Empty metadata paired with non-empty gpu_data must raise ValueError."""
    # Zero-length metadata signals an empty packed table, so passing a
    # non-null 64-byte device allocation alongside it is inconsistent input.
    empty_metadata = memoryview(b"")
    non_empty_data = plc.gpumemoryview(rmm.DeviceBuffer(size=64))

    with pytest.raises(
        ValueError,
        match="Expected an empty gpu_data from unpacking an empty table",
    ):
        plc.contiguous_split.unpack_from_memoryviews(
            empty_metadata, non_empty_data
        )
Loading
Loading