Skip to content

Commit 601dc3d

Browse files
EnricoMilidavidmpitrou
authored
apacheGH-32276: [C++][FlightRPC] Add option to align RecordBatch buffers given to IPC reader (apache#44279) (#6)
### Rationale for this change Data retrieved via IPC is expected to provide memory-aligned arrays, but data retrieved via C++ Flight client is mis-aligned. Datafusion (Rust), which requires data type-specific alignment, cannot handle such data: apache#43552. https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding ### What changes are included in this PR? This adds option `arrow::ipc::IpcReadOptions.ensure_alignment` of type `arrow::ipc::Alignment` to configure how RecordBatch array buffers decoded by IPC are realigned. It supports no realignment (default), data type-specific alignment and 64-byte alignment. Implementation mirrors that of [`align_buffers` in arrow-rs](https://github.com/apache/arrow-rs/blob/3293a8c2f9062fca93bee2210d540a1d25155bf5/arrow-data/src/data.rs#L698-L711) (apache/arrow-rs#4681). ### Are these changes tested? Configuration flag tested in unit test. Integration test with Flight server. Manually end-to-end tested that memory alignment fixes issue with reproduction code provided in apache#43552. ### Are there any user-facing changes? Adds option `IpcReadOptions.ensure_alignment` and enum type `Alignment`. * GitHub Issue: apache#32276 Lead-authored-by: Enrico Minack <[email protected]> Signed-off-by: David Li <[email protected]> Co-authored-by: David Li <[email protected]> Co-authored-by: Antoine Pitrou <[email protected]>
1 parent 50dfe89 commit 601dc3d

File tree

10 files changed

+252
-47
lines changed

10 files changed

+252
-47
lines changed

cpp/src/arrow/flight/integration_tests/flight_integration_test.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ TEST(FlightIntegration, AuthBasicProto) { ASSERT_OK(RunScenario("auth:basic_prot
5353

5454
TEST(FlightIntegration, Middleware) { ASSERT_OK(RunScenario("middleware")); }
5555

56+
TEST(FlightIntegration, Alignment) { ASSERT_OK(RunScenario("alignment")); }
57+
5658
TEST(FlightIntegration, Ordered) { ASSERT_OK(RunScenario("ordered")); }
5759

5860
TEST(FlightIntegration, ExpirationTimeDoGet) {

cpp/src/arrow/flight/integration_tests/test_integration.cc

Lines changed: 145 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "arrow/table.h"
4646
#include "arrow/table_builder.h"
4747
#include "arrow/testing/gtest_util.h"
48+
#include "arrow/util/align_util.h"
4849
#include "arrow/util/checked_cast.h"
4950
#include "arrow/util/string.h"
5051
#include "arrow/util/value_parsing.h"
@@ -281,6 +282,137 @@ class MiddlewareScenario : public Scenario {
281282
std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
282283
};
283284

285+
/// \brief The server used for testing FlightClient data alignment.
286+
///
287+
/// The server always returns the same data of various byte widths.
288+
/// The client should return data that is aligned according to the data type
289+
/// if FlightCallOptions.read_options.ensure_memory_alignment is true.
290+
///
291+
/// This scenario is passed only when the client returns aligned data.
292+
class AlignmentServer : public FlightServerBase {
293+
Status GetFlightInfo(const ServerCallContext& context,
294+
const FlightDescriptor& descriptor,
295+
std::unique_ptr<FlightInfo>* result) override {
296+
auto schema = BuildSchema();
297+
std::vector<FlightEndpoint> endpoints{
298+
FlightEndpoint{{"align-data"}, {}, std::nullopt, ""}};
299+
ARROW_ASSIGN_OR_RAISE(
300+
auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
301+
*result = std::make_unique<FlightInfo>(info);
302+
return Status::OK();
303+
}
304+
305+
Status DoGet(const ServerCallContext& context, const Ticket& request,
306+
std::unique_ptr<FlightDataStream>* stream) override {
307+
if (request.ticket != "align-data") {
308+
return Status::KeyError("Could not find flight: ", request.ticket);
309+
}
310+
auto record_batch = RecordBatchFromJSON(BuildSchema(), R"([
311+
[1, 1, false],
312+
[2, 2, true],
313+
[3, 3, false]
314+
])");
315+
std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
316+
ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
317+
RecordBatchReader::Make(record_batches));
318+
*stream = std::make_unique<RecordBatchStream>(record_batch_reader);
319+
return Status::OK();
320+
}
321+
322+
private:
323+
std::shared_ptr<Schema> BuildSchema() {
324+
return arrow::schema({
325+
arrow::field("int32", arrow::int32(), false),
326+
arrow::field("int64", arrow::int64(), false),
327+
arrow::field("bool", arrow::boolean(), false),
328+
});
329+
}
330+
};
331+
332+
/// \brief The alignment scenario.
333+
///
334+
/// This tests that the client provides aligned data if requested.
335+
class AlignmentScenario : public Scenario {
336+
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
337+
FlightServerOptions* options) override {
338+
server->reset(new AlignmentServer());
339+
return Status::OK();
340+
}
341+
342+
Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
343+
344+
arrow::Result<std::shared_ptr<Table>> GetTable(FlightClient* client,
345+
const FlightCallOptions& call_options) {
346+
ARROW_ASSIGN_OR_RAISE(auto info,
347+
client->GetFlightInfo(FlightDescriptor::Command("alignment")));
348+
std::vector<std::shared_ptr<arrow::Table>> tables;
349+
for (const auto& endpoint : info->endpoints()) {
350+
if (!endpoint.locations.empty()) {
351+
std::stringstream ss;
352+
ss << "[";
353+
for (const auto& location : endpoint.locations) {
354+
if (ss.str().size() != 1) {
355+
ss << ", ";
356+
}
357+
ss << location.ToString();
358+
}
359+
ss << "]";
360+
return Status::Invalid(
361+
"Expected to receive empty locations to use the original service: ",
362+
ss.str());
363+
}
364+
ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(call_options, endpoint.ticket));
365+
ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
366+
tables.push_back(table);
367+
}
368+
return ConcatenateTables(tables);
369+
}
370+
371+
Status RunClient(std::unique_ptr<FlightClient> client) override {
372+
for (ipc::Alignment ensure_alignment :
373+
{ipc::Alignment::kAnyAlignment, ipc::Alignment::kDataTypeSpecificAlignment,
374+
ipc::Alignment::k64ByteAlignment}) {
375+
auto call_options = FlightCallOptions();
376+
call_options.read_options.ensure_alignment = ensure_alignment;
377+
ARROW_ASSIGN_OR_RAISE(auto table, GetTable(client.get(), call_options));
378+
379+
// Check read data
380+
auto expected_row_count = 3;
381+
if (table->num_rows() != expected_row_count) {
382+
return Status::Invalid("Read table size isn't expected\n", "Expected rows:\n",
383+
expected_row_count, "Actual rows:\n", table->num_rows());
384+
}
385+
auto expected_column_count = 3;
386+
if (table->num_columns() != expected_column_count) {
387+
return Status::Invalid("Read table size isn't expected\n", "Expected columns:\n",
388+
expected_column_count, "Actual columns:\n",
389+
table->num_columns());
390+
}
391+
// Check data alignment
392+
std::vector<bool> needs_alignment;
393+
if (ensure_alignment == ipc::Alignment::kAnyAlignment) {
394+
// this is not a requirement but merely an observation:
395+
// with ensure_alignment=false, flight client returns mis-aligned data
396+
// if this is not the case any more, feel free to remove this assertion
397+
if (util::CheckAlignment(*table, arrow::util::kValueAlignment,
398+
&needs_alignment)) {
399+
return Status::Invalid(
400+
"Read table has aligned data, which is good, but unprecedented");
401+
}
402+
} else {
403+
// with ensure_alignment != kValueAlignment, we require data to be aligned
404+
// the value of the Alignment enum provides us with the byte alignment value
405+
if (!util::CheckAlignment(*table, static_cast<int64_t>(ensure_alignment),
406+
&needs_alignment)) {
407+
return Status::Invalid("Read table has unaligned data");
408+
}
409+
}
410+
}
411+
412+
return Status::OK();
413+
}
414+
};
415+
284416
/// \brief The server used for testing FlightInfo.ordered.
285417
///
286418
/// If the given command is "ordered", the server sets
@@ -316,25 +448,16 @@ class OrderedServer : public FlightServerBase {
316448

317449
Status DoGet(const ServerCallContext& context, const Ticket& request,
318450
std::unique_ptr<FlightDataStream>* stream) override {
319-
ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
320-
BuildSchema(), arrow::default_memory_pool()));
321-
auto number_builder = builder->GetFieldAs<Int32Builder>(0);
451+
std::shared_ptr<RecordBatch> record_batch;
322452
if (request.ticket == "1") {
323-
ARROW_RETURN_NOT_OK(number_builder->Append(1));
324-
ARROW_RETURN_NOT_OK(number_builder->Append(2));
325-
ARROW_RETURN_NOT_OK(number_builder->Append(3));
453+
record_batch = RecordBatchFromJSON(BuildSchema(), "[[1], [2], [3]]");
326454
} else if (request.ticket == "2") {
327-
ARROW_RETURN_NOT_OK(number_builder->Append(10));
328-
ARROW_RETURN_NOT_OK(number_builder->Append(20));
329-
ARROW_RETURN_NOT_OK(number_builder->Append(30));
455+
record_batch = RecordBatchFromJSON(BuildSchema(), "[[10], [20], [30]]");
330456
} else if (request.ticket == "3") {
331-
ARROW_RETURN_NOT_OK(number_builder->Append(100));
332-
ARROW_RETURN_NOT_OK(number_builder->Append(200));
333-
ARROW_RETURN_NOT_OK(number_builder->Append(300));
457+
record_batch = RecordBatchFromJSON(BuildSchema(), "[[100], [200], [300]]");
334458
} else {
335459
return Status::KeyError("Could not find flight: ", request.ticket);
336460
}
337-
ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
338461
std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
339462
ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
340463
RecordBatchReader::Make(record_batches));
@@ -390,19 +513,9 @@ class OrderedScenario : public Scenario {
390513

391514
// Build expected table
392515
auto schema = arrow::schema({arrow::field("number", arrow::int32(), false)});
393-
ARROW_ASSIGN_OR_RAISE(auto builder,
394-
RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
395-
auto number_builder = builder->GetFieldAs<Int32Builder>(0);
396-
ARROW_RETURN_NOT_OK(number_builder->Append(1));
397-
ARROW_RETURN_NOT_OK(number_builder->Append(2));
398-
ARROW_RETURN_NOT_OK(number_builder->Append(3));
399-
ARROW_RETURN_NOT_OK(number_builder->Append(10));
400-
ARROW_RETURN_NOT_OK(number_builder->Append(20));
401-
ARROW_RETURN_NOT_OK(number_builder->Append(30));
402-
ARROW_RETURN_NOT_OK(number_builder->Append(100));
403-
ARROW_RETURN_NOT_OK(number_builder->Append(200));
404-
ARROW_RETURN_NOT_OK(number_builder->Append(300));
405-
ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
516+
auto expected_record_batch = RecordBatchFromJSON(schema, R"([
517+
[1], [2], [3], [10], [20], [30], [100], [200], [300]
518+
])");
406519
std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
407520
expected_record_batch};
408521
ARROW_ASSIGN_OR_RAISE(auto expected_table,
@@ -490,11 +603,8 @@ class ExpirationTimeServer : public FlightServerBase {
490603
}
491604
}
492605
status.num_gets++;
493-
ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
494-
BuildSchema(), arrow::default_memory_pool()));
495-
auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
496-
ARROW_RETURN_NOT_OK(number_builder->Append(index));
497-
ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
606+
auto record_batch =
607+
RecordBatchFromJSON(BuildSchema(), "[[" + std::to_string(index) + "]]");
498608
std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
499609
ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
500610
RecordBatchReader::Make(record_batches));
@@ -621,13 +731,7 @@ class ExpirationTimeDoGetScenario : public Scenario {
621731

622732
// Build expected table
623733
auto schema = arrow::schema({arrow::field("number", arrow::uint32(), false)});
624-
ARROW_ASSIGN_OR_RAISE(auto builder,
625-
RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
626-
auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
627-
ARROW_RETURN_NOT_OK(number_builder->Append(0));
628-
ARROW_RETURN_NOT_OK(number_builder->Append(1));
629-
ARROW_RETURN_NOT_OK(number_builder->Append(2));
630-
ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
734+
auto expected_record_batch = RecordBatchFromJSON(schema, "[[0], [1], [2]]");
631735
std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
632736
expected_record_batch};
633737
ARROW_ASSIGN_OR_RAISE(auto expected_table,
@@ -2382,6 +2486,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
23822486
} else if (scenario_name == "middleware") {
23832487
*out = std::make_shared<MiddlewareScenario>();
23842488
return Status::OK();
2489+
} else if (scenario_name == "alignment") {
2490+
*out = std::make_shared<AlignmentScenario>();
2491+
return Status::OK();
23852492
} else if (scenario_name == "ordered") {
23862493
*out = std::make_shared<OrderedScenario>();
23872494
return Status::OK();

cpp/src/arrow/ipc/options.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "arrow/ipc/type_fwd.h"
2626
#include "arrow/status.h"
2727
#include "arrow/type_fwd.h"
28+
#include "arrow/util/align_util.h"
2829
#include "arrow/util/compression.h"
2930
#include "arrow/util/visibility.h"
3031

@@ -128,6 +129,18 @@ struct ARROW_EXPORT IpcWriteOptions {
128129
static IpcWriteOptions Defaults();
129130
};
130131

132+
/// \brief Alignment of data in memory
133+
/// Alignment values larger than 0 are taken directly as byte alignment value
134+
/// See util::EnsureAlignment(..., int64_t alignment, ...)
135+
enum class Alignment : int64_t {
136+
/// \brief data is aligned depending on the actual data type
137+
kDataTypeSpecificAlignment = util::kValueAlignment,
138+
/// \brief no particular alignment enforced
139+
kAnyAlignment = 0,
140+
/// \brief data is aligned to 64-byte boundary
141+
k64ByteAlignment = 64
142+
};
143+
131144
/// \brief Options for reading Arrow IPC messages
132145
struct ARROW_EXPORT IpcReadOptions {
133146
/// \brief The maximum permitted schema nesting depth.
@@ -161,6 +174,16 @@ struct ARROW_EXPORT IpcReadOptions {
161174
/// RecordBatchStreamReader and StreamDecoder classes.
162175
bool ensure_native_endian = true;
163176

177+
/// \brief How to align data if mis-aligned
178+
///
179+
/// Data is copied to aligned memory locations allocated via the
180+
/// MemoryPool configured as \ref arrow::ipc::IpcReadOptions::memory_pool.
181+
/// Some use cases might require data to have a specific alignment, for example,
182+
/// for the data buffer of an Int32 array to be aligned on a 4-byte boundary.
183+
///
184+
/// Default (kAnyAlignment) keeps the alignment as is, so no copy of data occurs.
185+
Alignment ensure_alignment = Alignment::kAnyAlignment;
186+
164187
/// \brief Options to control caching behavior when pre-buffering is requested
165188
///
166189
/// The lazy property will always be reset to true to deliver the expected behavior

cpp/src/arrow/ipc/reader.cc

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include "arrow/table.h"
4848
#include "arrow/type.h"
4949
#include "arrow/type_traits.h"
50+
#include "arrow/util/align_util.h"
5051
#include "arrow/util/bit_util.h"
5152
#include "arrow/util/bitmap_ops.h"
5253
#include "arrow/util/checked_cast.h"
@@ -636,8 +637,17 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
636637
arrow::internal::SwapEndianArrayData(filtered_column));
637638
}
638639
}
639-
return RecordBatch::Make(std::move(filtered_schema), metadata->length(),
640-
std::move(filtered_columns));
640+
auto batch = RecordBatch::Make(std::move(filtered_schema), metadata->length(),
641+
std::move(filtered_columns));
642+
643+
if (ARROW_PREDICT_FALSE(context.options.ensure_alignment != Alignment::kAnyAlignment)) {
644+
return util::EnsureAlignment(batch,
645+
// the numerical value of ensure_alignment enum is taken
646+
// literally as byte alignment
647+
static_cast<int64_t>(context.options.ensure_alignment),
648+
context.options.memory_pool);
649+
}
650+
return batch;
641651
}
642652

643653
Result<std::shared_ptr<RecordBatch>> LoadRecordBatch(

cpp/src/arrow/util/align_util.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "arrow/array.h"
2121
#include "arrow/chunked_array.h"
22+
#include "arrow/extension_type.h"
2223
#include "arrow/record_batch.h"
2324
#include "arrow/table.h"
2425
#include "arrow/type_fwd.h"
@@ -28,6 +29,8 @@
2829

2930
namespace arrow {
3031

32+
using internal::checked_cast;
33+
3134
namespace util {
3235

3336
bool CheckAlignment(const Buffer& buffer, int64_t alignment) {
@@ -44,9 +47,13 @@ namespace {
4447
Type::type GetTypeForBuffers(const ArrayData& array) {
4548
Type::type type_id = array.type->storage_id();
4649
if (type_id == Type::DICTIONARY) {
47-
return ::arrow::internal::checked_pointer_cast<DictionaryType>(array.type)
48-
->index_type()
49-
->id();
50+
// return index type id, provided by the DictionaryType array.type or
51+
// array.type->storage_type() if array.type is an ExtensionType
52+
DataType* dict_type = array.type.get();
53+
if (array.type->id() == Type::EXTENSION) {
54+
dict_type = checked_cast<ExtensionType*>(dict_type)->storage_type().get();
55+
}
56+
return checked_cast<DictionaryType*>(dict_type)->index_type()->id();
5057
}
5158
return type_id;
5259
}

docs/source/cpp/flight.rst

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,17 @@ Memory management
239239
-----------------
240240

241241
Flight tries to reuse allocations made by gRPC to avoid redundant
242-
data copies. However, this means that those allocations may not
242+
data copies. However, experience shows that such data is frequently
243+
misaligned. Some use cases might require data to have data type-specific
244+
alignment (for example, for the data buffer of an Int32 array to be aligned
245+
on a 4-byte boundary), which can be enforced
246+
by setting :member:`arrow::ipc::IpcReadOptions::ensure_alignment`
247+
to :member:`arrow::ipc::Alignment::kDataTypeSpecificAlignment`.
248+
This uses the :member:`arrow::ipc::IpcReadOptions::memory_pool`
249+
to allocate memory with aligned addresses, but only for mis-aligned data.
250+
However, this creates data copies of your data received via Flight.
251+
252+
Unless gRPC data are copied as described above, allocations made by gRPC may not
243253
be tracked by the Arrow memory pool, and that memory usage behavior,
244254
such as whether free memory is returned to the system, is dependent
245255
on the allocator that gRPC uses (usually the system allocator).
@@ -361,5 +371,4 @@ Closing unresponsive connections
361371
.. _ARROW-15764: https://issues.apache.org/jira/browse/ARROW-15764
362372
.. _ARROW-16697: https://issues.apache.org/jira/browse/ARROW-16697
363373
.. _ARROW-6062: https://issues.apache.org/jira/browse/ARROW-6062
364-
365374
.. _gRPC: https://grpc.io/

python/pyarrow/includes/libarrow.pxd

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1874,12 +1874,18 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
18741874
@staticmethod
18751875
CIpcWriteOptions Defaults()
18761876

1877+
ctypedef enum CAlignment" arrow::ipc::Alignment":
1878+
CAlignment_Any" arrow::ipc::Alignment::kAnyAlignment"
1879+
CAlignment_DataTypeSpecific" arrow::ipc::Alignment::kDataTypeSpecificAlignment"
1880+
CAlignment_64Byte" arrow::ipc::Alignment::k64ByteAlignment"
1881+
18771882
cdef cppclass CIpcReadOptions" arrow::ipc::IpcReadOptions":
18781883
int max_recursion_depth
18791884
CMemoryPool* memory_pool
18801885
vector[int] included_fields
18811886
c_bool use_threads
18821887
c_bool ensure_native_endian
1888+
CAlignment ensure_alignment
18831889

18841890
@staticmethod
18851891
CIpcReadOptions Defaults()

0 commit comments

Comments
 (0)