Skip to content

Commit

Permalink
feat: Add IPC writer scaffolding (#564)
Browse files Browse the repository at this point in the history
Add `ArrowIpcEncoder`, init/reset, and tests. Extracted from
#555 (review)
  • Loading branch information
bkietz authored Jul 25, 2024
1 parent d6368d0 commit 2040e74
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 64 deletions.
67 changes: 27 additions & 40 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ if(NANOARROW_IPC)
endif()

if(NOT NANOARROW_BUNDLE)
set(NANOARROW_IPC_BUILD_SOURCES src/nanoarrow/ipc/decoder.c
src/nanoarrow/ipc/reader.c)
set(NANOARROW_IPC_BUILD_SOURCES
src/nanoarrow/ipc/decoder.c src/nanoarrow/ipc/encoder.c
src/nanoarrow/ipc/reader.c)
endif()

add_library(nanoarrow_ipc ${NANOARROW_IPC_BUILD_SOURCES})
Expand Down Expand Up @@ -418,51 +419,37 @@ if(NANOARROW_BUILD_TESTS)
endif()

enable_testing()

add_executable(nanoarrow_ipc_decoder_test src/nanoarrow/ipc/decoder_test.cc)
add_executable(nanoarrow_ipc_reader_test src/nanoarrow/ipc/reader_test.cc)
add_executable(nanoarrow_ipc_files_test src/nanoarrow/ipc/files_test.cc)
add_executable(nanoarrow_ipc_hpp_test src/nanoarrow/ipc/ipc_hpp_test.cc)
include(GoogleTest)

if(NANOARROW_CODE_COVERAGE)
target_compile_options(ipc_coverage_config INTERFACE -O0 -g --coverage)
target_link_options(ipc_coverage_config INTERFACE --coverage)
target_link_libraries(nanoarrow_ipc PRIVATE ipc_coverage_config)
endif()
target_link_libraries(nanoarrow_ipc_decoder_test
nanoarrow_ipc
nanoarrow
flatccrt
${NANOARROW_ARROW_TARGET}
gtest_main
ipc_coverage_config)
target_link_libraries(nanoarrow_ipc_reader_test
nanoarrow_ipc
nanoarrow
flatccrt
gtest_main
ipc_coverage_config)
target_link_libraries(nanoarrow_ipc_files_test
nanoarrow_ipc
nanoarrow
flatccrt
${NANOARROW_ARROW_TARGET}
nlohmann_json
ZLIB::ZLIB
gtest_main
ipc_coverage_config)
target_link_libraries(nanoarrow_ipc_hpp_test
nanoarrow_ipc
nanoarrow
${NANOARROW_ARROW_TARGET}
gtest_main
ipc_coverage_config)

include(GoogleTest)
gtest_discover_tests(nanoarrow_ipc_decoder_test)
gtest_discover_tests(nanoarrow_ipc_reader_test)
gtest_discover_tests(nanoarrow_ipc_files_test)
gtest_discover_tests(nanoarrow_ipc_hpp_test)
# Build, link and register one GoogleTest executable per IPC test source.
# Each entry <name> maps to src/nanoarrow/ipc/<name>_test.cc and produces the
# target nanoarrow_ipc_<name>_test.
foreach(name
        decoder
        encoder
        reader
        files
        ipc_hpp)
  add_executable(nanoarrow_ipc_${name}_test src/nanoarrow/ipc/${name}_test.cc)

  target_link_libraries(nanoarrow_ipc_${name}_test
                        nanoarrow_ipc
                        nanoarrow
                        ${NANOARROW_ARROW_TARGET}
                        gtest_main
                        ipc_coverage_config)

  # Every test except the C++ wrapper test (ipc_hpp) links the flatcc runtime.
  # The loop variable is "ipc_hpp", with no trailing underscore, so the
  # pattern has to be anchored at the end of the name: "_hpp_" never matched
  # and flatccrt was linked into the hpp test as well.
  if(NOT (name MATCHES "_hpp$"))
    target_link_libraries(nanoarrow_ipc_${name}_test flatccrt)
  endif()

  gtest_discover_tests(nanoarrow_ipc_${name}_test)
endforeach()

target_link_libraries(nanoarrow_ipc_files_test nlohmann_json ZLIB::ZLIB)
endif()

if(NANOARROW_DEVICE)
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def bundle_nanoarrow_ipc(
[
src_dir / "ipc" / "flatcc_generated.h",
src_dir / "ipc" / "decoder.c",
src_dir / "ipc" / "encoder.c",
src_dir / "ipc" / "reader.c",
]
)
Expand Down
11 changes: 0 additions & 11 deletions src/nanoarrow/ipc/decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,6 @@ ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
return NANOARROW_OK;
}

static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
uint32_t check = 1;
char first_byte;
memcpy(&first_byte, &check, sizeof(char));
if (first_byte) {
return NANOARROW_IPC_ENDIANNESS_LITTLE;
} else {
return NANOARROW_IPC_ENDIANNESS_BIG;
}
}

#if NANOARROW_IPC_USE_STDATOMIC
struct ArrowIpcSharedBufferPrivate {
struct ArrowBuffer src;
Expand Down
14 changes: 1 addition & 13 deletions src/nanoarrow/ipc/decoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@

using namespace arrow;

// Copied from nanoarrow_ipc.c so we can test the internal state
// of the decoder
// Copied from decoder.c so we can test the internal state
extern "C" {
struct ArrowIpcField {
struct ArrowArrayView* array_view;
Expand All @@ -51,17 +50,6 @@ struct ArrowIpcDecoderPrivate {
};
}

static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
uint32_t check = 1;
char first_byte;
memcpy(&first_byte, &check, sizeof(char));
if (first_byte) {
return NANOARROW_IPC_ENDIANNESS_LITTLE;
} else {
return NANOARROW_IPC_ENDIANNESS_BIG;
}
}

TEST(NanoarrowIpcCheckRuntime, CheckRuntime) {
EXPECT_EQ(ArrowIpcCheckRuntime(nullptr), NANOARROW_OK);
}
Expand Down
84 changes: 84 additions & 0 deletions src/nanoarrow/ipc/encoder.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>

#include "flatcc/flatcc_builder.h"
#include "nanoarrow/nanoarrow.h"
#include "nanoarrow/nanoarrow_ipc.h"

// Library-private state of an ArrowIpcEncoder, stored behind
// ArrowIpcEncoder::private_data. Allocated and initialized by
// ArrowIpcEncoderInit() and released by ArrowIpcEncoderReset().
struct ArrowIpcEncoderPrivate {
// flatcc builder holding the message that is currently being encoded
flatcc_builder_t builder;
// NOTE(review): presumably accumulates the Buffer entries of the
// RecordBatch being encoded; nothing visible here writes to it yet -- confirm
struct ArrowBuffer buffers;
// NOTE(review): presumably accumulates the FieldNode entries of the
// RecordBatch being encoded; nothing visible here writes to it yet -- confirm
struct ArrowBuffer nodes;
};

// Initialize a caller-allocated encoder.
//
// Every member of *encoder is overwritten: the callback members are NULL,
// body_length is 0 and codec is NANOARROW_IPC_COMPRESSION_TYPE_NONE.
//
// Returns NANOARROW_OK on success, ENOMEM if the private state could not be
// allocated, or ESPIPE if the flatcc builder could not be initialized. On
// failure nothing remains allocated and encoder->private_data is NULL.
ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
  NANOARROW_DCHECK(encoder != NULL);

  memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
  encoder->encode_buffer = NULL;
  encoder->encode_buffer_state = NULL;
  encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;

  struct ArrowIpcEncoderPrivate* private =
      (struct ArrowIpcEncoderPrivate*)ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate));
  if (private == NULL) {
    // Never hand a NULL pointer to flatcc_builder_init()
    return ENOMEM;
  }

  if (flatcc_builder_init(&private->builder) == -1) {
    ArrowFree(private);
    return ESPIPE;
  }

  ArrowBufferInit(&private->buffers);
  ArrowBufferInit(&private->nodes);

  // Publish the private state only once it is fully initialized so that a
  // failed init never leaves a dangling pointer in the encoder.
  encoder->private_data = private;
  return NANOARROW_OK;
}

// Release everything owned by the encoder and zero the structure.
//
// An encoder whose private_data is NULL (for example one that was already
// reset, since resetting zeroes the structure) is accepted: only the final
// memset is performed in that case.
void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
  NANOARROW_DCHECK(encoder != NULL);

  struct ArrowIpcEncoderPrivate* private =
      (struct ArrowIpcEncoderPrivate*)encoder->private_data;
  if (private != NULL) {
    flatcc_builder_clear(&private->builder);
    ArrowBufferReset(&private->nodes);
    ArrowBufferReset(&private->buffers);
    ArrowFree(private);
  }

  memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
}

ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
struct ArrowBuffer* out) {
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out != NULL);
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;

int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private->builder);
if (size == 0) {
// Finalizing an empty flatcc_builder_t triggers an assertion
return NANOARROW_OK;
}

void* data = flatcc_builder_get_direct_buffer(&private->builder, NULL);
if (data == NULL) {
return ENOMEM;
}

NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, data, size));

// don't deallocate yet, just wipe the builder's current Message
flatcc_builder_reset(&private->builder);
return NANOARROW_OK;
}
62 changes: 62 additions & 0 deletions src/nanoarrow/ipc/encoder_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include "flatcc/flatcc_builder.h"
#include "nanoarrow/nanoarrow.hpp"
#include "nanoarrow/nanoarrow_ipc.hpp"

// Copied from encoder.c so we can test the internal state.
//
// The test below casts ArrowIpcEncoder::private_data to this type, so the
// members must stay identical (type and order) to the definition in encoder.c.
extern "C" {
struct ArrowIpcEncoderPrivate {
flatcc_builder_t builder;
struct ArrowBuffer buffers;
struct ArrowBuffer nodes;
};
}

// Checks the state of a freshly initialized encoder and that finalizing works
// for both an empty builder and a builder holding some content.
TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
  nanoarrow::ipc::UniqueEncoder encoder;

  // Everything below inspects state set up by Init, so stop here on failure
  ASSERT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);

  EXPECT_EQ(encoder->codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
  EXPECT_EQ(encoder->body_length, 0);
  EXPECT_EQ(encoder->encode_buffer, nullptr);
  EXPECT_EQ(encoder->encode_buffer_state, nullptr);

  auto* priv = static_cast<struct ArrowIpcEncoderPrivate*>(encoder->private_data);
  ASSERT_NE(priv, nullptr);
  for (auto* b : {&priv->buffers, &priv->nodes}) {
    // Buffers are empty but initialized with the default allocator
    EXPECT_EQ(b->size_bytes, 0);

    auto default_allocator = ArrowBufferAllocatorDefault();
    EXPECT_EQ(memcmp(&b->allocator, &default_allocator, sizeof(b->allocator)), 0);
  }

  // Empty buffer works
  nanoarrow::UniqueBuffer buffer;
  EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()), NANOARROW_OK);
  EXPECT_EQ(buffer->size_bytes, 0);

  // Append a string (finalizing an empty buffer is an error for flatcc_builder_t)
  EXPECT_NE(flatcc_builder_create_string_str(&priv->builder, "hello world"), 0);
  EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()), NANOARROW_OK);
  // size_bytes is signed: cast the expected size to avoid a mixed-sign comparison
  EXPECT_GT(buffer->size_bytes, static_cast<int64_t>(sizeof("hello world")));
}
62 changes: 62 additions & 0 deletions src/nanoarrow/nanoarrow_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove)
#define ArrowIpcArrayStreamReaderInit \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit)
#define ArrowIpcEncoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderInit)
#define ArrowIpcEncoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderReset)
#define ArrowIpcEncoderFinalizeBuffer \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderFinalizeBuffer)

#endif

Expand Down Expand Up @@ -117,6 +121,18 @@ enum ArrowIpcCompressionType {
/// \brief Checks the nanoarrow runtime to make sure the run/build versions match
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);

/// \brief Get the endianness of the current runtime
///
/// Stores the value 1 in a 32-bit integer and inspects the byte at its lowest
/// address: that byte is non-zero only when the least significant byte is
/// stored first.
static inline enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
  const uint32_t probe = 1;
  char lowest_address_byte;
  memcpy(&lowest_address_byte, &probe, sizeof(lowest_address_byte));
  return lowest_address_byte != 0 ? NANOARROW_IPC_ENDIANNESS_LITTLE
                                  : NANOARROW_IPC_ENDIANNESS_BIG;
}

/// \brief A structure representing a reference-counted buffer that may be passed to
/// ArrowIpcDecoderDecodeArrayFromShared().
struct ArrowIpcSharedBuffer {
Expand Down Expand Up @@ -379,6 +395,52 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
struct ArrowIpcArrayStreamReaderOptions* options);

/// \brief Encoder for Arrow IPC messages
///
/// This structure is intended to be allocated by the caller,
/// initialized using ArrowIpcEncoderInit(), and released with
/// ArrowIpcEncoderReset().
struct ArrowIpcEncoder {
/// \brief Compression to encode in the next RecordBatch message.
///
/// ArrowIpcEncoderInit() sets this to NANOARROW_IPC_COMPRESSION_TYPE_NONE.
enum ArrowIpcCompressionType codec;

/// \brief Callback invoked against each buffer to be encoded
///
/// Encoding of buffers is left as a callback to accommodate dissociated data storage.
/// One implementation of this callback might copy all buffers into a contiguous body
/// for use in an arrow IPC stream, another implementation might store offsets and
/// lengths relative to a known arena.
///
/// ArrowIpcEncoderInit() leaves this member NULL.
ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view,
struct ArrowIpcEncoder* encoder, int64_t* offset,
int64_t* length, struct ArrowError* error);

/// \brief Pointer to arbitrary data used by encode_buffer()
///
/// ArrowIpcEncoderInit() leaves this member NULL.
void* encode_buffer_state;

/// \brief Finalized body length of the most recently encoded RecordBatch message
///
/// (This is initially 0 and encode_buffer() is expected to update it. After all
/// buffers are encoded, this will be written to the RecordBatch's .bodyLength)
int64_t body_length;

/// \brief Private resources managed by this library
///
/// Allocated by ArrowIpcEncoderInit() and freed by ArrowIpcEncoderReset().
void* private_data;
};

/// \brief Initialize an encoder
///
/// Every member of encoder is overwritten: codec is set to
/// NANOARROW_IPC_COMPRESSION_TYPE_NONE, encode_buffer and encode_buffer_state
/// are set to NULL, and the library-private state is allocated.
///
/// If NANOARROW_OK is returned, the caller must call ArrowIpcEncoderReset()
/// to release resources allocated by this function.
ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder);

/// \brief Release all resources attached to an encoder
///
/// The encoder structure is zeroed afterwards; call ArrowIpcEncoderInit()
/// again before reusing it.
void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder);

/// \brief Finalize the most recently encoded message to a buffer
///
/// The bytes of the encoded message will be appended to the provided buffer.
/// If no message has been encoded, out is left untouched and NANOARROW_OK is
/// returned. After a successful call the encoder's message builder is reset
/// so that another message can be encoded.
ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
struct ArrowBuffer* out);
/// @}

#ifdef __cplusplus
Expand Down
Loading

0 comments on commit 2040e74

Please sign in to comment.