Skip to content

Commit

Permalink
Use Arrow C Data Interface functions for Python interop (#15904)
Browse files Browse the repository at this point in the history
This PR replaces the internals of `from_arrow` in pylibcudf with an implementation that uses the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) using the [Python Capsule interface](https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html). This allows us to decouple our Python builds from using pyarrow Cython (partially, we haven't replaced the `to_arrow` conversion yet) and it will also allow us to support any other Python package that is a producer of the data interface.

To support the above functionality, the following additional changes were needed in this PR:
- Added the ability to produce cudf tables from `ArrowArrayStream` objects since that is what `pyarrow.Table` produces. This function is a simple wrapper around the existing `from_arrrow(ArrowArray)` API.
- Added support for the large strings type, for which support has improved throughout cudf since the `from_arrow_host` API was added and for which we now require a basic overload for tests to pass. I did not add corresponding support for `from_arrow_device` to avoid ballooning the scope of this PR, so that work can be done in a follow-up.
- Proper handling of `type_id::EMPTY` in concatenate because the most natural implementation of the ArrowArrayStream processing is to run `from_arrow` on each chunk and then concatenate the outputs, and from the Python side we can produce chunks of all null arrays from arrow.

Contributes to #14926

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Robert Maynard (https://github.com/robertmaynard)
  - David Wendt (https://github.com/davidwendt)

URL: #15904
  • Loading branch information
vyasr authored Jul 2, 2024
1 parent 08552f8 commit a4be7bd
Show file tree
Hide file tree
Showing 14 changed files with 466 additions and 25 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ add_library(
src/interop/to_arrow_device.cu
src/interop/from_arrow_device.cu
src/interop/from_arrow_host.cu
src/interop/from_arrow_stream.cu
src/interop/to_arrow_schema.cpp
src/interop/detail/arrow_allocator.cpp
src/io/avro/avro.cpp
Expand Down
38 changes: 30 additions & 8 deletions cpp/include/cudf/interop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ struct ArrowSchema;

struct ArrowArray;

struct ArrowArrayStream;

namespace cudf {
/**
* @addtogroup interop_dlpack
Expand Down Expand Up @@ -367,10 +369,11 @@ std::unique_ptr<cudf::scalar> from_arrow(
* @param mr Device memory resource used to allocate `cudf::table`
* @return cudf table generated from given arrow data
*/
std::unique_ptr<cudf::table> from_arrow(ArrowSchema const* schema,
ArrowArray const* input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
std::unique_ptr<cudf::table> from_arrow(
ArrowSchema const* schema,
ArrowArray const* input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create `cudf::column` from a given ArrowArray and ArrowSchema input
Expand All @@ -385,10 +388,11 @@ std::unique_ptr<cudf::table> from_arrow(ArrowSchema const* schema,
* @param mr Device memory resource used to allocate `cudf::column`
* @return cudf column generated from given arrow data
*/
std::unique_ptr<cudf::column> from_arrow_column(ArrowSchema const* schema,
ArrowArray const* input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
std::unique_ptr<cudf::column> from_arrow_column(
ArrowSchema const* schema,
ArrowArray const* input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create `cudf::table` from given ArrowDeviceArray input
Expand All @@ -414,6 +418,24 @@ std::unique_ptr<table> from_arrow_host(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create `cudf::table` from given ArrowArrayStream input
*
* @throws std::invalid_argument if input is NULL
*
* The conversion WILL release the input ArrayArrayStream and its constituent
* arrays or schema since Arrow streams are not suitable for multiple reads.
*
* @param input `ArrowArrayStream` pointer to object that will produce ArrowArray data
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to perform cuda allocation
* @return cudf table generated from the given Arrow data
*/
std::unique_ptr<table> from_arrow_stream(
ArrowArrayStream* input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create `cudf::column` from given ArrowDeviceArray input
*
Expand Down
28 changes: 24 additions & 4 deletions cpp/src/copying/concatenate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,6 @@ void traverse_children::operator()<cudf::list_view>(host_span<column_view const>
*/
void bounds_and_type_check(host_span<column_view const> cols, rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(cudf::all_have_same_types(cols.begin(), cols.end()),
"Type mismatch in columns to concatenate.",
cudf::data_type_error);

// total size of all concatenated rows
size_t const total_row_count =
std::accumulate(cols.begin(), cols.end(), std::size_t{}, [](size_t a, auto const& b) {
Expand All @@ -476,6 +472,21 @@ void bounds_and_type_check(host_span<column_view const> cols, rmm::cuda_stream_v
"Total number of concatenated rows exceeds the column size limit",
std::overflow_error);

if (std::any_of(cols.begin(), cols.end(), [](column_view const& c) {
return c.type().id() == cudf::type_id::EMPTY;
})) {
CUDF_EXPECTS(
std::all_of(cols.begin(),
cols.end(),
[](column_view const& c) { return c.type().id() == cudf::type_id::EMPTY; }),
"Mismatch in columns to concatenate.",
cudf::data_type_error);
return;
}
CUDF_EXPECTS(cudf::all_have_same_types(cols.begin(), cols.end()),
"Type mismatch in columns to concatenate.",
cudf::data_type_error);

// traverse children
cudf::type_dispatcher(cols.front().type(), traverse_children{}, cols, stream);
}
Expand All @@ -498,6 +509,15 @@ std::unique_ptr<column> concatenate(host_span<column_view const> columns_to_conc
return empty_like(columns_to_concat.front());
}

// For empty columns, we can just create an EMPTY column of the appropriate length.
if (columns_to_concat.front().type().id() == cudf::type_id::EMPTY) {
auto length = std::accumulate(
columns_to_concat.begin(), columns_to_concat.end(), 0, [](auto a, auto const& b) {
return a + b.size();
});
return std::make_unique<column>(
data_type(type_id::EMPTY), length, rmm::device_buffer{}, rmm::device_buffer{}, length);
}
return type_dispatcher<dispatch_storage_type>(
columns_to_concat.front().type(), concatenate_dispatch{columns_to_concat, stream, mr});
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/interop/arrow_utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ data_type arrow_to_cudf_type(ArrowSchemaView const* arrow_view)
case NANOARROW_TYPE_FLOAT: return data_type(type_id::FLOAT32);
case NANOARROW_TYPE_DOUBLE: return data_type(type_id::FLOAT64);
case NANOARROW_TYPE_DATE32: return data_type(type_id::TIMESTAMP_DAYS);
case NANOARROW_TYPE_STRING: return data_type(type_id::STRING);
case NANOARROW_TYPE_STRING:
case NANOARROW_TYPE_LARGE_STRING: return data_type(type_id::STRING);
case NANOARROW_TYPE_LIST: return data_type(type_id::LIST);
case NANOARROW_TYPE_DICTIONARY: return data_type(type_id::DICTIONARY32);
case NANOARROW_TYPE_STRUCT: return data_type(type_id::STRUCT);
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/interop/from_arrow_device.cu
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ dispatch_tuple_t dispatch_from_arrow_device::operator()<cudf::string_view>(
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(schema->type != NANOARROW_TYPE_LARGE_STRING,
"Large strings are not yet supported in from_arrow_device",
cudf::data_type_error);
if (input->length == 0) {
return std::make_tuple<column_view, owned_columns_t>(
{type,
Expand Down
32 changes: 26 additions & 6 deletions cpp/src/interop/from_arrow_host.cu
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,16 @@ std::unique_ptr<column> dispatch_copy_from_arrow_host::operator()<cudf::string_v

// chars_column does not contain any nulls, they are tracked by the parent string column
// itself instead. So we pass nullptr for the validity bitmask.
size_type const char_data_length =
reinterpret_cast<int32_t const*>(offset_buffers[1])[input->length + input->offset];
int64_t const char_data_length = [&]() {
if (schema->type == NANOARROW_TYPE_LARGE_STRING) {
return reinterpret_cast<int64_t const*>(offset_buffers[1])[input->length + input->offset];
} else if (schema->type == NANOARROW_TYPE_STRING) {
return static_cast<int64_t>(
reinterpret_cast<int32_t const*>(offset_buffers[1])[input->length + input->offset]);
} else {
CUDF_FAIL("Unsupported string type", cudf::data_type_error);
}
}();
void const* char_buffers[2] = {nullptr, input->buffers[2]};
ArrowArray char_array = {
.length = char_data_length,
Expand All @@ -210,15 +218,27 @@ std::unique_ptr<column> dispatch_copy_from_arrow_host::operator()<cudf::string_v
// offset and char data columns for us.
ArrowSchemaView view;
NANOARROW_THROW_NOT_OK(ArrowSchemaViewInit(&view, offset_schema.get(), nullptr));
auto offsets_column =
this->operator()<int32_t>(&view, &offsets_array, data_type(type_id::INT32), true);
auto offsets_column = [&]() {
if (schema->type == NANOARROW_TYPE_LARGE_STRING) {
return this->operator()<int64_t>(&view, &offsets_array, data_type(type_id::INT64), true);
} else if (schema->type == NANOARROW_TYPE_STRING) {
return this->operator()<int32_t>(&view, &offsets_array, data_type(type_id::INT32), true);
} else {
CUDF_FAIL("Unsupported string type", cudf::data_type_error);
}
}();
NANOARROW_THROW_NOT_OK(ArrowSchemaViewInit(&view, char_data_schema.get(), nullptr));
auto chars_column = this->operator()<int8_t>(&view, &char_array, data_type(type_id::INT8), true);

rmm::device_buffer chars(char_data_length, stream, mr);
CUDF_CUDA_TRY(cudaMemcpyAsync(chars.data(),
reinterpret_cast<uint8_t const*>(char_array.buffers[1]),
chars.size(),
cudaMemcpyDefault,
stream.value()));
auto const num_rows = offsets_column->size() - 1;
auto out_col = make_strings_column(num_rows,
std::move(offsets_column),
std::move(chars_column->release().data.release()[0]),
std::move(chars),
input->null_count,
std::move(*get_mask_buffer(input)));

Expand Down
143 changes: 143 additions & 0 deletions cpp/src/interop/from_arrow_stream.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "arrow_utilities.hpp"

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/concatenate.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/interop.hpp>
#include <cudf/table/table.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <nanoarrow/nanoarrow.h>
#include <nanoarrow/nanoarrow.hpp>

#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

namespace cudf {
namespace detail {

namespace {

std::unique_ptr<column> make_empty_column_from_schema(ArrowSchema const* schema,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
ArrowSchemaView schema_view;
NANOARROW_THROW_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, nullptr));

auto const type{arrow_to_cudf_type(&schema_view)};
switch (type.id()) {
case type_id::EMPTY: {
return std::make_unique<column>(
data_type(type_id::EMPTY), 0, rmm::device_buffer{}, rmm::device_buffer{}, 0);
}
case type_id::LIST: {
return cudf::make_lists_column(0,
cudf::make_empty_column(data_type{type_id::INT32}),
make_empty_column_from_schema(schema->children[0], stream, mr),
0,
{},
stream,
mr);
}
case type_id::STRUCT: {
std::vector<std::unique_ptr<column>> child_columns;
child_columns.reserve(schema->n_children);
std::transform(
schema->children,
schema->children + schema->n_children,
std::back_inserter(child_columns),
[&](auto const& child) { return make_empty_column_from_schema(child, stream, mr); });
return cudf::make_structs_column(0, std::move(child_columns), 0, {}, stream, mr);
}
default: {
return cudf::make_empty_column(type);
}
}
}

} // namespace

std::unique_ptr<table> from_arrow_stream(ArrowArrayStream* input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(input != nullptr, "input ArrowArrayStream must not be NULL", std::invalid_argument);

// Potential future optimization: Since the from_arrow API accepts an
// ArrowSchema we're allocating one here instead of using a view, which we
// could avoid with a different underlying implementation.
ArrowSchema schema;
NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetSchema(input, &schema, nullptr));

std::vector<std::unique_ptr<cudf::table>> chunks;
ArrowArray chunk;
while (true) {
NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetNext(input, &chunk, nullptr));
if (chunk.release == nullptr) { break; }
chunks.push_back(from_arrow(&schema, &chunk, stream, mr));
chunk.release(&chunk);
}
input->release(input);

if (chunks.empty()) {
if (schema.n_children == 0) {
schema.release(&schema);
return std::make_unique<cudf::table>();
}

// If there are no chunks but the schema has children, we need to construct a suitable empty
// table.
std::vector<std::unique_ptr<cudf::column>> columns;
columns.reserve(chunks.size());
std::transform(
schema.children,
schema.children + schema.n_children,
std::back_inserter(columns),
[&](auto const& child) { return make_empty_column_from_schema(child, stream, mr); });
schema.release(&schema);
return std::make_unique<cudf::table>(std::move(columns));
}

schema.release(&schema);

auto chunk_views = std::vector<table_view>{};
chunk_views.reserve(chunks.size());
std::transform(
chunks.begin(), chunks.end(), std::back_inserter(chunk_views), [](auto const& chunk) {
return chunk->view();
});
return cudf::detail::concatenate(chunk_views, stream, mr);
}

} // namespace detail

std::unique_ptr<table> from_arrow_stream(ArrowArrayStream* input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::from_arrow_stream(input, stream, mr);
}
} // namespace cudf
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ ConfigureTest(
interop/from_arrow_test.cpp
interop/from_arrow_device_test.cpp
interop/from_arrow_host_test.cpp
interop/from_arrow_stream_test.cpp
interop/dlpack_test.cpp
EXTRA_LIB
nanoarrow
Expand Down
Loading

0 comments on commit a4be7bd

Please sign in to comment.