Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(ci): Ensure integration workflow passes #643

Merged
merged 11 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ services:
- ${NANOARROW_DOCKER_SOURCE_DIR}:/arrow-integration/nanoarrow
environment:
ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS: "nanoarrow"
# Rust writes invalid flatbuffers:
# https://github.com/apache/arrow-rs/issues/5052
ARCHERY_INTEGRATION_WITH_RUST: "0"
command:
["echo '::group::Build nanoarrow' &&
conda run --no-capture-output /arrow-integration/ci/scripts/nanoarrow_build.sh /arrow-integration /build &&
Expand Down
16 changes: 9 additions & 7 deletions src/nanoarrow/ipc/decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -1024,12 +1024,11 @@ static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* decode

ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
int32_t* prefix_size_bytes,
struct ArrowError* error) {
ArrowIpcDecoderResetHeaderInfo(decoder);
int32_t prefix_size_bytes;
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes, error));
NANOARROW_UNUSED(prefix_size_bytes);
ArrowIpcDecoderReadHeaderPrefix(decoder, &data, prefix_size_bytes, error));
return NANOARROW_OK;
}

Expand Down Expand Up @@ -1187,15 +1186,15 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
decoder->body_size_bytes = ns(Message_bodyLength(message));

switch (decoder->metadata_version) {
case ns(MetadataVersion_V5):
case ns(MetadataVersion_V4):
case ns(MetadataVersion_V5):
break;
case ns(MetadataVersion_V1):
case ns(MetadataVersion_V2):
case ns(MetadataVersion_V3):
ArrowErrorSet(error, "Expected metadata version V4 or V5 but found %s",
ns(MetadataVersion_name(ns(Message_version(message)))));
return EINVAL;
case ns(MetadataVersion_V1):
case ns(MetadataVersion_V2):
case ns(MetadataVersion_V3):
default:
ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)",
decoder->metadata_version);
Expand Down Expand Up @@ -1286,6 +1285,9 @@ ArrowErrorCode ArrowIpcDecoderDecodeFooter(struct ArrowIpcDecoder* decoder,
data.data.as_uint8 + data.size_bytes - footer_and_size_and_magic_size;
ns(Footer_table_t) footer = ns(Footer_as_root(footer_data));

NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeSchemaHeader(decoder, ns(Footer_schema(footer)), error));

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchemaImpl(
ns(Footer_schema(footer)), &private_data->footer.schema, error));

Expand Down
5 changes: 4 additions & 1 deletion src/nanoarrow/ipc/decoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ TEST(NanoarrowIpcTest, NanoarrowIpcPeekSimpleSchema) {
data.size_bytes = sizeof(kSimpleSchema);

ArrowIpcDecoderInit(&decoder);
EXPECT_EQ(ArrowIpcDecoderPeekHeader(&decoder, data, &error), NANOARROW_OK);
int32_t prefix_size_bytes = 0;
EXPECT_EQ(ArrowIpcDecoderPeekHeader(&decoder, data, &prefix_size_bytes, &error),
NANOARROW_OK);
EXPECT_EQ(prefix_size_bytes, 8);
EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
EXPECT_EQ(decoder.body_size_bytes, 0);

Expand Down
83 changes: 77 additions & 6 deletions src/nanoarrow/ipc/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#define ENODATA 120
#endif

// Sentinel value to indicate that we haven't read a message yet
// and don't know the number of header prefix bytes to expect.
static const int32_t kExpectedHeaderPrefixSizeNotSet = -1;

void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
struct ArrowIpcInputStream* dst) {
memcpy(dst, src, sizeof(struct ArrowIpcInputStream));
Expand Down Expand Up @@ -186,6 +190,7 @@ struct ArrowIpcArrayStreamReaderPrivate {
int64_t field_index;
struct ArrowBuffer header;
struct ArrowBuffer body;
int32_t expected_header_prefix_size;
struct ArrowError error;
};

Expand Down Expand Up @@ -230,6 +235,21 @@ static int ArrowIpcArrayStreamReaderNextHeader(
// propagated higher (e.g., if the stream is empty and there's no schema message)
ArrowErrorSet(&private_data->error, "No data available on stream");
return ENODATA;
} else if (bytes_read == 4 && private_data->expected_header_prefix_size == 4) {
// Special case very, very old IPC streams that used 0x00000000 as the
// end-of-stream indicator. We may want to remove this case at some point:
// https://github.com/apache/arrow-nanoarrow/issues/648
uint32_t last_four_bytes = 0;
memcpy(&last_four_bytes, private_data->header.data, sizeof(uint32_t));
if (last_four_bytes == 0) {
ArrowErrorSet(&private_data->error, "No data available on stream");
return ENODATA;
} else {
ArrowErrorSet(&private_data->error,
"Expected 0x00000000 if exactly four bytes are available at the end "
"of a stream");
return EINVAL;
}
} else if (bytes_read != 8) {
ArrowErrorSet(&private_data->error,
"Expected at least 8 bytes in remainder of stream");
Expand All @@ -241,17 +261,58 @@ static int ArrowIpcArrayStreamReaderNextHeader(
input_view.size_bytes = private_data->header.size_bytes;

// Use PeekHeader to fill in decoder.header_size_bytes
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view,
&private_data->error));
int32_t prefix_size_bytes = 0;
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(
&private_data->decoder, input_view, &prefix_size_bytes, &private_data->error));

// Check for a consistent header prefix size
if (private_data->expected_header_prefix_size != kExpectedHeaderPrefixSizeNotSet &&
prefix_size_bytes != private_data->expected_header_prefix_size) {
ArrowErrorSet(&private_data->error,
"Expected prefix %d prefix header bytes but found %d",
(int)private_data->expected_header_prefix_size, (int)prefix_size_bytes);
return EINVAL;
} else {
private_data->expected_header_prefix_size = prefix_size_bytes;
}

// Legacy streams are missing the 0xFFFFFFFF at the start of the message. The
// decoder can handle this; however, verification will fail because flatbuffers
// must be 8-byte aligned. To handle this case, we prepend the continuation
// token to the bytes already read into the header buffer and ensure that we read
// four fewer bytes the next time we issue a read. We may be able to remove this case
// in the future: https://github.com/apache/arrow-nanoarrow/issues/648
int64_t extra_bytes_already_read = 0;
if (prefix_size_bytes == 4) {
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferReserve(&private_data->header, 4),
&private_data->error);
memmove(private_data->header.data + 4, private_data->header.data,
private_data->header.size_bytes);
uint32_t continuation = 0xFFFFFFFFU;
memcpy(private_data->header.data, &continuation, sizeof(uint32_t));
private_data->header.size_bytes += 4;
extra_bytes_already_read = 4;

input_view.data.data = private_data->header.data;
input_view.size_bytes = private_data->header.size_bytes;

int32_t new_prefix_size_bytes;
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view,
&new_prefix_size_bytes,
&private_data->error));
NANOARROW_DCHECK(new_prefix_size_bytes == 8);
}

// Read the header bytes
int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowBufferReserve(&private_data->header, expected_header_bytes),
ArrowBufferReserve(&private_data->header,
expected_header_bytes - extra_bytes_already_read),
&private_data->error);
NANOARROW_RETURN_NOT_OK(
private_data->input.read(&private_data->input, private_data->header.data + 8,
expected_header_bytes, &bytes_read, &private_data->error));
NANOARROW_RETURN_NOT_OK(private_data->input.read(
&private_data->input, private_data->header.data + private_data->header.size_bytes,
expected_header_bytes - extra_bytes_already_read, &bytes_read,
&private_data->error));
private_data->header.size_bytes += bytes_read;

// Verify + decode the header
Expand All @@ -260,6 +321,15 @@ static int ArrowIpcArrayStreamReaderNextHeader(
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view,
&private_data->error));

// If we have a 4-byte header prefix, make sure the metadata version is V4
// (Note that some V4 IPC files have an 8-byte header prefix).
if (prefix_size_bytes == 4 &&
private_data->decoder.metadata_version != NANOARROW_IPC_METADATA_VERSION_V4) {
ArrowErrorSet(&private_data->error,
"Header prefix size of four bytes is only allowed for V4 metadata");
return EINVAL;
}

// Don't decode the message if it's of the wrong type (because the error message
// is better communicated by the caller)
if (private_data->decoder.message_type != message_type) {
Expand Down Expand Up @@ -445,6 +515,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
ArrowBufferInit(&private_data->body);
private_data->out_schema.release = NULL;
ArrowIpcInputStreamMove(input_stream, &private_data->input);
private_data->expected_header_prefix_size = kExpectedHeaderPrefixSizeNotSet;

if (options != NULL) {
private_data->field_index = options->field_index;
Expand Down
5 changes: 5 additions & 0 deletions src/nanoarrow/nanoarrow_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,13 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
/// these bytes and returns ESPIPE if there are not enough remaining bytes in data to read
/// the entire header message, EINVAL if the first 8 bytes are not valid, ENODATA if the
/// Arrow end-of-stream indicator has been reached, or NANOARROW_OK otherwise.
///
/// Pre-1.0 messages were not prefixed with 0xFFFFFFFF. For these messages, a value
/// of 4 will be placed into prefix_size_bytes; otherwise a value of 8 will be placed
/// into prefix_size_bytes.
ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
int32_t* prefix_size_bytes,
struct ArrowError* error);

/// \brief Verify a message header
Expand Down
Loading